You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2014/10/28 05:07:42 UTC

git commit: MAPREDUCE-6018. Added an MR specific config to enable emitting job history data to the timeline server. Contributed by Robert Kanter.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 6b2f11b54 -> 971e91c8c


MAPREDUCE-6018. Added an MR specific config to enable emitting job history data to the timeline server. Contributed by Robert Kanter.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/971e91c8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/971e91c8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/971e91c8

Branch: refs/heads/trunk
Commit: 971e91c8c03a23e4613ed3f071b4f982ee5a1b63
Parents: 6b2f11b
Author: Zhijie Shen <zj...@apache.org>
Authored: Mon Oct 27 21:03:53 2014 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon Oct 27 21:03:53 2014 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 +
 .../jobhistory/JobHistoryEventHandler.java      | 26 +++++--
 .../apache/hadoop/mapreduce/MRJobConfig.java    |  5 ++
 .../src/main/resources/mapred-default.xml       |  8 ++
 .../mapred/TestMRTimelineEventHandling.java     | 80 ++++++++++++++++++++
 5 files changed, 114 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/971e91c8/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index fd82d49..9c2f16e 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -280,6 +280,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5970. Provide a boolean switch to enable MR-AM profiling (Gera
     Shegalov via jlowe)
 
+    MAPREDUCE-6018. Added an MR specific config to enable emitting job history
+    data to the timeline server. (Robert Kanter via zjshen)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/971e91c8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index ebedc1b..184baaa 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -241,8 +241,14 @@ public class JobHistoryEventHandler extends AbstractService
             MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
             MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
 
-    timelineClient = TimelineClient.createTimelineClient();
-    timelineClient.init(conf);
+    if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
+        MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
+      timelineClient = TimelineClient.createTimelineClient();
+      timelineClient.init(conf);
+      LOG.info("Emitting job history data to the timeline server is enabled");
+    } else {
+      LOG.info("Emitting job history data to the timeline server is not enabled");
+    }
 
     super.serviceInit(conf);
   }
@@ -268,7 +274,9 @@ public class JobHistoryEventHandler extends AbstractService
 
   @Override
   protected void serviceStart() throws Exception {
-    timelineClient.start();
+    if (timelineClient != null) {
+      timelineClient.start();
+    }
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -537,7 +545,7 @@ public class JobHistoryEventHandler extends AbstractService
       // For all events
       // (1) Write it out
       // (2) Process it for JobSummary
-      // (3) Process it for ATS
+      // (3) Process it for ATS (if enabled)
       MetaInfo mi = fileMap.get(event.getJobID());
       try {
         HistoryEvent historyEvent = event.getHistoryEvent();
@@ -546,8 +554,10 @@ public class JobHistoryEventHandler extends AbstractService
         }
         processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
             event.getJobID());
-        processEventForTimelineServer(historyEvent, event.getJobID(),
-                event.getTimestamp());
+        if (timelineClient != null) {
+          processEventForTimelineServer(historyEvent, event.getJobID(),
+              event.getTimestamp());
+        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("In HistoryEventHandler "
               + event.getHistoryEvent().getEventType());
@@ -839,8 +849,8 @@ public class JobHistoryEventHandler extends AbstractService
         tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime());
         tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
         tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
-                tfe2.getSuccessfulTaskAttemptId() == null ?
-                "" : tfe2.getSuccessfulTaskAttemptId().toString());
+            tfe2.getSuccessfulTaskAttemptId() == null ?
+            "" : tfe2.getSuccessfulTaskAttemptId().toString());
         tEntity.addEvent(tEvent);
         tEntity.setEntityId(tfe2.getTaskId().toString());
         tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/971e91c8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 689e51c..5476e92 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -383,6 +383,11 @@ public interface MRJobConfig {
   public static final String JOB_UBERTASK_MAXBYTES =
     "mapreduce.job.ubertask.maxbytes";
 
+  public static final String MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
+    "mapreduce.job.emit-timeline-data";
+  public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
+      false;
+
   public static final String MR_PREFIX = "yarn.app.mapreduce.";
 
   public static final String MR_AM_PREFIX = MR_PREFIX + "am.";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/971e91c8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 1a86071..6be62ec 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -569,6 +569,14 @@
 </property>
 
 <property>
+    <name>mapreduce.job.emit-timeline-data</name>
+    <value>false</value>
+    <description>Specifies if the Application Master should emit timeline data
+    to the timeline server. Individual jobs can override this value.
+    </description>
+</property>
+
+<property>
   <name>mapreduce.input.fileinputformat.split.minsize</name>
   <value>0</value>
   <description>The minimum size chunk that map input should be split

http://git-wip-us.apache.org/repos/asf/hadoop/blob/971e91c8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
index 2352818..7b8d6df 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
 import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
@@ -85,4 +86,83 @@ public class TestMRTimelineEventHandling {
     }
   }
 
+  @Test // Expects MAPREDUCE_JOB timeline entities only for jobs whose conf has MAPREDUCE_JOB_EMIT_TIMELINE_DATA set to true.
+  public void testMapreduceJobTimelineServiceEnabled()
+      throws Exception {
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, false); // phase 1: cluster is initialized with emission off
+    MiniMRYarnCluster cluster = null;
+    try {
+      cluster = new MiniMRYarnCluster(
+          TestJobHistoryEventHandler.class.getSimpleName(), 1, true);
+      cluster.init(conf);
+      cluster.start();
+      TimelineStore ts = cluster.getApplicationHistoryServer()
+          .getTimelineStore(); // store that is queried for entities after each job
+
+      Path inDir = new Path("input");
+      Path outDir = new Path("output");
+      RunningJob job =
+          UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir); // first job: flag is still false in conf
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+          job.getJobStatus().getState().getValue());
+      TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
+          null, null, null, null, null, null);
+      Assert.assertEquals(0, entities.getEntities().size()); // no job entity expected while the flag is false
+
+      conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true); // flip the flag for the next job; the cluster is not restarted
+      job = UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+          job.getJobStatus().getState().getValue());
+      entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
+          null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size()); // exactly one entity expected, from the second job only
+      TimelineEntity tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(job.getID().toString(), tEntity.getEntityId()); // entity id must equal the second job's id
+    } finally {
+      if (cluster != null) {
+        cluster.stop(); // always shut the mini cluster down, even when an assertion fails
+      }
+    }
+
+    conf = new YarnConfiguration(); // phase 2: fresh configuration and a fresh cluster
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true); // this time the cluster is initialized with emission on
+    cluster = null;
+    try {
+      cluster = new MiniMRYarnCluster(
+          TestJobHistoryEventHandler.class.getSimpleName(), 1, true);
+      cluster.init(conf);
+      cluster.start();
+      TimelineStore ts = cluster.getApplicationHistoryServer()
+          .getTimelineStore();
+
+      Path inDir = new Path("input");
+      Path outDir = new Path("output");
+
+      conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, false); // turn the flag off for the first job after the cluster has started
+      RunningJob job =
+          UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+          job.getJobStatus().getState().getValue());
+      TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
+          null, null, null, null, null, null);
+      Assert.assertEquals(0, entities.getEntities().size()); // expects nothing stored although the cluster was initialized with the flag on
+
+      conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true); // re-enable for the second job
+      job = UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+          job.getJobStatus().getState().getValue());
+      entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
+          null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size()); // again exactly one entity, from the job that had the flag on
+      TimelineEntity tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
+    } finally {
+      if (cluster != null) {
+        cluster.stop();
+      }
+    }
+  }
 }