You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/05/15 00:22:37 UTC
hadoop git commit: MAPREDUCE-6337. Added a mode to replay MR job
history files and put them into the timeline service v2. Contributed by
Sangjin Lee.
Repository: hadoop
Updated Branches:
refs/heads/YARN-2928 b059dd488 -> 463e070a8
MAPREDUCE-6337. Added a mode to replay MR job history files and put them into the timeline service v2. Contributed by Sangjin Lee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/463e070a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/463e070a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/463e070a
Branch: refs/heads/YARN-2928
Commit: 463e070a8e7c882706a96eaa20ea49bfe9982875
Parents: b059dd4
Author: Zhijie Shen <zj...@apache.org>
Authored: Thu May 14 15:16:33 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Thu May 14 15:22:16 2015 -0700
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../hadoop/mapred/JobHistoryFileParser.java | 53 ++++
.../mapred/JobHistoryFileReplayMapper.java | 301 +++++++++++++++++++
.../hadoop/mapred/SimpleEntityWriter.java | 139 +++++++++
.../hadoop/mapred/TimelineEntityConverter.java | 207 +++++++++++++
.../mapred/TimelineServicePerformanceV2.java | 191 ++++--------
.../collector/TimelineCollectorManager.java | 10 +-
.../storage/FileSystemTimelineWriterImpl.java | 23 +-
.../timelineservice/storage/package-info.java | 24 ++
9 files changed, 810 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/463e070a/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 9912b6d..6ab48e7 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -12,6 +12,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
MAPREDUCE-6335. Created MR job based performance test driver for the
timeline service v2. (Sangjin Lee via zjshen)
+ MAPREDUCE-6337. Added a mode to replay MR job history files and put them
+ into the timeline service v2. (Sangjin Lee via zjshen)
+
IMPROVEMENTS
OPTIMIZATIONS
http://git-wip-us.apache.org/repos/asf/hadoop/blob/463e070a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java
new file mode 100644
index 0000000..9d051df
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+
+/**
+ * Reads the two per-job artifacts used by the replay mode, the job history
+ * file and the job configuration file, from the file system supplied at
+ * construction time.
+ */
+class JobHistoryFileParser {
+  private static final Log LOG = LogFactory.getLog(JobHistoryFileParser.class);
+
+  private final FileSystem fs;
+
+  /**
+   * @param fs file system used to open every path handed to this parser
+   */
+  public JobHistoryFileParser(FileSystem fs) {
+    this.fs = fs;
+    LOG.info("JobHistoryFileParser created with " + fs);
+  }
+
+  /**
+   * Parses the job history file at the given path.
+   *
+   * @param path location of the job history file
+   * @return the job info returned by {@link JobHistoryParser#parse()}
+   * @throws IOException if the history file cannot be parsed
+   */
+  public JobInfo parseHistoryFile(Path path) throws IOException {
+    LOG.info("parsing job history file " + path);
+    return new JobHistoryParser(fs, path).parse();
+  }
+
+  /**
+   * Builds a configuration backed by the job configuration file at the given
+   * path. The configuration is created with loadDefaults set to false and the
+   * opened file is its only resource.
+   *
+   * @param path location of the job configuration file
+   * @return the configuration with the opened file added as a resource
+   * @throws IOException if the file cannot be opened
+   */
+  public Configuration parseConfiguration(Path path) throws IOException {
+    LOG.info("parsing job configuration file " + path);
+    Configuration jobConf = new Configuration(false);
+    jobConf.addResource(fs.open(path));
+    return jobConf;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/463e070a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
new file mode 100644
index 0000000..802b78f
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.TimelineServicePerformanceV2.EntityWriter;
+import org.apache.hadoop.mapred.TimelineServicePerformanceV2.PerfCounters;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
+
+/**
+ * Mapper for TimelineServicePerformanceV2 that replays job history files to the
+ * timeline service.
+ *
+ * Every file under the processing path whose name starts with a job id is a
+ * candidate. A map task takes the jobs whose trailing job id number, modulo the
+ * number of maps, equals its own task id, converts each of them into timeline
+ * entities, and writes those through an app level timeline collector.
+ */
+class JobHistoryFileReplayMapper extends EntityWriter {
+  private static final Log LOG =
+      LogFactory.getLog(JobHistoryFileReplayMapper.class);
+
+  // configuration key for the directory that holds the files to replay
+  static final String PROCESSING_PATH = "processing path";
+  // configuration key for the replay mode (one of the WRITE_* values below)
+  static final String REPLAY_MODE = "replay mode";
+  // all entities of a job are put with a single call
+  static final int WRITE_ALL_AT_ONCE = 1;
+  // the entities of a job are put one call per entity
+  static final int WRITE_PER_ENTITY = 2;
+  static final int REPLAY_MODE_DEFAULT = WRITE_ALL_AT_ONCE;
+
+  // group 1: the job id (job_<digits>_<digits>), group 2: its trailing number
+  private static final Pattern JOB_ID_PARSER =
+      Pattern.compile("^(job_[0-9]+_([0-9]+)).*");
+
+  /**
+   * The files found for one job. Either path stays null until a file of that
+   * kind is seen. Equality and hash code are based on the job id alone.
+   */
+  public static class JobFiles {
+    private final String jobId;
+    private Path jobHistoryFilePath;
+    private Path jobConfFilePath;
+
+    public JobFiles(String jobId) {
+      this.jobId = jobId;
+    }
+
+    public String getJobId() {
+      return jobId;
+    }
+
+    public Path getJobHistoryFilePath() {
+      return jobHistoryFilePath;
+    }
+
+    public void setJobHistoryFilePath(Path jobHistoryFilePath) {
+      this.jobHistoryFilePath = jobHistoryFilePath;
+    }
+
+    public Path getJobConfFilePath() {
+      return jobConfFilePath;
+    }
+
+    public void setJobConfFilePath(Path jobConfFilePath) {
+      this.jobConfFilePath = jobConfFilePath;
+    }
+
+    @Override
+    public int hashCode() {
+      return jobId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      JobFiles other = (JobFiles) obj;
+      return jobId.equals(other.jobId);
+    }
+  }
+
+  private enum FileType { JOB_HISTORY_FILE, JOB_CONF_FILE, UNKNOWN }
+
+  /**
+   * Replays the jobs assigned to this map task. For each job the history and
+   * configuration files are parsed, converted into timeline entities and
+   * written according to the configured replay mode. Write failures are
+   * counted and logged but do not stop the task.
+   *
+   * @param tlConf timeline service configuration (not read by this writer)
+   * @param manager collector manager the per-job collectors are registered
+   *     with for the duration of the write
+   * @param context map task context providing the job configuration, the
+   *     task id and the counters
+   * @throws IOException if listing the processing path or parsing a file
+   *     fails
+   */
+  @Override
+  protected void writeEntities(Configuration tlConf,
+      TimelineCollectorManager manager, Context context) throws IOException {
+    // collect the apps it needs to process
+    Configuration conf = context.getConfiguration();
+    int taskId = context.getTaskAttemptID().getTaskID().getId();
+    int size = conf.getInt(MRJobConfig.NUM_MAPS,
+        TimelineServicePerformanceV2.NUM_MAPS_DEFAULT);
+    String processingDir =
+        conf.get(JobHistoryFileReplayMapper.PROCESSING_PATH);
+    int replayMode =
+        conf.getInt(JobHistoryFileReplayMapper.REPLAY_MODE,
+            JobHistoryFileReplayMapper.REPLAY_MODE_DEFAULT);
+    Path processingPath = new Path(processingDir);
+    FileSystem processingFs = processingPath.getFileSystem(conf);
+    JobHistoryFileParser parser = new JobHistoryFileParser(processingFs);
+    TimelineEntityConverter converter = new TimelineEntityConverter();
+
+    Collection<JobFiles> jobs =
+        selectJobFiles(processingFs, processingPath, taskId, size);
+    if (jobs.isEmpty()) {
+      LOG.info(context.getTaskAttemptID().getTaskID() +
+          " will process no jobs");
+    } else {
+      LOG.info(context.getTaskAttemptID().getTaskID() + " will process " +
+          jobs.size() + " jobs");
+    }
+    for (JobFiles job: jobs) {
+      // process each job
+      String jobIdStr = job.getJobId();
+
+      // a job is recorded as soon as any file carrying its id is seen, so one
+      // of the two files may be missing; such a job cannot be converted
+      Path historyFilePath = job.getJobHistoryFilePath();
+      Path confFilePath = job.getJobConfFilePath();
+      if (historyFilePath == null || confFilePath == null) {
+        LOG.warn("skipping " + jobIdStr + ": job history file is " +
+            historyFilePath + " and job conf file is " + confFilePath);
+        continue;
+      }
+
+      LOG.info("processing " + jobIdStr + "...");
+      JobId jobId = TypeConverter.toYarn(JobID.forName(jobIdStr));
+      ApplicationId appId = jobId.getAppId();
+
+      // create the app level timeline collector and start it
+      AppLevelTimelineCollector collector =
+          new AppLevelTimelineCollector(appId);
+      manager.putIfAbsent(appId, collector);
+      try {
+        // parse the job info and configuration
+        JobInfo jobInfo = parser.parseHistoryFile(historyFilePath);
+        Configuration jobConf = parser.parseConfiguration(confFilePath);
+        LOG.info("parsed the job history file and the configuration file " +
+            "for job " + jobIdStr);
+
+        // set the context
+        // flow id: job name, flow run id: timestamp, user id
+        TimelineCollectorContext tlContext =
+            collector.getTimelineEntityContext();
+        tlContext.setFlowName(jobInfo.getJobname());
+        tlContext.setFlowRunId(jobInfo.getSubmitTime());
+        tlContext.setUserId(jobInfo.getUsername());
+
+        // create entities from job history and write them
+        long totalTime = 0;
+        Set<TimelineEntity> entitySet =
+            converter.createTimelineEntities(jobInfo, jobConf);
+        LOG.info("converted them into timeline entities for job " + jobIdStr);
+        // use the current user for this purpose
+        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+        long startWrite = System.nanoTime();
+        try {
+          switch (replayMode) {
+          case JobHistoryFileReplayMapper.WRITE_ALL_AT_ONCE:
+            writeAllEntities(collector, entitySet, ugi);
+            break;
+          case JobHistoryFileReplayMapper.WRITE_PER_ENTITY:
+            writePerEntity(collector, entitySet, ugi);
+            break;
+          default:
+            break;
+          }
+        } catch (Exception e) {
+          context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
+              increment(1);
+          LOG.error("writing to the timeline service failed", e);
+        }
+        long endWrite = System.nanoTime();
+        totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
+        int numEntities = entitySet.size();
+        LOG.info("wrote " + numEntities + " entities in " + totalTime + " ms");
+
+        context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
+            increment(totalTime);
+        context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
+            increment(numEntities);
+      } finally {
+        manager.remove(appId);
+        context.progress(); // move it along
+      }
+    }
+  }
+
+  /**
+   * Puts the whole entity set with a single call to the collector.
+   */
+  private void writeAllEntities(AppLevelTimelineCollector collector,
+      Set<TimelineEntity> entitySet, UserGroupInformation ugi)
+      throws IOException {
+    TimelineEntities entities = new TimelineEntities();
+    entities.setEntities(entitySet);
+    collector.putEntities(entities, ugi);
+  }
+
+  /**
+   * Puts the entities one at a time, each wrapped in its own
+   * TimelineEntities instance, and logs every entity written.
+   */
+  private void writePerEntity(AppLevelTimelineCollector collector,
+      Set<TimelineEntity> entitySet, UserGroupInformation ugi)
+      throws IOException {
+    for (TimelineEntity entity : entitySet) {
+      TimelineEntities entities = new TimelineEntities();
+      entities.addEntity(entity);
+      collector.putEntities(entities, ugi);
+      LOG.info("wrote entity " + entity.getId());
+    }
+  }
+
+  /**
+   * Lists the processing root recursively and groups, by job id, the files
+   * that belong to this mapper. A file belongs to mapper i when the trailing
+   * number of its job id modulo size equals i. Files whose names do not start
+   * with a job id are ignored.
+   *
+   * @param fs file system of the processing root
+   * @param processingRoot directory to scan
+   * @param i id of this map task
+   * @param size number of map tasks sharing the work
+   * @return the selected jobs; either file path of an entry may be null
+   * @throws IOException if the listing fails
+   */
+  private Collection<JobFiles> selectJobFiles(FileSystem fs,
+      Path processingRoot, int i, int size) throws IOException {
+    Map<String,JobFiles> jobs = new HashMap<>();
+    RemoteIterator<LocatedFileStatus> it = fs.listFiles(processingRoot, true);
+    while (it.hasNext()) {
+      LocatedFileStatus status = it.next();
+      Path path = status.getPath();
+      String fileName = path.getName();
+      Matcher m = JOB_ID_PARSER.matcher(fileName);
+      if (!m.matches()) {
+        continue;
+      }
+      String jobId = m.group(1);
+      int lastId = Integer.parseInt(m.group(2));
+      int mod = lastId % size;
+      if (mod != i) {
+        continue;
+      }
+      LOG.info("this mapper will process file " + fileName);
+      // it's mine
+      JobFiles jobFiles = jobs.get(jobId);
+      if (jobFiles == null) {
+        jobFiles = new JobFiles(jobId);
+        jobs.put(jobId, jobFiles);
+      }
+      setFilePath(fileName, path, jobFiles);
+    }
+    return jobs.values();
+  }
+
+  /**
+   * Records the path on the job as its history file or its conf file,
+   * depending on the file name. The first file of each kind wins; later ones
+   * and files of unknown type are only logged.
+   */
+  private void setFilePath(String fileName, Path path,
+      JobFiles jobFiles) {
+    // determine if we're dealing with a job history file or a job conf file
+    FileType type = getFileType(fileName);
+    switch (type) {
+    case JOB_HISTORY_FILE:
+      if (jobFiles.getJobHistoryFilePath() == null) {
+        jobFiles.setJobHistoryFilePath(path);
+      } else {
+        LOG.warn("we already have the job history file " +
+            jobFiles.getJobHistoryFilePath() + ": skipping " + path);
+      }
+      break;
+    case JOB_CONF_FILE:
+      if (jobFiles.getJobConfFilePath() == null) {
+        jobFiles.setJobConfFilePath(path);
+      } else {
+        LOG.warn("we already have the job conf file " +
+            jobFiles.getJobConfFilePath() + ": skipping " + path);
+      }
+      break;
+    case UNKNOWN:
+      LOG.warn("unknown type: " + path);
+    }
+  }
+
+  /**
+   * Classifies a file by its name suffix: ".jhist" is a job history file and
+   * "_conf.xml" is a job conf file.
+   */
+  private FileType getFileType(String fileName) {
+    if (fileName.endsWith(".jhist")) {
+      return FileType.JOB_HISTORY_FILE;
+    }
+    if (fileName.endsWith("_conf.xml")) {
+      return FileType.JOB_CONF_FILE;
+    }
+    return FileType.UNKNOWN;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/463e070a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java
new file mode 100644
index 0000000..4ef0a14
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TimelineServicePerformanceV2.EntityWriter;
+import org.apache.hadoop.mapred.TimelineServicePerformanceV2.PerfCounters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
+
+/**
+ * Adds simple entities with random string payload, events, metrics, and
+ * configuration.
+ */
+class SimpleEntityWriter extends EntityWriter {
+  private static final Log LOG = LogFactory.getLog(SimpleEntityWriter.class);
+
+  // constants for mtype = 1
+  static final String KBS_SENT = "kbs sent";
+  static final int KBS_SENT_DEFAULT = 1;
+  static final String TEST_TIMES = "testtimes";
+  static final int TEST_TIMES_DEFAULT = 100;
+  static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
+      "timeline.server.performance.run.id";
+
+  /**
+   * Writes the configured number of entities, one put call per entity, each
+   * carrying a random alphanumeric payload of the configured size, one event,
+   * one metric and one config entry. The time spent in the put calls, the
+   * number of entities and the payload volume are reported through the
+   * counters. Write failures are counted and logged but do not stop the task.
+   *
+   * @param tlConf timeline service configuration (not read by this writer)
+   * @param manager collector manager the collector is registered with for
+   *     the duration of the write
+   * @param context map task context providing the job configuration, the
+   *     task attempt id and the counters
+   * @throws IOException if the current user cannot be determined
+   */
+  @Override
+  protected void writeEntities(Configuration tlConf,
+      TimelineCollectorManager manager, Context context) throws IOException {
+    Configuration conf = context.getConfiguration();
+    // simulate the app id with the task id
+    int taskId = context.getTaskAttemptID().getTaskID().getId();
+    long timestamp = conf.getLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID, 0);
+    ApplicationId appId = ApplicationId.newInstance(timestamp, taskId);
+
+    // create the app level timeline collector
+    AppLevelTimelineCollector collector =
+        new AppLevelTimelineCollector(appId);
+    manager.putIfAbsent(appId, collector);
+
+    try {
+      // set the context
+      // flow id: job name, flow run id: timestamp, user id
+      TimelineCollectorContext tlContext =
+          collector.getTimelineEntityContext();
+      tlContext.setFlowName(context.getJobName());
+      tlContext.setFlowRunId(timestamp);
+      tlContext.setUserId(context.getUser());
+
+      final int kbs = conf.getInt(KBS_SENT, KBS_SENT_DEFAULT);
+
+      long totalTime = 0;
+      final int testtimes = conf.getInt(TEST_TIMES, TEST_TIMES_DEFAULT);
+      final Random rand = new Random();
+      final TaskAttemptID taskAttemptId = context.getTaskAttemptID();
+      final char[] payLoad = new char[kbs * 1024];
+      // use the current user for this purpose; it does not change between
+      // iterations, so look it up once
+      final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+      for (int i = 0; i < testtimes; i++) {
+        // Generate a fixed length random payload
+        for (int xx = 0; xx < payLoad.length; xx++) {
+          int alphaNumIdx =
+              rand.nextInt(TimelineServicePerformanceV2.alphaNums.length);
+          payLoad[xx] = TimelineServicePerformanceV2.alphaNums[alphaNumIdx];
+        }
+        String entId = taskAttemptId + "_" + Integer.toString(i);
+        final TimelineEntity entity = new TimelineEntity();
+        entity.setId(entId);
+        entity.setType("FOO_ATTEMPT");
+        entity.addInfo("PERF_TEST", payLoad);
+        // add an event
+        TimelineEvent event = new TimelineEvent();
+        event.setTimestamp(System.currentTimeMillis());
+        event.addInfo("foo_event", "test");
+        entity.addEvent(event);
+        // add a metric
+        TimelineMetric metric = new TimelineMetric();
+        metric.setId("foo_metric");
+        metric.addValue(System.currentTimeMillis(), 123456789L);
+        entity.addMetric(metric);
+        // add a config
+        entity.addConfig("foo", "bar");
+
+        TimelineEntities entities = new TimelineEntities();
+        entities.addEntity(entity);
+        // only the put call itself is timed
+        long startWrite = System.nanoTime();
+        try {
+          collector.putEntities(entities, ugi);
+        } catch (Exception e) {
+          context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
+              increment(1);
+          LOG.error("writing to the timeline service failed", e);
+        }
+        long endWrite = System.nanoTime();
+        totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
+      }
+      // computed as a long so that large kbs/testtimes settings do not
+      // overflow the reported volume
+      final long totalKbs = (long) kbs * testtimes;
+      LOG.info("wrote " + testtimes + " entities (" + totalKbs +
+          " kB) in " + totalTime + " ms");
+      context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
+          increment(totalTime);
+      context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
+          increment(testtimes);
+      context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).
+          increment(totalKbs);
+    } finally {
+      // clean up
+      manager.remove(appId);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/463e070a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
new file mode 100644
index 0000000..80928dc
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+
+/**
+ * Converts parsed job history info into timeline entities of the mapreduce
+ * job, task and task attempt types.
+ */
+class TimelineEntityConverter {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineEntityConverter.class);
+
+  static final String JOB = "MAPREDUCE_JOB";
+  static final String TASK = "MAPREDUCE_TASK";
+  static final String TASK_ATTEMPT = "MAPREDUCE_TASK_ATTEMPT";
+
+  /**
+   * Creates job, task, and task attempt entities based on the job history info
+   * and configuration.
+   *
+   * Note: currently these are plain timeline entities created for mapreduce
+   * types. These are not meant to be the complete and accurate entity set-up
+   * for mapreduce jobs. We do not leverage hierarchical timeline entities. If
+   * we create canonical mapreduce hierarchical timeline entities with proper
+   * parent-child relationship, we could modify this to use that instead.
+   *
+   * Note that we also do not add info to the YARN application entity, which
+   * would be needed for aggregation.
+   *
+   * @param jobInfo parsed job history of the job
+   * @param conf configuration of the job; every entry is copied to the job
+   *     entity
+   * @return the job entity together with all task and task attempt entities
+   */
+  public Set<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
+      Configuration conf) {
+    Set<TimelineEntity> result = new HashSet<>();
+    // the job entity comes first, then everything below it
+    result.add(createJobEntity(jobInfo, conf));
+    result.addAll(createTaskAndTaskAttemptEntities(jobInfo));
+    return result;
+  }
+
+  /**
+   * Builds the job entity: identity and created time, the job level info
+   * fields, the total counters as metrics, and the configuration.
+   */
+  private TimelineEntity createJobEntity(JobInfo jobInfo, Configuration conf) {
+    TimelineEntity jobEntity = new TimelineEntity();
+    jobEntity.setType(JOB);
+    jobEntity.setId(jobInfo.getJobId().toString());
+    jobEntity.setCreatedTime(jobInfo.getSubmitTime());
+
+    jobEntity.addInfo("JOBNAME", jobInfo.getJobname());
+    jobEntity.addInfo("USERNAME", jobInfo.getUsername());
+    jobEntity.addInfo("JOB_QUEUE_NAME", jobInfo.getJobQueueName());
+    jobEntity.addInfo("SUBMIT_TIME", jobInfo.getSubmitTime());
+    jobEntity.addInfo("LAUNCH_TIME", jobInfo.getLaunchTime());
+    jobEntity.addInfo("FINISH_TIME", jobInfo.getFinishTime());
+    jobEntity.addInfo("JOB_STATUS", jobInfo.getJobStatus());
+    jobEntity.addInfo("PRIORITY", jobInfo.getPriority());
+    jobEntity.addInfo("TOTAL_MAPS", jobInfo.getTotalMaps());
+    jobEntity.addInfo("TOTAL_REDUCES", jobInfo.getTotalReduces());
+    jobEntity.addInfo("UBERIZED", jobInfo.getUberized());
+    jobEntity.addInfo("ERROR_INFO", jobInfo.getErrorInfo());
+
+    // add metrics from total counters
+    // we omit the map counters and reduce counters for now as it's kind of
+    // awkward to put them (map/reduce/total counters are really a group of
+    // related counters)
+    Counters totals = jobInfo.getTotalCounters();
+    if (totals != null) {
+      addMetrics(jobEntity, totals);
+    }
+    // finally add configuration to the job
+    addConfiguration(jobEntity, conf);
+    LOG.info("converted job " + jobInfo.getJobId() + " to a timeline entity");
+    return jobEntity;
+  }
+
+  /**
+   * Copies every key/value pair of the configuration to the entity.
+   */
+  private void addConfiguration(TimelineEntity job, Configuration conf) {
+    for (Map.Entry<String,String> property: conf) {
+      job.addConfig(property.getKey(), property.getValue());
+    }
+  }
+
+  /**
+   * Adds one metric per counter to the entity. The metric id is
+   * "group name:counter name" and its single value is the counter value,
+   * recorded at the current wall-clock time.
+   */
+  private void addMetrics(TimelineEntity entity, Counters counters) {
+    for (CounterGroup group: counters) {
+      String prefix = group.getName();
+      for (Counter counter: group) {
+        TimelineMetric metric = new TimelineMetric();
+        metric.setId(prefix + ":" + counter.getName());
+        metric.addValue(System.currentTimeMillis(), counter.getValue());
+        entity.addMetric(metric);
+      }
+    }
+  }
+
+  /**
+   * Creates the entity of every task of the job along with the entities of
+   * the task's attempts.
+   */
+  private Set<TimelineEntity> createTaskAndTaskAttemptEntities(JobInfo jobInfo) {
+    Set<TimelineEntity> result = new HashSet<>();
+    Map<TaskID,TaskInfo> allTasks = jobInfo.getAllTasks();
+    LOG.info("job " + jobInfo.getJobId()+ " has " + allTasks.size() +
+        " tasks");
+    for (TaskInfo taskInfo: allTasks.values()) {
+      result.add(createTaskEntity(taskInfo));
+      // add the task attempts from this task
+      result.addAll(createTaskAttemptEntities(taskInfo));
+    }
+    return result;
+  }
+
+  /**
+   * Builds the task entity: identity and created time, the task level info
+   * fields, and the task counters as metrics.
+   */
+  private TimelineEntity createTaskEntity(TaskInfo taskInfo) {
+    TimelineEntity taskEntity = new TimelineEntity();
+    taskEntity.setType(TASK);
+    taskEntity.setId(taskInfo.getTaskId().toString());
+    taskEntity.setCreatedTime(taskInfo.getStartTime());
+
+    taskEntity.addInfo("START_TIME", taskInfo.getStartTime());
+    taskEntity.addInfo("FINISH_TIME", taskInfo.getFinishTime());
+    taskEntity.addInfo("TASK_TYPE", taskInfo.getTaskType());
+    taskEntity.addInfo("TASK_STATUS", taskInfo.getTaskStatus());
+    taskEntity.addInfo("ERROR_INFO", taskInfo.getError());
+
+    // add metrics from counters
+    Counters taskCounters = taskInfo.getCounters();
+    if (taskCounters != null) {
+      addMetrics(taskEntity, taskCounters);
+    }
+    LOG.info("converted task " + taskInfo.getTaskId() +
+        " to a timeline entity");
+    return taskEntity;
+  }
+
+  /**
+   * Creates one entity for every attempt of the given task.
+   */
+  private Set<TimelineEntity> createTaskAttemptEntities(TaskInfo taskInfo) {
+    Set<TimelineEntity> result = new HashSet<>();
+    Map<TaskAttemptID,TaskAttemptInfo> allAttempts =
+        taskInfo.getAllTaskAttempts();
+    LOG.info("task " + taskInfo.getTaskId() + " has " +
+        allAttempts.size() + " task attempts");
+    for (TaskAttemptInfo attemptInfo: allAttempts.values()) {
+      result.add(createTaskAttemptEntity(attemptInfo));
+    }
+    return result;
+  }
+
+  /**
+   * Builds the task attempt entity: identity and created time, the attempt
+   * level info fields (including the container id as a string), and the
+   * attempt counters as metrics.
+   */
+  private TimelineEntity createTaskAttemptEntity(TaskAttemptInfo taskAttemptInfo) {
+    TimelineEntity attemptEntity = new TimelineEntity();
+    attemptEntity.setType(TASK_ATTEMPT);
+    attemptEntity.setId(taskAttemptInfo.getAttemptId().toString());
+    attemptEntity.setCreatedTime(taskAttemptInfo.getStartTime());
+
+    attemptEntity.addInfo("START_TIME", taskAttemptInfo.getStartTime());
+    attemptEntity.addInfo("FINISH_TIME", taskAttemptInfo.getFinishTime());
+    attemptEntity.addInfo("MAP_FINISH_TIME",
+        taskAttemptInfo.getMapFinishTime());
+    attemptEntity.addInfo("SHUFFLE_FINISH_TIME",
+        taskAttemptInfo.getShuffleFinishTime());
+    attemptEntity.addInfo("SORT_FINISH_TIME",
+        taskAttemptInfo.getSortFinishTime());
+    attemptEntity.addInfo("TASK_STATUS", taskAttemptInfo.getTaskStatus());
+    attemptEntity.addInfo("STATE", taskAttemptInfo.getState());
+    attemptEntity.addInfo("ERROR", taskAttemptInfo.getError());
+    attemptEntity.addInfo("CONTAINER_ID",
+        taskAttemptInfo.getContainerId().toString());
+
+    // add metrics from counters
+    Counters attemptCounters = taskAttemptInfo.getCounters();
+    if (attemptCounters != null) {
+      addMetrics(attemptEntity, attemptCounters);
+    }
+    LOG.info("converted task attempt " + taskAttemptInfo.getAttemptId() +
+        " to a timeline entity");
+    return attemptEntity;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/463e070a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java
index 1c2e28d..f674ae1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java
@@ -20,10 +20,7 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.Date;
-import java.util.Random;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
@@ -31,49 +28,35 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
public class TimelineServicePerformanceV2 extends Configured implements Tool {
- private static final Log LOG =
- LogFactory.getLog(TimelineServicePerformanceV2.class);
-
static final int NUM_MAPS_DEFAULT = 1;
static final int SIMPLE_ENTITY_WRITER = 1;
- // constants for mtype = 1
- static final String KBS_SENT = "kbs sent";
- static final int KBS_SENT_DEFAULT = 1;
- static final String TEST_TIMES = "testtimes";
- static final int TEST_TIMES_DEFAULT = 100;
- static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
- "timeline.server.performance.run.id";
-
+ static final int JOB_HISTORY_FILE_REPLAY_MAPPER = 2;
static int mapperType = SIMPLE_ENTITY_WRITER;
protected static int printUsage() {
- // TODO is there a way to handle mapper-specific options more gracefully?
System.err.println(
"Usage: [-m <maps>] number of mappers (default: " + NUM_MAPS_DEFAULT +
")\n" +
- " [-mtype <mapper type in integer>] \n" +
+ " [-mtype <mapper type in integer>]\n" +
" 1. simple entity write mapper\n" +
- " [-s <(KBs)test>] number of KB per put (default: " +
- KBS_SENT_DEFAULT + " KB)\n" +
- " [-t] package sending iterations per mapper (default: " +
- TEST_TIMES_DEFAULT + ")\n");
+ " 2. job history file replay mapper\n" +
+ " [-s <(KBs)test>] number of KB per put (mtype=1, default: " +
+ SimpleEntityWriter.KBS_SENT_DEFAULT + " KB)\n" +
+ " [-t] package sending iterations per mapper (mtype=1, default: " +
+ SimpleEntityWriter.TEST_TIMES_DEFAULT + ")\n" +
+ " [-d <path>] root path of job history files (mtype=2)\n" +
+ " [-r <replay mode>] (mtype=2)\n" +
+ " 1. write all entities for a job in one put (default)\n" +
+ " 2. write one entity at a time\n");
GenericOptionsParser.printGenericCommandUsage(System.err);
return -1;
}
@@ -82,11 +65,9 @@ public class TimelineServicePerformanceV2 extends Configured implements Tool {
* Configure a job given argv.
*/
public static boolean parseArgs(String[] args, Job job) throws IOException {
- // set the defaults
+ // set the common defaults
Configuration conf = job.getConfiguration();
conf.setInt(MRJobConfig.NUM_MAPS, NUM_MAPS_DEFAULT);
- conf.setInt(KBS_SENT, KBS_SENT_DEFAULT);
- conf.setInt(TEST_TIMES, TEST_TIMES_DEFAULT);
for (int i = 0; i < args.length; i++) {
if (args.length == i + 1) {
@@ -97,25 +78,24 @@ public class TimelineServicePerformanceV2 extends Configured implements Tool {
if ("-m".equals(args[i])) {
if (Integer.parseInt(args[++i]) > 0) {
job.getConfiguration()
- .setInt(MRJobConfig.NUM_MAPS, (Integer.parseInt(args[i])));
+ .setInt(MRJobConfig.NUM_MAPS, Integer.parseInt(args[i]));
}
} else if ("-mtype".equals(args[i])) {
mapperType = Integer.parseInt(args[++i]);
- switch (mapperType) {
- case SIMPLE_ENTITY_WRITER:
- job.setMapperClass(SimpleEntityWriter.class);
- break;
- default:
- job.setMapperClass(SimpleEntityWriter.class);
- }
} else if ("-s".equals(args[i])) {
if (Integer.parseInt(args[++i]) > 0) {
- conf.setInt(KBS_SENT, (Integer.parseInt(args[i])));
+ conf.setInt(SimpleEntityWriter.KBS_SENT, Integer.parseInt(args[i]));
}
} else if ("-t".equals(args[i])) {
if (Integer.parseInt(args[++i]) > 0) {
- conf.setInt(TEST_TIMES, (Integer.parseInt(args[i])));
+ conf.setInt(SimpleEntityWriter.TEST_TIMES,
+ Integer.parseInt(args[i]));
}
+ } else if ("-d".equals(args[i])) {
+ conf.set(JobHistoryFileReplayMapper.PROCESSING_PATH, args[++i]);
+ } else if ("-r".equals(args[i])) {
+ conf.setInt(JobHistoryFileReplayMapper.REPLAY_MODE,
+ Integer.parseInt(args[++i]));
} else {
System.out.println("Unexpected argument: " + args[i]);
return printUsage() == 0;
@@ -128,6 +108,27 @@ public class TimelineServicePerformanceV2 extends Configured implements Tool {
}
}
+ // handle mapper-specific settings
+ switch (mapperType) {
+ case JOB_HISTORY_FILE_REPLAY_MAPPER:
+ job.setMapperClass(JobHistoryFileReplayMapper.class);
+ String processingPath =
+ conf.get(JobHistoryFileReplayMapper.PROCESSING_PATH);
+ if (processingPath == null || processingPath.isEmpty()) {
+ System.out.println("processing path is missing while mtype = 2");
+ return printUsage() == 0;
+ }
+ break;
+ case SIMPLE_ENTITY_WRITER:
+ default:
+ job.setMapperClass(SimpleEntityWriter.class);
+ // use the current timestamp as the "run id" of the test; it is used
+ // to simulate the cluster timestamp for apps
+ conf.setLong(SimpleEntityWriter.TIMELINE_SERVICE_PERFORMANCE_RUN_ID,
+ System.currentTimeMillis());
+ break;
+ }
+
return true;
}
@@ -153,13 +154,6 @@ public class TimelineServicePerformanceV2 extends Configured implements Tool {
return -1;
}
- // for mtype = 1
- // use the current timestamp as the "run id" of the test: this will be used
- // as simulating the cluster timestamp for apps
- Configuration conf = job.getConfiguration();
- conf.setLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID,
- System.currentTimeMillis());
-
Date startTime = new Date();
System.out.println("Job started: " + startTime);
int ret = job.waitForCompletion(true) ? 0 : 1;
@@ -172,7 +166,8 @@ public class TimelineServicePerformanceV2 extends Configured implements Tool {
counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue();
double transacrate = writecounts * 1000 / (double)writetime;
double iorate = writesize * 1000 / (double)writetime;
- int numMaps = Integer.parseInt(conf.get(MRJobConfig.NUM_MAPS));
+ int numMaps =
+ Integer.parseInt(job.getConfiguration().get(MRJobConfig.NUM_MAPS));
System.out.println("TRANSACTION RATE (per mapper): " + transacrate +
" ops/s");
@@ -204,95 +199,31 @@ public class TimelineServicePerformanceV2 extends Configured implements Tool {
'3', '4', '5', '6', '7', '8', '9', '0', ' ' };
/**
- * Adds simple entities with random string payload, events, metrics, and
- * configuration.
+ * Base mapper for writing entities to the timeline service. Subclasses
+ * override {@link #writeEntities(Configuration, TimelineCollectorManager,
+ * org.apache.hadoop.mapreduce.Mapper.Context)} to create and write entities
+ * to the timeline service.
*/
- public static class SimpleEntityWriter
+ public static abstract class EntityWriter
extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
+ @Override
public void map(IntWritable key, IntWritable val, Context context)
throws IOException {
- Configuration conf = context.getConfiguration();
- // simulate the app id with the task id
- int taskId = context.getTaskAttemptID().getTaskID().getId();
- long timestamp = conf.getLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID, 0);
- ApplicationId appId = ApplicationId.newInstance(timestamp, taskId);
-
- // create the app level timeline collector
+ // create the timeline collector manager wired with the writer
Configuration tlConf = new YarnConfiguration();
- AppLevelTimelineCollector collector =
- new AppLevelTimelineCollector(appId);
- collector.init(tlConf);
- collector.start();
-
+ TimelineCollectorManager manager = new TimelineCollectorManager("test");
+ manager.init(tlConf);
+ manager.start();
try {
- // set the context
- // flow id: job name, flow run id: timestamp, user id
- TimelineCollectorContext tlContext =
- collector.getTimelineEntityContext();
- tlContext.setFlowName(context.getJobName());
- tlContext.setFlowRunId(timestamp);
- tlContext.setUserId(context.getUser());
-
- final int kbs = Integer.parseInt(conf.get(KBS_SENT));
-
- long totalTime = 0;
- final int testtimes = Integer.parseInt(conf.get(TEST_TIMES));
- final Random rand = new Random();
- final TaskAttemptID taskAttemptId = context.getTaskAttemptID();
- final char[] payLoad = new char[kbs * 1024];
-
- for (int i = 0; i < testtimes; i++) {
- // Generate a fixed length random payload
- for (int xx = 0; xx < kbs * 1024; xx++) {
- int alphaNumIdx = rand.nextInt(alphaNums.length);
- payLoad[xx] = alphaNums[alphaNumIdx];
- }
- String entId = taskAttemptId + "_" + Integer.toString(i);
- final TimelineEntity entity = new TimelineEntity();
- entity.setId(entId);
- entity.setType("FOO_ATTEMPT");
- entity.addInfo("PERF_TEST", payLoad);
- // add an event
- TimelineEvent event = new TimelineEvent();
- event.setTimestamp(System.currentTimeMillis());
- event.addInfo("foo_event", "test");
- entity.addEvent(event);
- // add a metric
- TimelineMetric metric = new TimelineMetric();
- metric.setId("foo_metric");
- metric.addValue(System.currentTimeMillis(), 123456789L);
- entity.addMetric(metric);
- // add a config
- entity.addConfig("foo", "bar");
-
- TimelineEntities entities = new TimelineEntities();
- entities.addEntity(entity);
- // use the current user for this purpose
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- long startWrite = System.nanoTime();
- try {
- collector.putEntities(entities, ugi);
- } catch (Exception e) {
- context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
- increment(1);
- e.printStackTrace();
- }
- long endWrite = System.nanoTime();
- totalTime += (endWrite-startWrite)/1000000L;
- }
- LOG.info("wrote " + testtimes + " entities (" + kbs*testtimes +
- " kB) in " + totalTime + " ms");
- context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
- increment(totalTime);
- context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
- increment(testtimes);
- context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).
- increment(kbs*testtimes);
+ // invoke the method to have the subclass write entities
+ writeEntities(tlConf, manager, context);
} finally {
- // clean up
- collector.close();
+ manager.close();
}
}
+
+ protected abstract void writeEntities(Configuration tlConf,
+ TimelineCollectorManager manager, Context context) throws IOException;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/463e070a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index 953d9b7..61fa1d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -43,11 +43,11 @@ import java.util.Map;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public abstract class TimelineCollectorManager extends AbstractService {
+public class TimelineCollectorManager extends AbstractService {
private static final Log LOG =
LogFactory.getLog(TimelineCollectorManager.class);
- protected TimelineWriter writer;
+ private TimelineWriter writer;
@Override
public void serviceInit(Configuration conf) throws Exception {
@@ -65,10 +65,14 @@ public abstract class TimelineCollectorManager extends AbstractService {
Collections.synchronizedMap(
new HashMap<ApplicationId, TimelineCollector>());
- protected TimelineCollectorManager(String name) {
+ public TimelineCollectorManager(String name) {
super(name);
}
+ protected TimelineWriter getWriter() {
+ return writer;
+ }
+
/**
* Put the collector into the collection if an collector mapped by id does
* not exist.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/463e070a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index ee1515d..1fc7651 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -47,17 +47,17 @@ public class FileSystemTimelineWriterImpl extends AbstractService
private String outputRoot;
- /** Config param for timeline service storage tmp root for FILE YARN-3264 */
+ /** Config param for timeline service storage tmp root for FILE YARN-3264. */
public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT
- = YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
+ = YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
- /** default value for storage location on local disk */
+ /** Default value for storage location on local disk. */
public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
- = "/tmp/timeline_service_data";
+ = "/tmp/timeline_service_data";
private static final String ENTITIES_DIR = "entities";
- /** Default extension for output files */
+ /** Default extension for output files. */
public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
FileSystemTimelineWriterImpl() {
@@ -81,9 +81,11 @@ public class FileSystemTimelineWriterImpl extends AbstractService
TimelineWriteResponse response) throws IOException {
PrintWriter out = null;
try {
- String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,flowName,
- flowVersion, String.valueOf(flowRun), appId, entity.getType());
- String fileName = dir + entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION;
+ String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,
+ escape(flowName), escape(flowVersion), String.valueOf(flowRun), appId,
+ entity.getType());
+ String fileName = dir + entity.getId() +
+ TIMELINE_SERVICE_STORAGE_EXTENSION;
out =
new PrintWriter(new BufferedWriter(new OutputStreamWriter(
new FileOutputStream(fileName, true), "UTF-8")));
@@ -140,4 +142,9 @@ public class FileSystemTimelineWriterImpl extends AbstractService
}
return path.toString();
}
+
+ // replace the file separator character with '_' in the given string
+ private static String escape(String str) {
+ return str.replace(File.separatorChar, '_');
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/463e070a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
new file mode 100644
index 0000000..f652ffd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;