You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2016/03/23 16:52:15 UTC
hadoop git commit: YARN-4814. ATS 1.5 timelineclient impl call flush
after every event write. Contributed by Xuan Gong. (cherry picked from commit
af1d125f9ce35ec69a610674a1c5c60cc17141a7) (cherry picked from commit
76602161c006ddc9cf2404ac762f127f107d14cd)
Repository: hadoop
Updated Branches:
refs/heads/branch-2.8 f362b8a40 -> 762c7d436
YARN-4814. ATS 1.5 timelineclient impl call flush after every event write. Contributed by Xuan Gong.
(cherry picked from commit af1d125f9ce35ec69a610674a1c5c60cc17141a7)
(cherry picked from commit 76602161c006ddc9cf2404ac762f127f107d14cd)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/762c7d43
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/762c7d43
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/762c7d43
Branch: refs/heads/branch-2.8
Commit: 762c7d4361d8acf34aff46b2a01f38efb989c777
Parents: f362b8a
Author: Junping Du <ju...@apache.org>
Authored: Wed Mar 23 08:57:16 2016 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Wed Mar 23 09:02:56 2016 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 6 ++
.../api/impl/FileSystemTimelineWriter.java | 104 ++++++++++++++-----
2 files changed, 86 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/762c7d43/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5e1c6fa..8018d1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1679,6 +1679,12 @@ public class YarnConfiguration extends Configuration {
public static final long
TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS_DEFAULT = 7 * 60;
+ public static final String
+ TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE =
+ TIMELINE_SERVICE_CLIENT_PREFIX + "internal-attempt-dir-cache-size";
+ public static final int
+ DEFAULT_TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE = 1000;
+
// This is temporary solution. The configuration will be deleted once we have
// the FileSystem API to check whether append operation is supported or not.
public static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
http://git-wip-us.apache.org/repos/asf/hadoop/blob/762c7d43/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
index b471b3b..35d9970 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
@@ -26,6 +26,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -97,6 +98,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private long ttl;
private LogFDsCache logFDsCache = null;
private boolean isAppendSupported;
+ private final AttemptDirCache attemptDirCache;
public FileSystemTimelineWriter(Configuration conf,
UserGroupInformation authUgi, Client client, URI resURI)
@@ -158,6 +160,15 @@ public class FileSystemTimelineWriter extends TimelineWriter{
objMapper = createObjectMapper();
+ int attemptDirCacheSize = conf.getInt(
+ YarnConfiguration
+ .TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE,
+ YarnConfiguration
+ .DEFAULT_TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE);
+
+ attemptDirCache =
+ new AttemptDirCache(attemptDirCacheSize, fs, activePath);
+
if (LOG.isDebugEnabled()) {
StringBuilder debugMSG = new StringBuilder();
debugMSG.append(
@@ -199,7 +210,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
= new ArrayList<TimelineEntity>();
List<TimelineEntity> entitiesToEntityCache
= new ArrayList<TimelineEntity>();
- Path attemptDir = createAttemptDir(appAttemptId);
+ Path attemptDir = attemptDirCache.getAppAttemptDir(appAttemptId);
for (TimelineEntity entity : entities) {
if (summaryEntityTypes.contains(entity.getEntityType())) {
@@ -279,32 +290,11 @@ public class FileSystemTimelineWriter extends TimelineWriter{
return mapper;
}
- private Path createAttemptDir(ApplicationAttemptId appAttemptId)
- throws IOException {
- Path appDir = createApplicationDir(appAttemptId.getApplicationId());
-
- Path attemptDir = new Path(appDir, appAttemptId.toString());
- if (!fs.exists(attemptDir)) {
- FileSystem.mkdirs(fs, attemptDir, new FsPermission(
- APP_LOG_DIR_PERMISSIONS));
- }
- return attemptDir;
- }
-
- private Path createApplicationDir(ApplicationId appId) throws IOException {
- Path appDir =
- new Path(activePath, appId.toString());
- if (!fs.exists(appDir)) {
- FileSystem.mkdirs(fs, appDir, new FsPermission(APP_LOG_DIR_PERMISSIONS));
- }
- return appDir;
- }
-
private void writeDomain(ApplicationAttemptId appAttemptId,
TimelineDomain domain) throws IOException {
Path domainLogPath =
- new Path(createAttemptDir(appAttemptId), DOMAIN_LOG_PREFIX
- + appAttemptId.toString());
+ new Path(attemptDirCache.getAppAttemptDir(appAttemptId),
+ DOMAIN_LOG_PREFIX + appAttemptId.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Writing domains for " + appAttemptId.toString() + " to "
+ domainLogPath);
@@ -958,4 +948,70 @@ public class FileSystemTimelineWriter extends TimelineWriter{
}
}
}
+
+ private static class AttemptDirCache {
+   // LRU cache of attempt-directory paths, holding at most attemptDirCacheSize
+   // entries, so the directories are not re-checked on the file system for
+   // every write. The map is only touched while holding this object's lock.
+   private final int attemptDirCacheSize;
+   private final Map<ApplicationAttemptId, Path> attemptDirCache;
+   private final FileSystem fs;
+   private final Path activePath;
+
+   public AttemptDirCache(int cacheSize, FileSystem fs, Path activePath) {
+     this.attemptDirCacheSize = cacheSize;
+     this.attemptDirCache = new LinkedHashMap<ApplicationAttemptId, Path>(
+         attemptDirCacheSize, 0.75f, true) { // true: access order (LRU)
+           private static final long serialVersionUID = 1L;
+           @Override
+           protected boolean removeEldestEntry(
+               Map.Entry<ApplicationAttemptId, Path> eldest) {
+             return size() > attemptDirCacheSize; // evict when over limit
+           }
+         };
+     this.fs = fs;
+     this.activePath = activePath;
+   }
+
+   // Returns the directory for attemptId, creating it on a cache miss.
+   // The whole method is synchronized: the map is access-ordered, so even
+   // get() relinks entries and must not run concurrently with other calls.
+   public synchronized Path getAppAttemptDir(ApplicationAttemptId attemptId)
+       throws IOException {
+     Path attemptDir = attemptDirCache.get(attemptId);
+     if (attemptDir == null) {
+       attemptDir = createAttemptDir(attemptId);
+       attemptDirCache.put(attemptId, attemptDir);
+     }
+     return attemptDir;
+   }
+
+   // Ensures <activePath>/<appId>/<attemptId> exists and returns its path.
+   private Path createAttemptDir(ApplicationAttemptId appAttemptId)
+       throws IOException {
+     Path appDir = createApplicationDir(appAttemptId.getApplicationId());
+     Path attemptDir = new Path(appDir, appAttemptId.toString());
+     if (!fs.exists(attemptDir)) {
+       FileSystem.mkdirs(fs, attemptDir, new FsPermission(
+           APP_LOG_DIR_PERMISSIONS));
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("New attempt directory created - " + attemptDir);
+       }
+     }
+     return attemptDir;
+   }
+
+   // Ensures <activePath>/<appId> exists and returns its path.
+   private Path createApplicationDir(ApplicationId appId) throws IOException {
+     Path appDir = new Path(activePath, appId.toString());
+     if (!fs.exists(appDir)) {
+       FileSystem.mkdirs(fs, appDir,
+           new FsPermission(APP_LOG_DIR_PERMISSIONS));
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("New app directory created - " + appDir);
+       }
+     }
+     return appDir;
+   }
+ }
}