You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/02/24 22:43:36 UTC

hadoop git commit: YARN-4680. TimerTasks leak in ATS V1.5 Writer. (Xuan Gong via gtcarrera9)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 2e76c2f75 -> 9e0f7b8b6


YARN-4680. TimerTasks leak in ATS V1.5 Writer. (Xuan Gong via
gtcarrera9)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9e0f7b8b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9e0f7b8b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9e0f7b8b

Branch: refs/heads/trunk
Commit: 9e0f7b8b69ead629f999aa86c8fb7eb581e175d8
Parents: 2e76c2f
Author: Li Lu <gt...@apache.org>
Authored: Wed Feb 24 13:43:09 2016 -0800
Committer: Li Lu <gt...@apache.org>
Committed: Wed Feb 24 13:43:09 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |   6 +
 .../api/impl/FileSystemTimelineWriter.java      | 159 ++++++++++++++++---
 .../src/main/resources/yarn-default.xml         |  39 +++++
 4 files changed, 180 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e0f7b8b/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 94bbcbe..9aa1037 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -39,6 +39,8 @@ Trunk - Unreleased
 
   BUG FIXES
 
+    YARN-4680. TimerTasks leak in ATS V1.5 Writer. (Xuan Gong via gtcarrera9)
+
     YARN-524 TestYarnVersionInfo failing if generated properties doesn't
     include an SVN URL. (stevel)
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e0f7b8b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 3845987..f0c7e6d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1729,6 +1729,12 @@ public class YarnConfiguration extends Configuration {
   public static final long TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT =
       5*60;
 
+  public static final String
+      TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS =
+      TIMELINE_SERVICE_CLIENT_PREFIX + "internal-timers-ttl-secs";
+  public static final long
+      TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS_DEFAULT = 7 * 60;
+
   // mark app-history related configs @Private as application history is going
   // to be integrated into the timeline service
   @Private

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e0f7b8b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
index 1c295e1..aa1f1f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
@@ -32,6 +32,9 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.Map.Entry;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +47,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
@@ -154,8 +158,14 @@ public class FileSystemTimelineWriter extends TimelineWriter{
         YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS,
         YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT);
 
+    long timerTaskTTL = conf.getLong(
+        YarnConfiguration.TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS,
+        YarnConfiguration
+            .TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS_DEFAULT);
+
     logFDsCache =
-        new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl);
+        new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl,
+            timerTaskTTL);
 
     this.isAppendSupported =
         conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
@@ -308,7 +318,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
     public void writeDomain(TimelineDomain domain)
         throws IOException {
       getObjectMapper().writeValue(getJsonGenerator(), domain);
-      updateLastModifiedTime(System.currentTimeMillis());
+      updateLastModifiedTime(Time.monotonicNow());
     }
   }
 
@@ -326,7 +336,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
       for (TimelineEntity entity : entities) {
         getObjectMapper().writeValue(getJsonGenerator(), entity);
       }
-      updateLastModifiedTime(System.currentTimeMillis());
+      updateLastModifiedTime(Time.monotonicNow());
     }
   }
 
@@ -372,7 +382,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
       this.stream = createLogFileStream(fs, logPath);
       this.jsonGenerator = new JsonFactory().createJsonGenerator(stream);
       this.jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
-      this.lastModifiedTime = System.currentTimeMillis();
+      this.lastModifiedTime = Time.monotonicNow();
     }
 
     protected boolean writerClosed() {
@@ -386,7 +396,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
       if (!isAppendSupported) {
         logPathToCreate =
             new Path(logPathToCreate.getParent(),
-              (logPathToCreate.getName() + "_" + System.currentTimeMillis()));
+              (logPathToCreate.getName() + "_" + Time.monotonicNow()));
       }
       if (!fileSystem.exists(logPathToCreate)) {
         streamToCreate = fileSystem.create(logPathToCreate, false);
@@ -424,10 +434,9 @@ public class FileSystemTimelineWriter extends TimelineWriter{
     private Map<ApplicationAttemptId, EntityLogFD> summanyLogFDs;
     private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
         EntityLogFD>> entityLogFDs;
-    private Timer flushTimer;
-    private FlushTimerTask flushTimerTask;
-    private Timer cleanInActiveFDsTimer;
-    private CleanInActiveFDsTask cleanInActiveFDsTask;
+    private Timer flushTimer = null;
+    private Timer cleanInActiveFDsTimer = null;
+    private Timer monitorTaskTimer = null;
     private final long ttl;
     private final ReentrantLock domainFDLocker = new ReentrantLock();
     private final ReentrantLock summaryTableLocker = new ReentrantLock();
@@ -435,27 +444,40 @@ public class FileSystemTimelineWriter extends TimelineWriter{
     private final ReentrantLock summaryTableCopyLocker = new ReentrantLock();
     private final ReentrantLock entityTableCopyLocker = new ReentrantLock();
     private volatile boolean serviceStopped = false;
+    private volatile boolean timerTaskStarted = false;
+    private final ReentrantLock timerTaskLocker = new ReentrantLock();
+    private final long flushIntervalSecs;
+    private final long cleanIntervalSecs;
+    private final long timerTaskRetainTTL;
+    private volatile long timeStampOfLastWrite = Time.monotonicNow();
+    private final ReadLock timerTasksMonitorReadLock;
+    private final WriteLock timerTasksMonitorWriteLock;
 
     public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs,
-        long ttl) {
+        long ttl, long timerTaskRetainTTL) {
       domainLogFD = null;
       summanyLogFDs = new HashMap<ApplicationAttemptId, EntityLogFD>();
       entityLogFDs = new HashMap<ApplicationAttemptId,
           HashMap<TimelineEntityGroupId, EntityLogFD>>();
-      this.flushTimer =
-          new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer",
-            true);
-      this.flushTimerTask = new FlushTimerTask();
-      this.flushTimer.schedule(flushTimerTask, flushIntervalSecs * 1000,
-          flushIntervalSecs * 1000);
-
-      this.cleanInActiveFDsTimer =
-          new Timer(LogFDsCache.class.getSimpleName() +
-            "cleanInActiveFDsTimer", true);
-      this.cleanInActiveFDsTask = new CleanInActiveFDsTask();
-      this.cleanInActiveFDsTimer.schedule(cleanInActiveFDsTask,
-          cleanIntervalSecs * 1000, cleanIntervalSecs * 1000);
       this.ttl = ttl * 1000;
+      this.flushIntervalSecs = flushIntervalSecs;
+      this.cleanIntervalSecs = cleanIntervalSecs;
+      long timerTaskRetainTTLVar = timerTaskRetainTTL * 1000;
+      if (timerTaskRetainTTLVar > this.ttl) {
+        this.timerTaskRetainTTL = timerTaskRetainTTLVar;
+      } else {
+        this.timerTaskRetainTTL = this.ttl + 2 * 60 * 1000;
+        LOG.warn("The specific " + YarnConfiguration
+            .TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS + " : "
+            + timerTaskRetainTTL + " is invalid, because it is less than or "
+            + "equal to " + YarnConfiguration
+            .TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS + " : " + ttl + ". Use "
+            + YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS + " : "
+            + ttl + " + 120s instead.");
+      }
+      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+      this.timerTasksMonitorReadLock = lock.readLock();
+      this.timerTasksMonitorWriteLock = lock.writeLock();
     }
 
     @Override
@@ -548,7 +570,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
     }
 
     private void cleanInActiveFDs() {
-      long currentTimeStamp = System.currentTimeMillis();
+      long currentTimeStamp = Time.monotonicNow();
       try {
         this.domainFDLocker.lock();
         if (domainLogFD != null) {
@@ -623,13 +645,55 @@ public class FileSystemTimelineWriter extends TimelineWriter{
       }
     }
 
+    private class TimerMonitorTask extends TimerTask {
+      @Override
+      public void run() {
+        try {
+          timerTasksMonitorWriteLock.lock();
+          monitorTimerTasks();
+        } finally {
+          timerTasksMonitorWriteLock.unlock();
+        }
+      }
+    }
+
+    private void monitorTimerTasks() {
+      if (Time.monotonicNow() - this.timeStampOfLastWrite
+          >= this.timerTaskRetainTTL) {
+        cancelAndCloseTimerTasks();
+
+        timerTaskStarted = false;
+      } else {
+        if (this.monitorTaskTimer != null) {
+          this.monitorTaskTimer.schedule(new TimerMonitorTask(),
+              this.timerTaskRetainTTL);
+        }
+      }
+    }
+
     @Override
     public void close() throws IOException {
 
       serviceStopped = true;
 
-      flushTimer.cancel();
-      cleanInActiveFDsTimer.cancel();
+      cancelAndCloseTimerTasks();
+    }
+
+    private void cancelAndCloseTimerTasks() {
+      if (flushTimer != null) {
+        flushTimer.cancel();
+        flushTimer = null;
+      }
+
+      if (cleanInActiveFDsTimer != null) {
+        cleanInActiveFDsTimer.cancel();
+        cleanInActiveFDsTimer = null;
+      }
+
+      if (monitorTaskTimer != null) {
+        monitorTaskTimer.cancel();
+        monitorTaskTimer = null;
+      }
 
       try {
         this.domainFDLocker.lock();
@@ -696,6 +760,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
     public void writeDomainLog(FileSystem fs, Path logPath,
         ObjectMapper objMapper, TimelineDomain domain,
         boolean isAppendSupported) throws IOException {
+      checkAndStartTimeTasks();
       try {
         this.domainFDLocker.lock();
         if (this.domainLogFD != null) {
@@ -714,6 +779,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
         ObjectMapper objMapper, ApplicationAttemptId appAttemptId,
         TimelineEntityGroupId groupId, List<TimelineEntity> entitiesToEntity,
         boolean isAppendSupported) throws IOException{
+      checkAndStartTimeTasks();
       writeEntityLogs(fs, entityLogPath, objMapper, appAttemptId,
           groupId, entitiesToEntity, isAppendSupported, this.entityLogFDs);
     }
@@ -788,6 +854,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
         ObjectMapper objMapper, ApplicationAttemptId attemptId,
         List<TimelineEntity> entities, boolean isAppendSupported)
         throws IOException {
+      checkAndStartTimeTasks();
       writeSummmaryEntityLogs(fs, logPath, objMapper, attemptId, entities,
           isAppendSupported, this.summanyLogFDs);
     }
@@ -843,5 +910,45 @@ public class FileSystemTimelineWriter extends TimelineWriter{
         summaryTableLocker.unlock();
       }
     }
+
+    private void createAndStartTimerTasks() {
+      this.flushTimer =
+          new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer",
+              true);
+      this.flushTimer.schedule(new FlushTimerTask(), flushIntervalSecs * 1000,
+          flushIntervalSecs * 1000);
+
+      this.cleanInActiveFDsTimer =
+          new Timer(LogFDsCache.class.getSimpleName()
+              + "cleanInActiveFDsTimer", true);
+      this.cleanInActiveFDsTimer.schedule(new CleanInActiveFDsTask(),
+          cleanIntervalSecs * 1000, cleanIntervalSecs * 1000);
+
+      this.monitorTaskTimer =
+          new Timer(LogFDsCache.class.getSimpleName() + "MonitorTimer",
+              true);
+      this.monitorTaskTimer.schedule(new TimerMonitorTask(),
+          this.timerTaskRetainTTL);
+    }
+
+    private void checkAndStartTimeTasks() {
+      try {
+        this.timerTasksMonitorReadLock.lock();
+        this.timeStampOfLastWrite = Time.monotonicNow();
+        if(!timerTaskStarted) {
+          try {
+            timerTaskLocker.lock();
+            if (!timerTaskStarted) {
+              createAndStartTimerTasks();
+              timerTaskStarted = true;
+            }
+          } finally {
+            timerTaskLocker.unlock();
+          }
+        }
+      } finally {
+        this.timerTasksMonitorReadLock.unlock();
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e0f7b8b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 1d410f1..cd4074a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2050,6 +2050,45 @@
     <value>10485760</value>
   </property>
 
+  <property>
+    <name>yarn.timeline-service.client.fd-flush-interval-secs</name>
+    <description>
+      Flush interval for ATS v1.5 writer. This value controls how frequently
+      the writer will flush the HDFS FSStream for the entity/domain.
+    </description>
+    <value>10</value>
+  </property>
+
+  <property>
+    <name>yarn.timeline-service.client.fd-clean-interval-secs</name>
+    <description>
+      Scan interval for ATS v1.5 writer. This value controls how frequently
+      the writer will scan the HDFS FSStream for the entity/domain.
+      If the FSStream is stale for a long time, this FSStream will be closed.
+    </description>
+    <value>60</value>
+  </property>
+
+  <property>
+    <name>yarn.timeline-service.client.fd-retain-secs</name>
+    <description>
+      How long the ATS v1.5 writer will keep an FSStream open.
+      If nothing is written to this FSStream for this configured time,
+      it will be closed.
+    </description>
+    <value>300</value>
+  </property>
+
+  <property>
+    <name>yarn.timeline-service.client.internal-timers-ttl-secs</name>
+    <description>
+      How long the internal timer tasks are kept alive in the writer. If there
+      is no write operation for this configured time, the internal timer tasks
+      will be cancelled.
+    </description>
+    <value>420</value>
+  </property>
+
   <!--  Shared Cache Configuration -->
 
   <property>