You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/07/25 19:30:52 UTC

hadoop git commit: YARN-3949. Ensure timely flush of timeline writes. Contributed by Sangjin Lee.

Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 eb1932dda -> 967bef7e0


YARN-3949. Ensure timely flush of timeline writes. Contributed by Sangjin Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/967bef7e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/967bef7e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/967bef7e

Branch: refs/heads/YARN-2928
Commit: 967bef7e0396d857913caa2574afb103a5f0b81b
Parents: eb1932d
Author: Junping Du <ju...@apache.org>
Authored: Sat Jul 25 10:30:29 2015 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Sat Jul 25 10:30:29 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  9 +++
 .../src/main/resources/yarn-default.xml         | 17 +++++-
 .../collector/TimelineCollectorManager.java     | 64 ++++++++++++++++++--
 .../storage/FileSystemTimelineWriterImpl.java   |  5 ++
 .../storage/HBaseTimelineWriterImpl.java        |  6 ++
 .../storage/PhoenixTimelineWriterImpl.java      |  5 ++
 .../timelineservice/storage/TimelineWriter.java |  9 +++
 .../TestNMTimelineCollectorManager.java         |  5 ++
 9 files changed, 118 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4b67443..98006f2 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -79,6 +79,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3047. [Data Serving] Set up ATS reader with basic request serving
     structure and lifecycle (Varun Saxena via sjlee)
 
+    YARN-3949. Ensure timely flush of timeline writes. (Sangjin Lee via
+    junping_du)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8b14ef7..a108d6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1445,6 +1445,15 @@ public class YarnConfiguration extends Configuration {
   public static final String TIMELINE_SERVICE_READER_CLASS =
       TIMELINE_SERVICE_PREFIX + "reader.class";
 
+  /** The setting that controls how often, in seconds, the timeline collector
+   * flushes the timeline writer.
+   */
+  public static final String TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS =
+      TIMELINE_SERVICE_PREFIX + "writer.flush-interval-seconds";
+
+  public static final int
+      DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60;
+
   // mark app-history related configs @Private as application history is going
   // to be integrated into the timeline service
   @Private

http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 7bdf188..e82a065 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -733,7 +733,15 @@
     <name>yarn.system-metrics-publisher.enabled</name>
     <value>false</value>
   </property>
- 
+
+  <property>
+    <description>The setting that controls whether the RM publishes YARN
+    container metrics to the timeline server. This configuration setting is
+    for ATS V2.</description>
+    <name>yarn.rm.system-metrics-publisher.emit-container-events</name>
+    <value>false</value>
+  </property>
+
 
   <property>
     <description>Number of worker threads that send the yarn system metrics
@@ -1842,6 +1850,13 @@
     <value>${hadoop.tmp.dir}/yarn/timeline</value>
   </property>
 
+  <property>
+    <description>The setting that controls how often, in seconds, the timeline
+    collector flushes the timeline writer.</description>
+    <name>yarn.timeline-service.writer.flush-interval-seconds</name>
+    <value>60</value>
+  </property>
+
   <!--  Shared Cache Configuration -->
 
   <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index 4de39ec..d85548e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -19,6 +19,13 @@
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -32,9 +39,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Class that manages adding and removing collectors and their lifecycle. It
@@ -46,8 +51,10 @@ import java.util.Map;
 public class TimelineCollectorManager extends AbstractService {
   private static final Log LOG =
       LogFactory.getLog(TimelineCollectorManager.class);
-
   private TimelineWriter writer;
+  private ScheduledExecutorService writerFlusher; // runs WriterFlushTask periodically
+  private int flushInterval; // in seconds (scheduled with TimeUnit.SECONDS)
+  private volatile boolean writerFlusherRunning; // volatile: accessor read is unsynchronized
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
@@ -56,6 +63,12 @@ public class TimelineCollectorManager extends AbstractService {
         FileSystemTimelineWriterImpl.class,
         TimelineWriter.class), conf);
     writer.init(conf);
+    // create a single dedicated thread for flushing the writer on a periodic
+    // basis
+    writerFlusher = Executors.newSingleThreadScheduledExecutor();
+    flushInterval = conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS);
     super.serviceInit(conf);
   }
 
@@ -65,6 +78,10 @@ public class TimelineCollectorManager extends AbstractService {
     if (writer != null) {
       writer.start();
     }
+    // schedule the flush task
+    writerFlusher.scheduleAtFixedRate(new WriterFlushTask(writer),
+        flushInterval, flushInterval, TimeUnit.SECONDS);
+    writerFlusherRunning = true;
   }
 
   // access to this map is synchronized with the map itself
@@ -165,9 +182,48 @@ public class TimelineCollectorManager extends AbstractService {
         c.serviceStop();
       }
     }
+    // stop the flusher first
+    if (writerFlusher != null) {
+      writerFlusher.shutdown();
+      writerFlusherRunning = false;
+      if (!writerFlusher.awaitTermination(30, TimeUnit.SECONDS)) {
+        // in reality it should be ample time for the flusher task to finish
+        // even if it times out, writers may be able to handle closing in this
+        // situation fine
+        // proceed to close the writer
+        LOG.warn("failed to stop the flusher task in time. " +
+            "will still proceed to close the writer.");
+      }
+    }
     if (writer != null) {
       writer.close();
     }
     super.serviceStop();
   }
+
+  @VisibleForTesting
+  boolean writerFlusherRunning() {
+    return this.writerFlusherRunning; // true once flush task scheduled, false after shutdown
+  }
+
+  /**
+   * Periodic task that invokes {@link TimelineWriter#flush()} on the writer.
+   */
+  private static class WriterFlushTask implements Runnable {
+    private final TimelineWriter writer;
+
+    public WriterFlushTask(TimelineWriter writer) {
+      this.writer = writer;
+    }
+
+    @Override
+    public void run() {
+      try {
+        writer.flush();
+      } catch (Throwable th) {
+        // a Throwable escaping run() would suppress all later scheduled runs
+        LOG.error("exception during timeline writer flush!", th);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index 2fff98d..4385bbc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -129,6 +129,11 @@ public class FileSystemTimelineWriterImpl extends AbstractService
     mkdirs(outputRoot, ENTITIES_DIR);
   }
 
+  @Override
+  public void flush() throws IOException {
+    // no op (NOTE(review): presumably write() buffers nothing here; confirm)
+  }
+
   private static String mkdirs(String... dirStrs) throws IOException {
     StringBuilder path = new StringBuilder();
     for (String dirStr : dirStrs) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index e48ca60..876ad6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -214,6 +214,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     return null;
   }
 
+  @Override
+  public void flush() throws IOException {
+    // flush entityTable's buffered mutations (NOTE(review): flush other tables too if added)
+    entityTable.flush();
+  }
+
   /**
    * close the hbase connections The close APIs perform flushing and release any
    * resources held

http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
index 5b4442c..381ff17 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
@@ -187,6 +187,11 @@ public class PhoenixTimelineWriterImpl extends AbstractService
 
   }
 
+  @Override
+  public void flush() throws IOException {
+    // currently no-op (NOTE(review): confirm write() does not buffer rows)
+  }
+
   // Utility functions
   @Private
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
index 494e8ad..50136de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -70,4 +70,13 @@ public interface TimelineWriter extends Service {
    */
   TimelineWriteResponse aggregate(TimelineEntity data,
       TimelineAggregationTrack track) throws IOException;
+
+  /**
+   * Flushes buffered data to the backend storage. Any buffered data will have
+   * been written to the storage by the time this method returns. This may be a
+   * time-consuming operation and should be used judiciously.
+   *
+   * @throws IOException if writing the buffered data to the storage fails
+   */
+  void flush() throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
index 87343fd..0d69fbc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
@@ -67,6 +67,11 @@ public class TestNMTimelineCollectorManager {
   }
 
   @Test
+  public void testStartingWriterFlusher() throws Exception {
+    assertTrue(collectorManager.writerFlusherRunning()); // assumes setup started the manager
+  }
+
+  @Test
   public void testStartWebApp() throws Exception {
     assertNotNull(collectorManager.getRestServerBindAddress());
     String address = collectorManager.getRestServerBindAddress();