Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2018/05/02 19:22:16 UTC

[41/50] [abbrv] hadoop git commit: MAPREDUCE-7042. Killed MR job data does not move to mapreduce.jobhistory.done-dir when ATS v2 is enabled. Contributed by Rohith Sharma K S.

MAPREDUCE-7042. Killed MR job data does not move to mapreduce.jobhistory.done-dir when ATS v2 is enabled. Contributed by Rohith Sharma K S.

(cherry picked from commit 83e60cd2db20f655e272958ef43b1b5a084ef3e3)
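
For context (this sketch is not part of the patch): the fix registers a ForwardingEventHandler on a dedicated AsyncDispatcher, forwards each JobHistoryEvent to that dispatcher in handleEvent(), and drains the dispatcher on stop, so timeline (ATS) publishing no longer runs inline with the history-event handling that moves job data to mapreduce.jobhistory.done-dir. Below is a minimal, self-contained sketch of that dispatch-and-drain pattern using the same YARN dispatcher API the patch uses; the AtsDispatchSketch, SketchEvent, and SketchEventType names are invented for illustration only and do not appear in the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;

public class AtsDispatchSketch {

  // Hypothetical event type used only for this illustration.
  enum SketchEventType { JOB_EVENT }

  static class SketchEvent extends AbstractEvent<SketchEventType> {
    private final String detail;
    SketchEvent(String detail) {
      super(SketchEventType.JOB_EVENT);
      this.detail = detail;
    }
    String getDetail() { return detail; }
  }

  public static void main(String[] args) {
    // Same calls the patch makes in serviceInit()/serviceStart().
    AsyncDispatcher atsEventDispatcher =
        new AsyncDispatcher("Job ATS Event Dispatcher");
    EventHandler<SketchEvent> timelinePublisher =
        event -> System.out.println("publish to ATS: " + event.getDetail());
    atsEventDispatcher.register(SketchEventType.class, timelinePublisher);
    atsEventDispatcher.setDrainEventsOnStop(); // drain pending events on stop
    atsEventDispatcher.init(new Configuration());
    atsEventDispatcher.start();

    // Analogous to handleEvent(): the event still goes onto the normal
    // history queue, and, when the timeline service is enabled, is also
    // handed to the dispatcher so ATS publishing runs on its own thread.
    atsEventDispatcher.getEventHandler().handle(
        new SketchEvent("JobUnsuccessfulCompletionEvent(KILLED)"));

    // Analogous to serviceStop(): stop() drains the remaining timeline
    // events before returning, so history-file handling is not skipped.
    atsEventDispatcher.stop();
  }
}
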


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/600f4d40
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/600f4d40
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/600f4d40

Branch: refs/heads/YARN-8200
Commit: 600f4d402f1926c8dac678689844d0901504f803
Parents: a91d5c7
Author: Sunil G <su...@apache.org>
Authored: Thu Apr 26 19:07:02 2018 +0530
Committer: Rohith Sharma K S <ro...@apache.org>
Committed: Fri Apr 27 12:00:00 2018 +0530

----------------------------------------------------------------------
 .../jobhistory/JobHistoryEventHandler.java      | 66 +++++++++++++++++---
 .../jobhistory/TestJobHistoryEventHandler.java  | 32 +++++++++-
 2 files changed, 89 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/600f4d40/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 4529d55..9d2b3be 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -119,7 +120,11 @@ public class JobHistoryEventHandler extends AbstractService
 
   protected BlockingQueue<JobHistoryEvent> eventQueue =
     new LinkedBlockingQueue<JobHistoryEvent>();
+
+  protected boolean handleTimelineEvent = false;
+  protected AsyncDispatcher atsEventDispatcher = null;
   protected Thread eventHandlingThread;
+
   private volatile boolean stopped;
   private final Object lock = new Object();
 
@@ -279,6 +284,7 @@ public class JobHistoryEventHandler extends AbstractService
               ((MRAppMaster.RunningAppContext) context).getTimelineClient();
           timelineClient.init(conf);
         }
+        handleTimelineEvent = true;
         LOG.info("Timeline service is enabled; version: " +
             YarnConfiguration.getTimelineServiceVersion(conf));
       } else {
@@ -302,10 +308,23 @@ public class JobHistoryEventHandler extends AbstractService
           "'json' or 'binary'.  Falling back to default value '" +
           JHAdminConfig.DEFAULT_MR_HS_JHIST_FORMAT + "'.");
     }
-
+    // initiate the atsEventDispatcher for timeline event
+    // if timeline service is enabled.
+    if (handleTimelineEvent) {
+      atsEventDispatcher = createDispatcher();
+      EventHandler<JobHistoryEvent> timelineEventHandler =
+          new ForwardingEventHandler();
+      atsEventDispatcher.register(EventType.class, timelineEventHandler);
+      atsEventDispatcher.setDrainEventsOnStop();
+      atsEventDispatcher.init(conf);
+    }
     super.serviceInit(conf);
   }
 
+  protected AsyncDispatcher createDispatcher() {
+    return new AsyncDispatcher("Job ATS Event Dispatcher");
+  }
+
   private void mkdir(FileSystem fs, Path path, FsPermission fsp)
       throws IOException {
     if (!fs.exists(path)) {
@@ -371,6 +390,10 @@ public class JobHistoryEventHandler extends AbstractService
         }
     }, "eventHandlingThread");
     eventHandlingThread.start();
+
+    if (handleTimelineEvent) {
+      atsEventDispatcher.start();
+    }
     super.serviceStart();
   }
 
@@ -453,6 +476,11 @@ public class JobHistoryEventHandler extends AbstractService
         LOG.info("Exception while closing file " + e.getMessage());
       }
     }
+
+    if (handleTimelineEvent && atsEventDispatcher != null) {
+      atsEventDispatcher.stop();
+    }
+
     if (timelineClient != null) {
       timelineClient.stop();
     } else if (timelineV2Client != null) {
@@ -572,6 +600,10 @@ public class JobHistoryEventHandler extends AbstractService
       }
 
       eventQueue.put(event);
+      // Process it for ATS (if enabled)
+      if (handleTimelineEvent) {
+        atsEventDispatcher.getEventHandler().handle(event);
+      }
     } catch (InterruptedException e) {
       throw new YarnRuntimeException(e);
     }
@@ -614,13 +646,6 @@ public class JobHistoryEventHandler extends AbstractService
         }
         processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
             event.getJobID());
-        if (timelineV2Client != null) {
-          processEventForNewTimelineService(historyEvent, event.getJobID(),
-              event.getTimestamp());
-        } else if (timelineClient != null) {
-          processEventForTimelineServer(historyEvent, event.getJobID(),
-              event.getTimestamp());
-        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("In HistoryEventHandler "
               + event.getHistoryEvent().getEventType());
@@ -702,6 +727,23 @@ public class JobHistoryEventHandler extends AbstractService
     }
   }
 
+  private void handleTimelineEvent(JobHistoryEvent event) {
+    HistoryEvent historyEvent = event.getHistoryEvent();
+    if (handleTimelineEvent) {
+      if (timelineV2Client != null) {
+        processEventForNewTimelineService(historyEvent, event.getJobID(),
+            event.getTimestamp());
+      } else if (timelineClient != null) {
+        processEventForTimelineServer(historyEvent, event.getJobID(),
+            event.getTimestamp());
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("In HistoryEventHandler, handle timelineEvent:"
+          + event.getHistoryEvent().getEventType());
+    }
+  }
+
   public void processEventForJobSummary(HistoryEvent event, JobSummary summary, 
       JobId jobId) {
     // context.getJob could be used for some of this info as well.
@@ -1708,4 +1750,12 @@ public class JobHistoryEventHandler extends AbstractService
   boolean getFlushTimerStatus() {
     return isTimerActive;
   }
+
+  private final class ForwardingEventHandler
+      implements EventHandler<JobHistoryEvent> {
+    @Override
+    public void handle(JobHistoryEvent event) {
+      handleTimelineEvent(event);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/600f4d40/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 51ac2ce..3fecef7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -75,6 +75,8 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
@@ -586,6 +588,7 @@ public class TestJobHistoryEventHandler {
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
               t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1),
               currentTime - 10));
+      jheh.getDispatcher().await();
       TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
               null, null, null, null, null, null, null);
       Assert.assertEquals(1, entities.getEntities().size());
@@ -602,6 +605,7 @@ public class TestJobHistoryEventHandler {
               "user", 200, "/foo/job.xml",
               new HashMap<JobACL, AccessControlList>(), "default"),
               currentTime + 10));
+      jheh.getDispatcher().await();
       entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
               null, null, null, null, null, null);
       Assert.assertEquals(1, entities.getEntities().size());
@@ -620,6 +624,7 @@ public class TestJobHistoryEventHandler {
       handleEvent(jheh, new JobHistoryEvent(t.jobId,
               new JobQueueChangeEvent(TypeConverter.fromYarn(t.jobId), "q2"),
               currentTime - 20));
+      jheh.getDispatcher().await();
       entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
               null, null, null, null, null, null);
       Assert.assertEquals(1, entities.getEntities().size());
@@ -642,6 +647,7 @@ public class TestJobHistoryEventHandler {
       handleEvent(jheh, new JobHistoryEvent(t.jobId,
               new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0,
               0, new Counters(), new Counters(), new Counters()), currentTime));
+      jheh.getDispatcher().await();
       entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
               null, null, null, null, null, null);
       Assert.assertEquals(1, entities.getEntities().size());
@@ -667,7 +673,9 @@ public class TestJobHistoryEventHandler {
 
       handleEvent(jheh, new JobHistoryEvent(t.jobId,
             new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId),
-            0, 0, 0, JobStateInternal.KILLED.toString()), currentTime + 20));
+            0, 0, 0, JobStateInternal.KILLED.toString()),
+            currentTime + 20));
+      jheh.getDispatcher().await();
       entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
               null, null, null, null, null, null);
       Assert.assertEquals(1, entities.getEntities().size());
@@ -697,6 +705,7 @@ public class TestJobHistoryEventHandler {
 
       handleEvent(jheh, new JobHistoryEvent(t.jobId,
             new TaskStartedEvent(t.taskID, 0, TaskType.MAP, "")));
+      jheh.getDispatcher().await();
       entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
               null, null, null, null, null, null);
       Assert.assertEquals(1, entities.getEntities().size());
@@ -710,6 +719,7 @@ public class TestJobHistoryEventHandler {
 
       handleEvent(jheh, new JobHistoryEvent(t.jobId,
             new TaskStartedEvent(t.taskID, 0, TaskType.REDUCE, "")));
+      jheh.getDispatcher().await();
       entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
               null, null, null, null, null, null);
       Assert.assertEquals(1, entities.getEntities().size());
@@ -1027,6 +1037,7 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
 
   private EventWriter eventWriter;
   private boolean mockHistoryProcessing = true;
+  private DrainDispatcher dispatcher;
   public JHEvenHandlerForTest(AppContext context, int startCount) {
     super(context, startCount);
     JobHistoryEventHandler.fileMap.clear();
@@ -1039,12 +1050,31 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
   }
 
   @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+
+  }
+
+  @Override
   protected void serviceStart() {
     if (timelineClient != null) {
       timelineClient.start();
     } else if (timelineV2Client != null) {
       timelineV2Client.start();
     }
+    if (handleTimelineEvent) {
+      atsEventDispatcher.start();
+    }
+  }
+
+  @Override
+  protected AsyncDispatcher createDispatcher() {
+    dispatcher = new DrainDispatcher();
+    return dispatcher;
+  }
+
+  public DrainDispatcher getDispatcher() {
+    return dispatcher;
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org