You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/07/25 19:30:52 UTC
hadoop git commit: YARN-3949. Ensure timely flush of timeline writes.
Contributed by Sangjin Lee.
Repository: hadoop
Updated Branches:
refs/heads/YARN-2928 eb1932dda -> 967bef7e0
YARN-3949. Ensure timely flush of timeline writes. Contributed by Sangjin Lee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/967bef7e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/967bef7e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/967bef7e
Branch: refs/heads/YARN-2928
Commit: 967bef7e0396d857913caa2574afb103a5f0b81b
Parents: eb1932d
Author: Junping Du <ju...@apache.org>
Authored: Sat Jul 25 10:30:29 2015 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Sat Jul 25 10:30:29 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 9 +++
.../src/main/resources/yarn-default.xml | 17 +++++-
.../collector/TimelineCollectorManager.java | 64 ++++++++++++++++++--
.../storage/FileSystemTimelineWriterImpl.java | 5 ++
.../storage/HBaseTimelineWriterImpl.java | 6 ++
.../storage/PhoenixTimelineWriterImpl.java | 5 ++
.../timelineservice/storage/TimelineWriter.java | 9 +++
.../TestNMTimelineCollectorManager.java | 5 ++
9 files changed, 118 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4b67443..98006f2 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -79,6 +79,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-3047. [Data Serving] Set up ATS reader with basic request serving
structure and lifecycle (Varun Saxena via sjlee)
+ YARN-3949. Ensure timely flush of timeline writes. (Sangjin Lee via
+ junping_du)
+
IMPROVEMENTS
YARN-3276. Code cleanup for timeline service API records. (Junping Du via
http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8b14ef7..a108d6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1445,6 +1445,15 @@ public class YarnConfiguration extends Configuration {
public static final String TIMELINE_SERVICE_READER_CLASS =
TIMELINE_SERVICE_PREFIX + "reader.class";
+ /** The setting that controls how often, in seconds, the timeline collector
+ * flushes the timeline writer.
+ */
+ public static final String TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS =
+ TIMELINE_SERVICE_PREFIX + "writer.flush-interval-seconds";
+
+ public static final int
+ DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60;
+
// mark app-history related configs @Private as application history is going
// to be integrated into the timeline service
@Private
http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 7bdf188..e82a065 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -733,7 +733,15 @@
<name>yarn.system-metrics-publisher.enabled</name>
<value>false</value>
</property>
-
+
+ <property>
+ <description>The setting that controls whether yarn container metrics are
+ published to the timeline server by the RM. This configuration setting is
+ for ATS V2.</description>
+ <name>yarn.rm.system-metrics-publisher.emit-container-events</name>
+ <value>false</value>
+ </property>
+
<property>
<description>Number of worker threads that send the yarn system metrics
@@ -1842,6 +1850,13 @@
<value>${hadoop.tmp.dir}/yarn/timeline</value>
</property>
+ <property>
+ <description>The setting that controls how often, in seconds, the timeline
+ collector flushes the timeline writer.</description>
+ <name>yarn.timeline-service.writer.flush-interval-seconds</name>
+ <value>60</value>
+ </property>
+
<!-- Shared Cache Configuration -->
<property>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index 4de39ec..d85548e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -19,6 +19,13 @@
package org.apache.hadoop.yarn.server.timelineservice.collector;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -32,9 +39,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import com.google.common.annotations.VisibleForTesting;
/**
* Class that manages adding and removing collectors and their lifecycle. It
@@ -46,8 +51,10 @@ import java.util.Map;
public class TimelineCollectorManager extends AbstractService {
private static final Log LOG =
LogFactory.getLog(TimelineCollectorManager.class);
-
private TimelineWriter writer;
+ private ScheduledExecutorService writerFlusher;
+ private int flushInterval;
+ private boolean writerFlusherRunning;
@Override
public void serviceInit(Configuration conf) throws Exception {
@@ -56,6 +63,12 @@ public class TimelineCollectorManager extends AbstractService {
FileSystemTimelineWriterImpl.class,
TimelineWriter.class), conf);
writer.init(conf);
+ // create a single dedicated thread for flushing the writer on a periodic
+ // basis
+ writerFlusher = Executors.newSingleThreadScheduledExecutor();
+ flushInterval = conf.getInt(
+ YarnConfiguration.TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS);
super.serviceInit(conf);
}
@@ -65,6 +78,10 @@ public class TimelineCollectorManager extends AbstractService {
if (writer != null) {
writer.start();
}
+ // schedule the flush task
+ writerFlusher.scheduleAtFixedRate(new WriterFlushTask(writer),
+ flushInterval, flushInterval, TimeUnit.SECONDS);
+ writerFlusherRunning = true;
}
// access to this map is synchronized with the map itself
@@ -165,9 +182,48 @@ public class TimelineCollectorManager extends AbstractService {
c.serviceStop();
}
}
+ // stop the flusher first
+ if (writerFlusher != null) {
+ writerFlusher.shutdown();
+ writerFlusherRunning = false;
+ if (!writerFlusher.awaitTermination(30, TimeUnit.SECONDS)) {
+ // In practice this should be ample time for the flusher task to
+ // finish. Even if the wait times out, writers may still be able to
+ // handle being closed in this situation, so proceed to close the
+ // writer regardless.
+ LOG.warn("failed to stop the flusher task in time. " +
+ "will still proceed to close the writer.");
+ }
+ }
if (writer != null) {
writer.close();
}
super.serviceStop();
}
+
+ @VisibleForTesting
+ boolean writerFlusherRunning() {
+ return writerFlusherRunning;
+ }
+
+ /**
+ * Task that invokes the flush operation on the timeline writer.
+ */
+ private static class WriterFlushTask implements Runnable {
+ private final TimelineWriter writer;
+
+ public WriterFlushTask(TimelineWriter writer) {
+ this.writer = writer;
+ }
+
+ public void run() {
+ try {
+ writer.flush();
+ } catch (Throwable th) {
+ // catch everything: if a run throws, the scheduled executor suppresses
+ // all subsequent executions of this task
+ LOG.error("exception during timeline writer flush!", th);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index 2fff98d..4385bbc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -129,6 +129,11 @@ public class FileSystemTimelineWriterImpl extends AbstractService
mkdirs(outputRoot, ENTITIES_DIR);
}
+ @Override
+ public void flush() throws IOException {
+ // no op
+ }
+
private static String mkdirs(String... dirStrs) throws IOException {
StringBuilder path = new StringBuilder();
for (String dirStr : dirStrs) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index e48ca60..876ad6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -214,6 +214,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
return null;
}
+ @Override
+ public void flush() throws IOException {
+ // flush all buffered mutators
+ entityTable.flush();
+ }
+
/**
* close the hbase connections. The close APIs perform flushing and release any
* resources held
http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
index 5b4442c..381ff17 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
@@ -187,6 +187,11 @@ public class PhoenixTimelineWriterImpl extends AbstractService
}
+ @Override
+ public void flush() throws IOException {
+ // currently no-op
+ }
+
// Utility functions
@Private
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
index 494e8ad..50136de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -70,4 +70,13 @@ public interface TimelineWriter extends Service {
*/
TimelineWriteResponse aggregate(TimelineEntity data,
TimelineAggregationTrack track) throws IOException;
+
+ /**
+ * Flushes the data to the backend storage. Whatever may be buffered will be
+ * written to the storage by the time the method returns. This is a
+ * potentially time-consuming operation, and should be used judiciously.
+ *
+ * @throws IOException if the flush to the backend storage fails
+ */
+ void flush() throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/967bef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
index 87343fd..0d69fbc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
@@ -67,6 +67,11 @@ public class TestNMTimelineCollectorManager {
}
@Test
+ public void testStartingWriterFlusher() throws Exception {
+ assertTrue(collectorManager.writerFlusherRunning());
+ }
+
+ @Test
public void testStartWebApp() throws Exception {
assertNotNull(collectorManager.getRestServerBindAddress());
String address = collectorManager.getRestServerBindAddress();