You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2018/05/02 19:22:16 UTC
[41/50] [abbrv] hadoop git commit: MAPREDUCE-7042. Killed MR job data
does not move to mapreduce.jobhistory.done-dir when ATS v2 is enabled.
Contributed by Rohith Sharma K S.
MAPREDUCE-7042. Killed MR job data does not move to mapreduce.jobhistory.done-dir when ATS v2 is enabled. Contributed by Rohith Sharma K S.
(cherry picked from commit 83e60cd2db20f655e272958ef43b1b5a084ef3e3)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/600f4d40
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/600f4d40
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/600f4d40
Branch: refs/heads/YARN-8200
Commit: 600f4d402f1926c8dac678689844d0901504f803
Parents: a91d5c7
Author: Sunil G <su...@apache.org>
Authored: Thu Apr 26 19:07:02 2018 +0530
Committer: Rohith Sharma K S <ro...@apache.org>
Committed: Fri Apr 27 12:00:00 2018 +0530
----------------------------------------------------------------------
.../jobhistory/JobHistoryEventHandler.java | 66 +++++++++++++++++---
.../jobhistory/TestJobHistoryEventHandler.java | 32 +++++++++-
2 files changed, 89 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/600f4d40/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 4529d55..9d2b3be 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -119,7 +120,11 @@ public class JobHistoryEventHandler extends AbstractService
protected BlockingQueue<JobHistoryEvent> eventQueue =
new LinkedBlockingQueue<JobHistoryEvent>();
+
+ protected boolean handleTimelineEvent = false;
+ protected AsyncDispatcher atsEventDispatcher = null;
protected Thread eventHandlingThread;
+
private volatile boolean stopped;
private final Object lock = new Object();
@@ -279,6 +284,7 @@ public class JobHistoryEventHandler extends AbstractService
((MRAppMaster.RunningAppContext) context).getTimelineClient();
timelineClient.init(conf);
}
+ handleTimelineEvent = true;
LOG.info("Timeline service is enabled; version: " +
YarnConfiguration.getTimelineServiceVersion(conf));
} else {
@@ -302,10 +308,23 @@ public class JobHistoryEventHandler extends AbstractService
"'json' or 'binary'. Falling back to default value '" +
JHAdminConfig.DEFAULT_MR_HS_JHIST_FORMAT + "'.");
}
-
+ // initiate the atsEventDispatcher for timeline event
+ // if timeline service is enabled.
+ if (handleTimelineEvent) {
+ atsEventDispatcher = createDispatcher();
+ EventHandler<JobHistoryEvent> timelineEventHandler =
+ new ForwardingEventHandler();
+ atsEventDispatcher.register(EventType.class, timelineEventHandler);
+ atsEventDispatcher.setDrainEventsOnStop();
+ atsEventDispatcher.init(conf);
+ }
super.serviceInit(conf);
}
+ protected AsyncDispatcher createDispatcher() {
+ return new AsyncDispatcher("Job ATS Event Dispatcher");
+ }
+
private void mkdir(FileSystem fs, Path path, FsPermission fsp)
throws IOException {
if (!fs.exists(path)) {
@@ -371,6 +390,10 @@ public class JobHistoryEventHandler extends AbstractService
}
}, "eventHandlingThread");
eventHandlingThread.start();
+
+ if (handleTimelineEvent) {
+ atsEventDispatcher.start();
+ }
super.serviceStart();
}
@@ -453,6 +476,11 @@ public class JobHistoryEventHandler extends AbstractService
LOG.info("Exception while closing file " + e.getMessage());
}
}
+
+ if (handleTimelineEvent && atsEventDispatcher != null) {
+ atsEventDispatcher.stop();
+ }
+
if (timelineClient != null) {
timelineClient.stop();
} else if (timelineV2Client != null) {
@@ -572,6 +600,10 @@ public class JobHistoryEventHandler extends AbstractService
}
eventQueue.put(event);
+ // Process it for ATS (if enabled)
+ if (handleTimelineEvent) {
+ atsEventDispatcher.getEventHandler().handle(event);
+ }
} catch (InterruptedException e) {
throw new YarnRuntimeException(e);
}
@@ -614,13 +646,6 @@ public class JobHistoryEventHandler extends AbstractService
}
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
event.getJobID());
- if (timelineV2Client != null) {
- processEventForNewTimelineService(historyEvent, event.getJobID(),
- event.getTimestamp());
- } else if (timelineClient != null) {
- processEventForTimelineServer(historyEvent, event.getJobID(),
- event.getTimestamp());
- }
if (LOG.isDebugEnabled()) {
LOG.debug("In HistoryEventHandler "
+ event.getHistoryEvent().getEventType());
@@ -702,6 +727,23 @@ public class JobHistoryEventHandler extends AbstractService
}
}
+ private void handleTimelineEvent(JobHistoryEvent event) {
+ HistoryEvent historyEvent = event.getHistoryEvent();
+ if (handleTimelineEvent) {
+ if (timelineV2Client != null) {
+ processEventForNewTimelineService(historyEvent, event.getJobID(),
+ event.getTimestamp());
+ } else if (timelineClient != null) {
+ processEventForTimelineServer(historyEvent, event.getJobID(),
+ event.getTimestamp());
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("In HistoryEventHandler, handle timelineEvent:"
+ + event.getHistoryEvent().getEventType());
+ }
+ }
+
public void processEventForJobSummary(HistoryEvent event, JobSummary summary,
JobId jobId) {
// context.getJob could be used for some of this info as well.
@@ -1708,4 +1750,12 @@ public class JobHistoryEventHandler extends AbstractService
boolean getFlushTimerStatus() {
return isTimerActive;
}
+
+ private final class ForwardingEventHandler
+ implements EventHandler<JobHistoryEvent> {
+ @Override
+ public void handle(JobHistoryEvent event) {
+ handleTimelineEvent(event);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/600f4d40/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 51ac2ce..3fecef7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -75,6 +75,8 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
@@ -586,6 +588,7 @@ public class TestJobHistoryEventHandler {
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1),
currentTime - 10));
+ jheh.getDispatcher().await();
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
null, null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
@@ -602,6 +605,7 @@ public class TestJobHistoryEventHandler {
"user", 200, "/foo/job.xml",
new HashMap<JobACL, AccessControlList>(), "default"),
currentTime + 10));
+ jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
@@ -620,6 +624,7 @@ public class TestJobHistoryEventHandler {
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobQueueChangeEvent(TypeConverter.fromYarn(t.jobId), "q2"),
currentTime - 20));
+ jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
@@ -642,6 +647,7 @@ public class TestJobHistoryEventHandler {
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0,
0, new Counters(), new Counters(), new Counters()), currentTime));
+ jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
@@ -667,7 +673,9 @@ public class TestJobHistoryEventHandler {
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId),
- 0, 0, 0, JobStateInternal.KILLED.toString()), currentTime + 20));
+ 0, 0, 0, JobStateInternal.KILLED.toString()),
+ currentTime + 20));
+ jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
@@ -697,6 +705,7 @@ public class TestJobHistoryEventHandler {
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new TaskStartedEvent(t.taskID, 0, TaskType.MAP, "")));
+ jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
@@ -710,6 +719,7 @@ public class TestJobHistoryEventHandler {
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new TaskStartedEvent(t.taskID, 0, TaskType.REDUCE, "")));
+ jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
@@ -1027,6 +1037,7 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
private EventWriter eventWriter;
private boolean mockHistoryProcessing = true;
+ private DrainDispatcher dispatcher;
public JHEvenHandlerForTest(AppContext context, int startCount) {
super(context, startCount);
JobHistoryEventHandler.fileMap.clear();
@@ -1039,12 +1050,31 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
}
@Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+
+ }
+
+ @Override
protected void serviceStart() {
if (timelineClient != null) {
timelineClient.start();
} else if (timelineV2Client != null) {
timelineV2Client.start();
}
+ if (handleTimelineEvent) {
+ atsEventDispatcher.start();
+ }
+ }
+
+ @Override
+ protected AsyncDispatcher createDispatcher() {
+ dispatcher = new DrainDispatcher();
+ return dispatcher;
+ }
+
+ public DrainDispatcher getDispatcher() {
+ return dispatcher;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org