You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2016/06/21 23:49:01 UTC
[15/50] [abbrv] hadoop git commit: MAPREDUCE-6688. Store job
configurations in Timeline Service v2 (Varun Saxena via sjlee)
MAPREDUCE-6688. Store job configurations in Timeline Service v2 (Varun Saxena via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/91a9099c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/91a9099c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/91a9099c
Branch: refs/heads/YARN-2928
Commit: 91a9099cdaa2f08f03da766b24ead29dc6907891
Parents: 8c00fef
Author: Sangjin Lee <sj...@apache.org>
Authored: Tue May 3 09:19:36 2016 -0700
Committer: Vrushali <vr...@twitter.com>
Committed: Sun Jun 19 00:20:08 2016 -0700
----------------------------------------------------------------------
.../jobhistory/JobHistoryEventHandler.java | 57 +++++++++++-
.../mapreduce/v2/app/job/impl/JobImpl.java | 2 +-
.../mapreduce/jobhistory/JobSubmittedEvent.java | 38 +++++++-
.../mapreduce/util/JobHistoryEventUtils.java | 3 +
.../mapred/TestMRTimelineEventHandling.java | 92 +++++++++++++++++---
.../org/apache/hadoop/mapred/UtilsForTests.java | 8 ++
6 files changed, 181 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a9099c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 528b450..887533d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -1074,7 +1074,21 @@ public class JobHistoryEventHandler extends AbstractService
entity.setId(jobId.toString());
return entity;
}
-
+
+  /**
+   * Create a fresh job entity of type MAPREDUCE_JOB_ENTITY_TYPE.
+   *
+   * @param jobId job whose string form becomes the entity id
+   * @return a new timeline entity whose type and id have been set
+   */
+  private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+      createJobEntity(JobId jobId) {
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity jobEntity =
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
+    jobEntity.setType(MAPREDUCE_JOB_ENTITY_TYPE);
+    jobEntity.setId(jobId.toString());
+    return jobEntity;
+  }
+
// create ApplicationEntity with job finished Metrics from HistoryEvent
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createAppEntityWithJobMetrics(HistoryEvent event, JobId jobId) {
@@ -1133,6 +1142,64 @@ public class JobHistoryEventHandler extends AbstractService
return entity;
}
+  /**
+   * Publishes the job configuration attached to a JOB_SUBMITTED event to
+   * ATSv2, on both the job entity and the application entity.
+   * Entries are batched so that each posted entity holds at most
+   * JobHistoryEventUtils.ATS_CONFIG_PUBLISH_SIZE_BYTES (10 KB) of key plus
+   * value length (String#length, i.e. chars); an entry larger than that
+   * limit is posted in a batch of its own.
+   *
+   * @param event the JOB_SUBMITTED event; nothing is published if it carries
+   *     no job configuration
+   * @param jobId id of the job whose configuration is published
+   */
+  private void publishConfigsOnJobSubmittedEvent(JobSubmittedEvent event,
+      JobId jobId) {
+    if (event.getJobConf() == null) {
+      return;
+    }
+    // Publish job configurations both as job and app entity.
+    // Configs are split into multiple entities if they exceed 10 KB
+    // (ATS_CONFIG_PUBLISH_SIZE_BYTES) in size.
+    org.apache.hadoop.yarn.api.records.timelineservice.
+        TimelineEntity jobEntityForConfigs = createJobEntity(jobId);
+    ApplicationEntity appEntityForConfigs = new ApplicationEntity();
+    String appId = jobId.getAppId().toString();
+    appEntityForConfigs.setId(appId);
+    try {
+      int configSize = 0;
+      for (Map.Entry<String, String> entry : event.getJobConf()) {
+        int size = entry.getKey().length() + entry.getValue().length();
+        configSize += size;
+        if (configSize > JobHistoryEventUtils.ATS_CONFIG_PUBLISH_SIZE_BYTES) {
+          // Flush the current batch before it would exceed the limit; an
+          // empty batch means this single entry alone exceeds the limit.
+          if (jobEntityForConfigs.getConfigs().size() > 0) {
+            timelineClient.putEntities(jobEntityForConfigs);
+            timelineClient.putEntities(appEntityForConfigs);
+            jobEntityForConfigs = createJobEntity(jobId);
+            appEntityForConfigs = new ApplicationEntity();
+            appEntityForConfigs.setId(appId);
+          }
+          configSize = size;
+        }
+        jobEntityForConfigs.addConfig(entry.getKey(), entry.getValue());
+        appEntityForConfigs.addConfig(entry.getKey(), entry.getValue());
+      }
+      // Post whatever is left in the last batch.
+      if (jobEntityForConfigs.getConfigs().size() > 0) {
+        timelineClient.putEntities(jobEntityForConfigs);
+        timelineClient.putEntities(appEntityForConfigs);
+      }
+    } catch (IOException | YarnException e) {
+      // Log and skip any remaining batches instead of propagating, so event
+      // handling continues.
+      LOG.error("Exception while publishing configs on JOB_SUBMITTED Event"
+          + " for the job : " + jobId, e);
+    }
+  }
+
private void processEventForNewTimelineService(HistoryEvent event,
JobId jobId, long timestamp) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity =
@@ -1252,8 +1301,12 @@ public class JobHistoryEventHandler extends AbstractService
} catch (IOException | YarnException e) {
LOG.error("Failed to process Event " + event.getEventType()
+ " for the job : " + jobId, e);
+ return;
+ }
+ if (event.getEventType() == EventType.JOB_SUBMITTED) {
+ // Publish configs after main job submitted event has been posted.
+ publishConfigsOnJobSubmittedEvent((JobSubmittedEvent)event, jobId);
}
-
}
private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a9099c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index b7036a5..5127a43 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -1445,7 +1445,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
getWorkflowAdjacencies(job.conf),
- job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
+ job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""), job.conf);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
//TODO JH Verify jobACLs, UserName via UGI?
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a9099c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
index 07edb58..7d05571 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
@@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.security.authorize.AccessControlList;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
@InterfaceStability.Unstable
public class JobSubmittedEvent implements HistoryEvent {
private JobSubmitted datum = new JobSubmitted();
+ private JobConf jobConf = null;
/**
* Create an event to record job submission
@@ -83,6 +85,31 @@ public class JobSubmittedEvent implements HistoryEvent {
workflowAdjacencies, "");
}
+ /**
+ * Create a job submission event with no JobConf ({@link #getJobConf()} is null).
+ * @param id The job Id of the job
+ * @param jobName Name of the job
+ * @param userName Name of the user who submitted the job
+ * @param submitTime Time of submission
+ * @param jobConfPath Path of the Job Configuration file
+ * @param jobACLs The configured acls for the job.
+ * @param jobQueueName The job-queue to which this job was submitted to
+ * @param workflowId The Id of the workflow
+ * @param workflowName The name of the workflow
+ * @param workflowNodeName The node name of the workflow
+ * @param workflowAdjacencies The adjacencies of the workflow
+ * @param workflowTags Comma-separated tags for the workflow
+ */
+ public JobSubmittedEvent(JobID id, String jobName, String userName,
+ long submitTime, String jobConfPath,
+ Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
+ String workflowId, String workflowName, String workflowNodeName,
+ String workflowAdjacencies, String workflowTags) {
+ this(id, jobName, userName, submitTime, jobConfPath, jobACLs,
+ jobQueueName, workflowId, workflowName, workflowNodeName,
+ workflowAdjacencies, workflowTags, null);
+ }
+
/**
* Create an event to record job submission
* @param id The job Id of the job
@@ -97,12 +124,13 @@ public class JobSubmittedEvent implements HistoryEvent {
* @param workflowNodeName The node name of the workflow
* @param workflowAdjacencies The adjacencies of the workflow
* @param workflowTags Comma-separated tags for the workflow
+ * @param conf Job configuration
*/
public JobSubmittedEvent(JobID id, String jobName, String userName,
long submitTime, String jobConfPath,
Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
String workflowId, String workflowName, String workflowNodeName,
- String workflowAdjacencies, String workflowTags) {
+ String workflowAdjacencies, String workflowTags, JobConf conf) {
datum.setJobid(new Utf8(id.toString()));
datum.setJobName(new Utf8(jobName));
datum.setUserName(new Utf8(userName));
@@ -132,6 +160,7 @@ public class JobSubmittedEvent implements HistoryEvent {
if (workflowTags != null) {
datum.setWorkflowTags(new Utf8(workflowTags));
}
+ jobConf = conf;
}
JobSubmittedEvent() {}
@@ -208,7 +237,10 @@ public class JobSubmittedEvent implements HistoryEvent {
}
/** Get the event type */
public EventType getEventType() { return EventType.JOB_SUBMITTED; }
-
+
+  /** Get the job configuration, or null if none was attached. */
+  public JobConf getJobConf() { return jobConf; }
+
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
@@ -234,5 +267,4 @@ public class JobSubmittedEvent implements HistoryEvent {
public Set<TimelineMetric> getTimelineMetrics() {
return null;
}
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a9099c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
index 225d517..35d066c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
@@ -36,6 +36,9 @@ public final class JobHistoryEventUtils {
private JobHistoryEventUtils() {
}
+ // Max total key+value length of configs published to ATSv2 in one shot.
+ public static final int ATS_CONFIG_PUBLISH_SIZE_BYTES = 10 * 1024;
+
public static JsonNode countersToJSON(Counters counters) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode nodes = mapper.createArrayNode();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a9099c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
index 5915a43..fde9e64 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -26,7 +26,9 @@ import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.EnumSet;
+import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
@@ -55,6 +57,8 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Assert;
import org.junit.Test;
+import com.google.common.collect.Sets;
+
public class TestMRTimelineEventHandling {
private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
@@ -191,8 +195,17 @@ public class TestMRTimelineEventHandling {
Path inDir = new Path("input");
Path outDir = new Path("output");
LOG.info("Run 1st job which should be successful.");
+ JobConf successConf = new JobConf(conf);
+ successConf.set("dummy_conf1",
+ UtilsForTests.createConfigValue(51 * 1024));
+ successConf.set("dummy_conf2",
+ UtilsForTests.createConfigValue(51 * 1024));
+ successConf.set("huge_dummy_conf1",
+ UtilsForTests.createConfigValue(101 * 1024));
+ successConf.set("huge_dummy_conf2",
+ UtilsForTests.createConfigValue(101 * 1024));
RunningJob job =
- UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+ UtilsForTests.runJobSucceed(successConf, inDir, outDir);
Assert.assertEquals(JobStatus.SUCCEEDED,
job.getJobStatus().getState().getValue());
@@ -270,7 +283,14 @@ public class TestMRTimelineEventHandling {
Assert.assertTrue("jobEventFilePath: " + jobEventFilePath +
" does not exist.",
jobEventFile.exists());
- verifyMetricsWhenEvent(jobEventFile, EventType.JOB_FINISHED.name());
+      verifyEntity(jobEventFile, EventType.JOB_FINISHED.name(),
+          true, false, null);
+      Set<String> cfgsToCheck = Sets.newHashSet("dummy_conf1", "dummy_conf2",
+          "huge_dummy_conf1", "huge_dummy_conf2");
+      // verifyEntity removes found configs from the set, so hand it a copy to
+      // keep cfgsToCheck intact for the application entity check below.
+      verifyEntity(jobEventFile, null, false, true,
+          Sets.newHashSet(cfgsToCheck));
// for this test, we expect MR job metrics are published in YARN_APPLICATION
String outputAppDir = basePath + "/YARN_APPLICATION/";
@@ -290,7 +307,8 @@ public class TestMRTimelineEventHandling {
"appEventFilePath: " + appEventFilePath +
" does not exist.",
appEventFile.exists());
- verifyMetricsWhenEvent(appEventFile, null);
+ verifyEntity(appEventFile, null, true, false, null);
+ verifyEntity(appEventFile, null, false, true, cfgsToCheck);
// check for task event file
String outputDirTask = basePath + "/MAPREDUCE_TASK/";
@@ -307,7 +325,8 @@ public class TestMRTimelineEventHandling {
Assert.assertTrue("taskEventFileName: " + taskEventFilePath +
" does not exist.",
taskEventFile.exists());
- verifyMetricsWhenEvent(taskEventFile, EventType.TASK_FINISHED.name());
+ verifyEntity(taskEventFile, EventType.TASK_FINISHED.name(),
+ true, false, null);
// check for task attempt event file
String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/";
@@ -324,17 +343,30 @@ public class TestMRTimelineEventHandling {
File taskAttemptEventFile = new File(taskAttemptEventFilePath);
Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath +
" does not exist.", taskAttemptEventFile.exists());
- verifyMetricsWhenEvent(taskAttemptEventFile,
- EventType.MAP_ATTEMPT_FINISHED.name());
+ verifyEntity(taskAttemptEventFile, EventType.MAP_ATTEMPT_FINISHED.name(),
+ true, false, null);
}
- private void verifyMetricsWhenEvent(File entityFile, String eventId)
+ /**
+ * Verifies entity by reading the entity file written via FS impl.
+ * @param entityFile File to be read.
+ * @param eventId Event to be checked.
+ * @param chkMetrics If event is not null, this flag determines if metrics
+ * exist when the event is encountered. If event is null, we merely check
+ * if metrics exist in the entity file.
+ * @param chkCfg If event is not null, this flag determines if configs
+ * exist when the event is encountered. If event is null, we merely check
+ * if configs exist in the entity file.
+ * @param cfgsToVerify a set of configs which should exist in the entity file.
+ * @throws IOException
+ */
+ private void verifyEntity(File entityFile, String eventId,
+ boolean chkMetrics, boolean chkCfg, Set<String> cfgsToVerify)
throws IOException {
BufferedReader reader = null;
String strLine;
try {
reader = new BufferedReader(new FileReader(entityFile));
- boolean jobMetricsFoundForAppEntity = false;
while ((strLine = reader.readLine()) != null) {
if (strLine.trim().length() > 0) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
@@ -344,23 +376,57 @@ public class TestMRTimelineEventHandling {
if (eventId == null) {
// Job metrics are published without any events for
// ApplicationEntity. There is also possibility that some other
- // ApplicationEntity is published without events, hence loop all
- if (entity.getEvents().size() == 0) {
- jobMetricsFoundForAppEntity = entity.getMetrics().size() > 0;
- if (jobMetricsFoundForAppEntity) {
+ // ApplicationEntity is published without events, hence loop till
+ // its found. Same applies to configs.
+ if (chkMetrics && entity.getMetrics().size() > 0) {
+ return;
+ }
+ if (chkCfg && entity.getConfigs().size() > 0) {
+ if (cfgsToVerify == null) {
return;
+ } else {
+ // Have configs to verify. Keep on removing configs from the set
+ // of configs to verify as they are found. When the all the
+ // entities have been looped through, we will check if the set
+ // is empty or not(indicating if all configs have been found or
+ // not).
+ for (Iterator<String> itr =
+ cfgsToVerify.iterator(); itr.hasNext();) {
+ String config = itr.next();
+ if (entity.getConfigs().containsKey(config)) {
+ itr.remove();
+ }
+ }
+ // All the required configs have been verified, so return.
+ if (cfgsToVerify.isEmpty()) {
+ return;
+ }
}
}
} else {
for (TimelineEvent event : entity.getEvents()) {
if (event.getId().equals(eventId)) {
- assertTrue(entity.getMetrics().size() > 0);
+ if (chkMetrics) {
+ assertTrue(entity.getMetrics().size() > 0);
+ }
+ if (chkCfg) {
+ assertTrue(entity.getConfigs().size() > 0);
+ if (cfgsToVerify != null) {
+ for (String cfg : cfgsToVerify) {
+ assertTrue(entity.getConfigs().containsKey(cfg));
+ }
+ }
+ }
return;
}
}
}
}
}
+ if (cfgsToVerify != null) {
+ assertTrue(cfgsToVerify.isEmpty());
+ return;
+ }
fail("Expected event : " + eventId + " not found in the file "
+ entityFile);
} finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a9099c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java
index cb494db..2fb6828 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java
@@ -156,6 +156,20 @@ public class UtilsForTests {
return buf.toString();
}
+  /**
+   * Build a config value of the requested length made only of 'a' chars.
+   *
+   * @param msgSize number of characters in the returned value
+   * @return a string consisting of msgSize 'a' characters
+   */
+  public static String createConfigValue(int msgSize) {
+    StringBuilder value = new StringBuilder(msgSize);
+    while (value.length() < msgSize) {
+      value.append('a');
+    }
+    return value.toString();
+  }
+
public static String safeGetCanonicalPath(File f) {
try {
String s = f.getCanonicalPath();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org