You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2016/03/23 16:52:15 UTC

hadoop git commit: YARN-4814. ATS 1.5 timelineclient impl call flush after every event write. Contributed by Xuan Gong. (cherry picked from commit af1d125f9ce35ec69a610674a1c5c60cc17141a7) (cherry picked from commit 76602161c006ddc9cf2404ac762f127f107d14cd)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 f362b8a40 -> 762c7d436


YARN-4814. ATS 1.5 timelineclient impl call flush after every event write. Contributed by Xuan Gong.
(cherry picked from commit af1d125f9ce35ec69a610674a1c5c60cc17141a7)
(cherry picked from commit 76602161c006ddc9cf2404ac762f127f107d14cd)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/762c7d43
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/762c7d43
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/762c7d43

Branch: refs/heads/branch-2.8
Commit: 762c7d4361d8acf34aff46b2a01f38efb989c777
Parents: f362b8a
Author: Junping Du <ju...@apache.org>
Authored: Wed Mar 23 08:57:16 2016 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Wed Mar 23 09:02:56 2016 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |   6 ++
 .../api/impl/FileSystemTimelineWriter.java      | 104 ++++++++++++++-----
 2 files changed, 86 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/762c7d43/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5e1c6fa..8018d1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1679,6 +1679,12 @@ public class YarnConfiguration extends Configuration {
   public static final long
       TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS_DEFAULT = 7 * 60;
 
+  public static final String
+      TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE =
+      TIMELINE_SERVICE_CLIENT_PREFIX + "internal-attempt-dir-cache-size";
+  public static final int
+      DEFAULT_TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE = 1000;
+
   // This is temporary solution. The configuration will be deleted once we have
   // the FileSystem API to check whether append operation is supported or not.
   public static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND

http://git-wip-us.apache.org/repos/asf/hadoop/blob/762c7d43/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
index b471b3b..35d9970 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
@@ -26,6 +26,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -97,6 +98,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
   private long ttl;
   private LogFDsCache logFDsCache = null;
   private boolean isAppendSupported;
+  private final AttemptDirCache attemptDirCache;
 
   public FileSystemTimelineWriter(Configuration conf,
       UserGroupInformation authUgi, Client client, URI resURI)
@@ -158,6 +160,15 @@ public class FileSystemTimelineWriter extends TimelineWriter{
 
     objMapper = createObjectMapper();
 
+    int attemptDirCacheSize = conf.getInt(
+        YarnConfiguration
+            .TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE,
+        YarnConfiguration
+            .DEFAULT_TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE);
+
+    attemptDirCache =
+        new AttemptDirCache(attemptDirCacheSize, fs, activePath);
+
     if (LOG.isDebugEnabled()) {
       StringBuilder debugMSG = new StringBuilder();
       debugMSG.append(
@@ -199,7 +210,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
         = new ArrayList<TimelineEntity>();
     List<TimelineEntity> entitiesToEntityCache
         = new ArrayList<TimelineEntity>();
-    Path attemptDir = createAttemptDir(appAttemptId);
+    Path attemptDir = attemptDirCache.getAppAttemptDir(appAttemptId);
 
     for (TimelineEntity entity : entities) {
       if (summaryEntityTypes.contains(entity.getEntityType())) {
@@ -279,32 +290,11 @@ public class FileSystemTimelineWriter extends TimelineWriter{
     return mapper;
   }
 
-  private Path createAttemptDir(ApplicationAttemptId appAttemptId)
-      throws IOException {
-    Path appDir = createApplicationDir(appAttemptId.getApplicationId());
-
-    Path attemptDir = new Path(appDir, appAttemptId.toString());
-    if (!fs.exists(attemptDir)) {
-      FileSystem.mkdirs(fs, attemptDir, new FsPermission(
-          APP_LOG_DIR_PERMISSIONS));
-    }
-    return attemptDir;
-  }
-
-  private Path createApplicationDir(ApplicationId appId) throws IOException {
-    Path appDir =
-        new Path(activePath, appId.toString());
-    if (!fs.exists(appDir)) {
-      FileSystem.mkdirs(fs, appDir, new FsPermission(APP_LOG_DIR_PERMISSIONS));
-    }
-    return appDir;
-  }
-
   private void writeDomain(ApplicationAttemptId appAttemptId,
       TimelineDomain domain) throws IOException {
     Path domainLogPath =
-        new Path(createAttemptDir(appAttemptId), DOMAIN_LOG_PREFIX
-            + appAttemptId.toString());
+        new Path(attemptDirCache.getAppAttemptDir(appAttemptId),
+            DOMAIN_LOG_PREFIX + appAttemptId.toString());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Writing domains for " + appAttemptId.toString() + " to "
           + domainLogPath);
@@ -958,4 +948,70 @@ public class FileSystemTimelineWriter extends TimelineWriter{
       }
     }
   }
+
+  /**
+   * LRU cache of attempt directories. The map is access-ordered, so get()
+   * relinks entries too; every access is therefore made under this lock.
+   */
+  private static class AttemptDirCache {
+    private final int attemptDirCacheSize;
+    private final Map<ApplicationAttemptId, Path> attemptDirCache;
+    private final FileSystem fs;
+    private final Path activePath;
+
+    public AttemptDirCache(int cacheSize, FileSystem fs, Path activePath) {
+      this.attemptDirCacheSize = cacheSize;
+      this.attemptDirCache =
+          new LinkedHashMap<ApplicationAttemptId, Path>(
+              attemptDirCacheSize, 0.75f, true) {
+            private static final long serialVersionUID = 1L;
+            @Override
+            protected boolean removeEldestEntry(
+                Map.Entry<ApplicationAttemptId, Path> eldest) {
+              return size() > attemptDirCacheSize;
+            }
+          };
+      this.fs = fs;
+      this.activePath = activePath;
+    }
+
+    /** Returns the attempt's directory, creating it on a cache miss. */
+    public synchronized Path getAppAttemptDir(ApplicationAttemptId attemptId)
+        throws IOException {
+      Path attemptDir = attemptDirCache.get(attemptId);
+      if (attemptDir == null) {
+        attemptDir = createAttemptDir(attemptId);
+        attemptDirCache.put(attemptId, attemptDir);
+      }
+      return attemptDir;
+    }
+
+    private Path createAttemptDir(ApplicationAttemptId appAttemptId)
+        throws IOException {
+      Path appDir = createApplicationDir(appAttemptId.getApplicationId());
+
+      Path attemptDir = new Path(appDir, appAttemptId.toString());
+      if (!fs.exists(attemptDir)) {
+        FileSystem.mkdirs(fs, attemptDir, new FsPermission(
+            APP_LOG_DIR_PERMISSIONS));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("New attempt directory created - " + attemptDir);
+        }
+      }
+      return attemptDir;
+    }
+
+    private Path createApplicationDir(ApplicationId appId) throws IOException {
+      Path appDir =
+          new Path(activePath, appId.toString());
+      if (!fs.exists(appDir)) {
+        FileSystem.mkdirs(fs, appDir,
+            new FsPermission(APP_LOG_DIR_PERMISSIONS));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("New app directory created - " + appDir);
+        }
+      }
+      return appDir;
+    }
+  }
 }