Posted to common-commits@hadoop.apache.org by na...@apache.org on 2016/03/09 06:51:32 UTC
[2/2] hadoop git commit: MAPREDUCE-6546. reconcile the two versions of the timeline service performance tests. (Sangjin Lee via Naganarasimha G R)
MAPREDUCE-6546. reconcile the two versions of the timeline service performance tests. (Sangjin Lee via Naganarasimha G R)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c6f4c513
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c6f4c513
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c6f4c513
Branch: refs/heads/YARN-2928
Commit: c6f4c51360d93f02714fa05980b8c4dd9274ff1d
Parents: 85513ea
Author: naganarasimha <na...@apache.com>
Authored: Wed Mar 9 11:20:32 2016 +0530
Committer: naganarasimha <na...@apache.com>
Committed: Wed Mar 9 11:20:32 2016 +0530
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../hadoop/mapred/JobHistoryFileParser.java | 53 ----
.../mapred/JobHistoryFileReplayMapper.java | 301 -------------------
.../hadoop/mapred/SimpleEntityWriter.java | 140 ---------
.../hadoop/mapred/TimelineEntityConverter.java | 211 -------------
.../mapred/TimelineServicePerformanceV2.java | 229 --------------
.../apache/hadoop/mapreduce/EntityWriterV2.java | 56 ++++
.../mapreduce/JobHistoryFileReplayMapperV1.java | 14 +-
.../mapreduce/JobHistoryFileReplayMapperV2.java | 161 ++++++++++
.../mapreduce/SimpleEntityWriterConstants.java | 43 +++
.../hadoop/mapreduce/SimpleEntityWriterV1.java | 28 +-
.../hadoop/mapreduce/SimpleEntityWriterV2.java | 131 ++++++++
.../mapreduce/TimelineEntityConverterV1.java | 5 -
.../mapreduce/TimelineEntityConverterV2.java | 211 +++++++++++++
.../mapreduce/TimelineServicePerformance.java | 127 +++++---
.../apache/hadoop/test/MapredTestDriver.java | 35 +--
16 files changed, 706 insertions(+), 1042 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 923751f..67a4a8a 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -17,6 +17,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
IMPROVEMENTS
+ MAPREDUCE-6546. reconcile the two versions of the timeline service
+ performance tests. (Sangjin Lee via Naganarasimha G R)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java
deleted file mode 100644
index 9d051df..0000000
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
-
-class JobHistoryFileParser {
- private static final Log LOG = LogFactory.getLog(JobHistoryFileParser.class);
-
- private final FileSystem fs;
-
- public JobHistoryFileParser(FileSystem fs) {
- LOG.info("JobHistoryFileParser created with " + fs);
- this.fs = fs;
- }
-
- public JobInfo parseHistoryFile(Path path) throws IOException {
- LOG.info("parsing job history file " + path);
- JobHistoryParser parser = new JobHistoryParser(fs, path);
- return parser.parse();
- }
-
- public Configuration parseConfiguration(Path path) throws IOException {
- LOG.info("parsing job configuration file " + path);
- Configuration conf = new Configuration(false);
- conf.addResource(fs.open(path));
- return conf;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
deleted file mode 100644
index 4fb5308..0000000
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.mapred.TimelineServicePerformanceV2.EntityWriter;
-import org.apache.hadoop.mapred.TimelineServicePerformanceV2.PerfCounters;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
-import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
-
-/**
- * Mapper for TimelineServicePerformanceV2 that replays job history files to the
- * timeline service.
- *
- */
-class JobHistoryFileReplayMapper extends EntityWriter {
- private static final Log LOG =
- LogFactory.getLog(JobHistoryFileReplayMapper.class);
-
- static final String PROCESSING_PATH = "processing path";
- static final String REPLAY_MODE = "replay mode";
- static final int WRITE_ALL_AT_ONCE = 1;
- static final int WRITE_PER_ENTITY = 2;
- static final int REPLAY_MODE_DEFAULT = WRITE_ALL_AT_ONCE;
-
- private static final Pattern JOB_ID_PARSER =
- Pattern.compile("^(job_[0-9]+_([0-9]+)).*");
-
- public static class JobFiles {
- private final String jobId;
- private Path jobHistoryFilePath;
- private Path jobConfFilePath;
-
- public JobFiles(String jobId) {
- this.jobId = jobId;
- }
-
- public String getJobId() {
- return jobId;
- }
-
- public Path getJobHistoryFilePath() {
- return jobHistoryFilePath;
- }
-
- public void setJobHistoryFilePath(Path jobHistoryFilePath) {
- this.jobHistoryFilePath = jobHistoryFilePath;
- }
-
- public Path getJobConfFilePath() {
- return jobConfFilePath;
- }
-
- public void setJobConfFilePath(Path jobConfFilePath) {
- this.jobConfFilePath = jobConfFilePath;
- }
-
- @Override
- public int hashCode() {
- return jobId.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- JobFiles other = (JobFiles) obj;
- return jobId.equals(other.jobId);
- }
- }
-
- private enum FileType { JOB_HISTORY_FILE, JOB_CONF_FILE, UNKNOWN }
-
-
- @Override
- protected void writeEntities(Configuration tlConf,
- TimelineCollectorManager manager, Context context) throws IOException {
- // collect the apps it needs to process
- Configuration conf = context.getConfiguration();
- int taskId = context.getTaskAttemptID().getTaskID().getId();
- int size = conf.getInt(MRJobConfig.NUM_MAPS,
- TimelineServicePerformanceV2.NUM_MAPS_DEFAULT);
- String processingDir =
- conf.get(JobHistoryFileReplayMapper.PROCESSING_PATH);
- int replayMode =
- conf.getInt(JobHistoryFileReplayMapper.REPLAY_MODE,
- JobHistoryFileReplayMapper.REPLAY_MODE_DEFAULT);
- Path processingPath = new Path(processingDir);
- FileSystem processingFs = processingPath.getFileSystem(conf);
- JobHistoryFileParser parser = new JobHistoryFileParser(processingFs);
- TimelineEntityConverter converter = new TimelineEntityConverter();
-
- Collection<JobFiles> jobs =
- selectJobFiles(processingFs, processingPath, taskId, size);
- if (jobs.isEmpty()) {
- LOG.info(context.getTaskAttemptID().getTaskID() +
- " will process no jobs");
- } else {
- LOG.info(context.getTaskAttemptID().getTaskID() + " will process " +
- jobs.size() + " jobs");
- }
- for (JobFiles job: jobs) {
- // process each job
- String jobIdStr = job.getJobId();
- LOG.info("processing " + jobIdStr + "...");
- JobId jobId = TypeConverter.toYarn(JobID.forName(jobIdStr));
- ApplicationId appId = jobId.getAppId();
-
- // create the app level timeline collector and start it
- AppLevelTimelineCollector collector =
- new AppLevelTimelineCollector(appId);
- manager.putIfAbsent(appId, collector);
- try {
- // parse the job info and configuration
- JobInfo jobInfo =
- parser.parseHistoryFile(job.getJobHistoryFilePath());
- Configuration jobConf =
- parser.parseConfiguration(job.getJobConfFilePath());
- LOG.info("parsed the job history file and the configuration file for job"
- + jobIdStr);
-
- // set the context
- // flow id: job name, flow run id: timestamp, user id
- TimelineCollectorContext tlContext =
- collector.getTimelineEntityContext();
- tlContext.setFlowName(jobInfo.getJobname());
- tlContext.setFlowRunId(jobInfo.getSubmitTime());
- tlContext.setUserId(jobInfo.getUsername());
-
- // create entities from job history and write them
- long totalTime = 0;
- List<TimelineEntity> entitySet =
- converter.createTimelineEntities(jobInfo, jobConf);
- LOG.info("converted them into timeline entities for job " + jobIdStr);
- // use the current user for this purpose
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- long startWrite = System.nanoTime();
- try {
- switch (replayMode) {
- case JobHistoryFileReplayMapper.WRITE_ALL_AT_ONCE:
- writeAllEntities(collector, entitySet, ugi);
- break;
- case JobHistoryFileReplayMapper.WRITE_PER_ENTITY:
- writePerEntity(collector, entitySet, ugi);
- break;
- default:
- break;
- }
- } catch (Exception e) {
- context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
- increment(1);
- LOG.error("writing to the timeline service failed", e);
- }
- long endWrite = System.nanoTime();
- totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
- int numEntities = entitySet.size();
- LOG.info("wrote " + numEntities + " entities in " + totalTime + " ms");
-
- context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
- increment(totalTime);
- context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
- increment(numEntities);
- } finally {
- manager.remove(appId);
- context.progress(); // move it along
- }
- }
- }
-
- private void writeAllEntities(AppLevelTimelineCollector collector,
- List<TimelineEntity> entitySet, UserGroupInformation ugi)
- throws IOException {
- TimelineEntities entities = new TimelineEntities();
- entities.setEntities(entitySet);
- collector.putEntities(entities, ugi);
- }
-
- private void writePerEntity(AppLevelTimelineCollector collector,
- List<TimelineEntity> entitySet, UserGroupInformation ugi)
- throws IOException {
- for (TimelineEntity entity : entitySet) {
- TimelineEntities entities = new TimelineEntities();
- entities.addEntity(entity);
- collector.putEntities(entities, ugi);
- LOG.info("wrote entity " + entity.getId());
- }
- }
-
- private Collection<JobFiles> selectJobFiles(FileSystem fs,
- Path processingRoot, int i, int size) throws IOException {
- Map<String,JobFiles> jobs = new HashMap<>();
- RemoteIterator<LocatedFileStatus> it = fs.listFiles(processingRoot, true);
- while (it.hasNext()) {
- LocatedFileStatus status = it.next();
- Path path = status.getPath();
- String fileName = path.getName();
- Matcher m = JOB_ID_PARSER.matcher(fileName);
- if (!m.matches()) {
- continue;
- }
- String jobId = m.group(1);
- int lastId = Integer.parseInt(m.group(2));
- int mod = lastId % size;
- if (mod != i) {
- continue;
- }
- LOG.info("this mapper will process file " + fileName);
- // it's mine
- JobFiles jobFiles = jobs.get(jobId);
- if (jobFiles == null) {
- jobFiles = new JobFiles(jobId);
- jobs.put(jobId, jobFiles);
- }
- setFilePath(fileName, path, jobFiles);
- }
- return jobs.values();
- }
-
- private void setFilePath(String fileName, Path path,
- JobFiles jobFiles) {
- // determine if we're dealing with a job history file or a job conf file
- FileType type = getFileType(fileName);
- switch (type) {
- case JOB_HISTORY_FILE:
- if (jobFiles.getJobHistoryFilePath() == null) {
- jobFiles.setJobHistoryFilePath(path);
- } else {
- LOG.warn("we already have the job history file " +
- jobFiles.getJobHistoryFilePath() + ": skipping " + path);
- }
- break;
- case JOB_CONF_FILE:
- if (jobFiles.getJobConfFilePath() == null) {
- jobFiles.setJobConfFilePath(path);
- } else {
- LOG.warn("we already have the job conf file " +
- jobFiles.getJobConfFilePath() + ": skipping " + path);
- }
- break;
- case UNKNOWN:
- LOG.warn("unknown type: " + path);
- }
- }
-
- private FileType getFileType(String fileName) {
- if (fileName.endsWith(".jhist")) {
- return FileType.JOB_HISTORY_FILE;
- }
- if (fileName.endsWith("_conf.xml")) {
- return FileType.JOB_CONF_FILE;
- }
- return FileType.UNKNOWN;
- }
-}
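
The JOB_ID_PARSER regex and the lastId % size check in selectJobFiles() above are how this mapper sharded history files across map tasks; the same responsibility appears to move into the shared JobHistoryFileReplayHelper used by the new V1/V2 mappers below. A standalone sketch of that sharding scheme, for illustration only (ShardingSketch and owningMapper are names invented here, not part of this commit):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    class ShardingSketch {
      private static final Pattern JOB_ID_PARSER =
          Pattern.compile("^(job_[0-9]+_([0-9]+)).*");

      // Returns the index of the map task that owns this file, or -1 if the
      // name does not look like a job history or job conf file.
      static int owningMapper(String fileName, int numMappers) {
        Matcher m = JOB_ID_PARSER.matcher(fileName);
        if (!m.matches()) {
          return -1;
        }
        int lastId = Integer.parseInt(m.group(2)); // job sequence number
        return lastId % numMappers;
      }
    }

For example, owningMapper("job_1457499350225_0042.jhist", 4) returns 2, so map task 2 handles both the .jhist and _conf.xml files of that job.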
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java
deleted file mode 100644
index 625c32a..0000000
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.TimelineServicePerformanceV2.EntityWriter;
-import org.apache.hadoop.mapred.TimelineServicePerformanceV2.PerfCounters;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
-
-/**
- * Adds simple entities with random string payload, events, metrics, and
- * configuration.
- */
-class SimpleEntityWriter extends EntityWriter {
- private static final Log LOG = LogFactory.getLog(SimpleEntityWriter.class);
-
- // constants for mtype = 1
- static final String KBS_SENT = "kbs sent";
- static final int KBS_SENT_DEFAULT = 1;
- static final String TEST_TIMES = "testtimes";
- static final int TEST_TIMES_DEFAULT = 100;
- static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
- "timeline.server.performance.run.id";
-
- protected void writeEntities(Configuration tlConf,
- TimelineCollectorManager manager, Context context) throws IOException {
- Configuration conf = context.getConfiguration();
- // simulate the app id with the task id
- int taskId = context.getTaskAttemptID().getTaskID().getId();
- long timestamp = conf.getLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID, 0);
- ApplicationId appId = ApplicationId.newInstance(timestamp, taskId);
-
- // create the app level timeline collector
- AppLevelTimelineCollector collector =
- new AppLevelTimelineCollector(appId);
- manager.putIfAbsent(appId, collector);
-
- try {
- // set the context
- // flow id: job name, flow run id: timestamp, user id
- TimelineCollectorContext tlContext =
- collector.getTimelineEntityContext();
- tlContext.setFlowName(context.getJobName());
- tlContext.setFlowRunId(timestamp);
- tlContext.setUserId(context.getUser());
-
- final int kbs = conf.getInt(KBS_SENT, KBS_SENT_DEFAULT);
-
- long totalTime = 0;
- final int testtimes = conf.getInt(TEST_TIMES, TEST_TIMES_DEFAULT);
- final Random rand = new Random();
- final TaskAttemptID taskAttemptId = context.getTaskAttemptID();
- final char[] payLoad = new char[kbs * 1024];
-
- for (int i = 0; i < testtimes; i++) {
- // Generate a fixed length random payload
- for (int xx = 0; xx < kbs * 1024; xx++) {
- int alphaNumIdx =
- rand.nextInt(TimelineServicePerformanceV2.alphaNums.length);
- payLoad[xx] = TimelineServicePerformanceV2.alphaNums[alphaNumIdx];
- }
- String entId = taskAttemptId + "_" + Integer.toString(i);
- final TimelineEntity entity = new TimelineEntity();
- entity.setId(entId);
- entity.setType("FOO_ATTEMPT");
- entity.addInfo("PERF_TEST", payLoad);
- // add an event
- TimelineEvent event = new TimelineEvent();
- event.setId("foo_event_id");
- event.setTimestamp(System.currentTimeMillis());
- event.addInfo("foo_event", "test");
- entity.addEvent(event);
- // add a metric
- TimelineMetric metric = new TimelineMetric();
- metric.setId("foo_metric");
- metric.addValue(System.currentTimeMillis(), 123456789L);
- entity.addMetric(metric);
- // add a config
- entity.addConfig("foo", "bar");
-
- TimelineEntities entities = new TimelineEntities();
- entities.addEntity(entity);
- // use the current user for this purpose
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- long startWrite = System.nanoTime();
- try {
- collector.putEntities(entities, ugi);
- } catch (Exception e) {
- context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
- increment(1);
- LOG.error("writing to the timeline service failed", e);
- }
- long endWrite = System.nanoTime();
- totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
- }
- LOG.info("wrote " + testtimes + " entities (" + kbs*testtimes +
- " kB) in " + totalTime + " ms");
- context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
- increment(totalTime);
- context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
- increment(testtimes);
- context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).
- increment(kbs*testtimes);
- } finally {
- // clean up
- manager.remove(appId);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
deleted file mode 100644
index 0e2eb72..0000000
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-
-class TimelineEntityConverter {
- private static final Log LOG =
- LogFactory.getLog(TimelineEntityConverter.class);
-
- static final String JOB = "MAPREDUCE_JOB";
- static final String TASK = "MAPREDUCE_TASK";
- static final String TASK_ATTEMPT = "MAPREDUCE_TASK_ATTEMPT";
-
- /**
- * Creates job, task, and task attempt entities based on the job history info
- * and configuration.
- *
- * Note: currently these are plan timeline entities created for mapreduce
- * types. These are not meant to be the complete and accurate entity set-up
- * for mapreduce jobs. We do not leverage hierarchical timeline entities. If
- * we create canonical mapreduce hierarchical timeline entities with proper
- * parent-child relationship, we could modify this to use that instead.
- *
- * Note that we also do not add info to the YARN application entity, which
- * would be needed for aggregation.
- */
- public List<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
- Configuration conf) {
- List<TimelineEntity> entities = new ArrayList<>();
-
- // create the job entity
- TimelineEntity job = createJobEntity(jobInfo, conf);
- entities.add(job);
-
- // create the task and task attempt entities
- List<TimelineEntity> tasksAndAttempts =
- createTaskAndTaskAttemptEntities(jobInfo);
- entities.addAll(tasksAndAttempts);
-
- return entities;
- }
-
- private TimelineEntity createJobEntity(JobInfo jobInfo, Configuration conf) {
- TimelineEntity job = new TimelineEntity();
- job.setType(JOB);
- job.setId(jobInfo.getJobId().toString());
- job.setCreatedTime(jobInfo.getSubmitTime());
-
- job.addInfo("JOBNAME", jobInfo.getJobname());
- job.addInfo("USERNAME", jobInfo.getUsername());
- job.addInfo("JOB_QUEUE_NAME", jobInfo.getJobQueueName());
- job.addInfo("SUBMIT_TIME", jobInfo.getSubmitTime());
- job.addInfo("LAUNCH_TIME", jobInfo.getLaunchTime());
- job.addInfo("FINISH_TIME", jobInfo.getFinishTime());
- job.addInfo("JOB_STATUS", jobInfo.getJobStatus());
- job.addInfo("PRIORITY", jobInfo.getPriority());
- job.addInfo("TOTAL_MAPS", jobInfo.getTotalMaps());
- job.addInfo("TOTAL_REDUCES", jobInfo.getTotalReduces());
- job.addInfo("UBERIZED", jobInfo.getUberized());
- job.addInfo("ERROR_INFO", jobInfo.getErrorInfo());
-
- // add metrics from total counters
- // we omit the map counters and reduce counters for now as it's kind of
- // awkward to put them (map/reduce/total counters are really a group of
- // related counters)
- Counters totalCounters = jobInfo.getTotalCounters();
- if (totalCounters != null) {
- addMetrics(job, totalCounters);
- }
- // finally add configuration to the job
- addConfiguration(job, conf);
- LOG.info("converted job " + jobInfo.getJobId() + " to a timeline entity");
- return job;
- }
-
- private void addConfiguration(TimelineEntity job, Configuration conf) {
- for (Map.Entry<String,String> e: conf) {
- job.addConfig(e.getKey(), e.getValue());
- }
- }
-
- private void addMetrics(TimelineEntity entity, Counters counters) {
- for (CounterGroup g: counters) {
- String groupName = g.getName();
- for (Counter c: g) {
- String name = groupName + ":" + c.getName();
- TimelineMetric metric = new TimelineMetric();
- metric.setId(name);
- metric.addValue(System.currentTimeMillis(), c.getValue());
- entity.addMetric(metric);
- }
- }
- }
-
- private List<TimelineEntity> createTaskAndTaskAttemptEntities(
- JobInfo jobInfo) {
- List<TimelineEntity> entities = new ArrayList<>();
- Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks();
- LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() +
- " tasks");
- for (TaskInfo taskInfo: taskInfoMap.values()) {
- TimelineEntity task = createTaskEntity(taskInfo);
- entities.add(task);
- // add the task attempts from this task
- Set<TimelineEntity> taskAttempts = createTaskAttemptEntities(taskInfo);
- entities.addAll(taskAttempts);
- }
- return entities;
- }
-
- private TimelineEntity createTaskEntity(TaskInfo taskInfo) {
- TimelineEntity task = new TimelineEntity();
- task.setType(TASK);
- task.setId(taskInfo.getTaskId().toString());
- task.setCreatedTime(taskInfo.getStartTime());
-
- task.addInfo("START_TIME", taskInfo.getStartTime());
- task.addInfo("FINISH_TIME", taskInfo.getFinishTime());
- task.addInfo("TASK_TYPE", taskInfo.getTaskType());
- task.addInfo("TASK_STATUS", taskInfo.getTaskStatus());
- task.addInfo("ERROR_INFO", taskInfo.getError());
-
- // add metrics from counters
- Counters counters = taskInfo.getCounters();
- if (counters != null) {
- addMetrics(task, counters);
- }
- LOG.info("converted task " + taskInfo.getTaskId() +
- " to a timeline entity");
- return task;
- }
-
- private Set<TimelineEntity> createTaskAttemptEntities(TaskInfo taskInfo) {
- Set<TimelineEntity> taskAttempts = new HashSet<TimelineEntity>();
- Map<TaskAttemptID,TaskAttemptInfo> taskAttemptInfoMap =
- taskInfo.getAllTaskAttempts();
- LOG.info("task " + taskInfo.getTaskId() + " has " +
- taskAttemptInfoMap.size() + " task attempts");
- for (TaskAttemptInfo taskAttemptInfo: taskAttemptInfoMap.values()) {
- TimelineEntity taskAttempt = createTaskAttemptEntity(taskAttemptInfo);
- taskAttempts.add(taskAttempt);
- }
- return taskAttempts;
- }
-
- private TimelineEntity createTaskAttemptEntity(
- TaskAttemptInfo taskAttemptInfo) {
- TimelineEntity taskAttempt = new TimelineEntity();
- taskAttempt.setType(TASK_ATTEMPT);
- taskAttempt.setId(taskAttemptInfo.getAttemptId().toString());
- taskAttempt.setCreatedTime(taskAttemptInfo.getStartTime());
-
- taskAttempt.addInfo("START_TIME", taskAttemptInfo.getStartTime());
- taskAttempt.addInfo("FINISH_TIME", taskAttemptInfo.getFinishTime());
- taskAttempt.addInfo("MAP_FINISH_TIME",
- taskAttemptInfo.getMapFinishTime());
- taskAttempt.addInfo("SHUFFLE_FINISH_TIME",
- taskAttemptInfo.getShuffleFinishTime());
- taskAttempt.addInfo("SORT_FINISH_TIME",
- taskAttemptInfo.getSortFinishTime());
- taskAttempt.addInfo("TASK_STATUS", taskAttemptInfo.getTaskStatus());
- taskAttempt.addInfo("STATE", taskAttemptInfo.getState());
- taskAttempt.addInfo("ERROR", taskAttemptInfo.getError());
- taskAttempt.addInfo("CONTAINER_ID",
- taskAttemptInfo.getContainerId().toString());
-
- // add metrics from counters
- Counters counters = taskAttemptInfo.getCounters();
- if (counters != null) {
- addMetrics(taskAttempt, counters);
- }
- LOG.info("converted task attempt " + taskAttemptInfo.getAttemptId() +
- " to a timeline entity");
- return taskAttempt;
- }
-}
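
Per the diffstat, this converter is re-added essentially unchanged as org.apache.hadoop.mapreduce.TimelineEntityConverterV2 (211 lines deleted here, 211 added there). Its counter handling gives each Hadoop counter a single-value timeline metric named "<group>:<counter>"; a self-contained sketch of that mapping (MetricsSketch is a name invented here):

    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.CounterGroup;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;

    class MetricsSketch {
      // One single-value TimelineMetric per counter, id "<group>:<counter>",
      // stamped with the time of conversion rather than of the job run.
      static void addMetrics(TimelineEntity entity, Counters counters) {
        for (CounterGroup g : counters) {
          for (Counter c : g) {
            TimelineMetric metric = new TimelineMetric();
            metric.setId(g.getName() + ":" + c.getName());
            metric.addValue(System.currentTimeMillis(), c.getValue());
            entity.addMetric(metric);
          }
        }
      }
    }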
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java
deleted file mode 100644
index f674ae1..0000000
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-import java.util.Date;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
-
-public class TimelineServicePerformanceV2 extends Configured implements Tool {
- static final int NUM_MAPS_DEFAULT = 1;
-
- static final int SIMPLE_ENTITY_WRITER = 1;
- static final int JOB_HISTORY_FILE_REPLAY_MAPPER = 2;
- static int mapperType = SIMPLE_ENTITY_WRITER;
-
- protected static int printUsage() {
- System.err.println(
- "Usage: [-m <maps>] number of mappers (default: " + NUM_MAPS_DEFAULT +
- ")\n" +
- " [-mtype <mapper type in integer>]\n" +
- " 1. simple entity write mapper\n" +
- " 2. job history file replay mapper\n" +
- " [-s <(KBs)test>] number of KB per put (mtype=1, default: " +
- SimpleEntityWriter.KBS_SENT_DEFAULT + " KB)\n" +
- " [-t] package sending iterations per mapper (mtype=1, default: " +
- SimpleEntityWriter.TEST_TIMES_DEFAULT + ")\n" +
- " [-d <path>] root path of job history files (mtype=2)\n" +
- " [-r <replay mode>] (mtype=2)\n" +
- " 1. write all entities for a job in one put (default)\n" +
- " 2. write one entity at a time\n");
- GenericOptionsParser.printGenericCommandUsage(System.err);
- return -1;
- }
-
- /**
- * Configure a job given argv.
- */
- public static boolean parseArgs(String[] args, Job job) throws IOException {
- // set the common defaults
- Configuration conf = job.getConfiguration();
- conf.setInt(MRJobConfig.NUM_MAPS, NUM_MAPS_DEFAULT);
-
- for (int i = 0; i < args.length; i++) {
- if (args.length == i + 1) {
- System.out.println("ERROR: Required parameter missing from " + args[i]);
- return printUsage() == 0;
- }
- try {
- if ("-m".equals(args[i])) {
- if (Integer.parseInt(args[++i]) > 0) {
- job.getConfiguration()
- .setInt(MRJobConfig.NUM_MAPS, Integer.parseInt(args[i]));
- }
- } else if ("-mtype".equals(args[i])) {
- mapperType = Integer.parseInt(args[++i]);
- } else if ("-s".equals(args[i])) {
- if (Integer.parseInt(args[++i]) > 0) {
- conf.setInt(SimpleEntityWriter.KBS_SENT, Integer.parseInt(args[i]));
- }
- } else if ("-t".equals(args[i])) {
- if (Integer.parseInt(args[++i]) > 0) {
- conf.setInt(SimpleEntityWriter.TEST_TIMES,
- Integer.parseInt(args[i]));
- }
- } else if ("-d".equals(args[i])) {
- conf.set(JobHistoryFileReplayMapper.PROCESSING_PATH, args[++i]);
- } else if ("-r".equals(args[i])) {
- conf.setInt(JobHistoryFileReplayMapper.REPLAY_MODE,
- Integer.parseInt(args[++i]));
- } else {
- System.out.println("Unexpected argument: " + args[i]);
- return printUsage() == 0;
- }
- } catch (NumberFormatException except) {
- System.out.println("ERROR: Integer expected instead of " + args[i]);
- return printUsage() == 0;
- } catch (Exception e) {
- throw (IOException)new IOException().initCause(e);
- }
- }
-
- // handle mapper-specific settings
- switch (mapperType) {
- case JOB_HISTORY_FILE_REPLAY_MAPPER:
- job.setMapperClass(JobHistoryFileReplayMapper.class);
- String processingPath =
- conf.get(JobHistoryFileReplayMapper.PROCESSING_PATH);
- if (processingPath == null || processingPath.isEmpty()) {
- System.out.println("processing path is missing while mtype = 2");
- return printUsage() == 0;
- }
- break;
- case SIMPLE_ENTITY_WRITER:
- default:
- job.setMapperClass(SimpleEntityWriter.class);
- // use the current timestamp as the "run id" of the test: this will
- // be used as simulating the cluster timestamp for apps
- conf.setLong(SimpleEntityWriter.TIMELINE_SERVICE_PERFORMANCE_RUN_ID,
- System.currentTimeMillis());
- break;
- }
-
- return true;
- }
-
- /**
- * TimelineServer Performance counters
- */
- static enum PerfCounters {
- TIMELINE_SERVICE_WRITE_TIME,
- TIMELINE_SERVICE_WRITE_COUNTER,
- TIMELINE_SERVICE_WRITE_FAILURES,
- TIMELINE_SERVICE_WRITE_KBS,
- }
-
- public int run(String[] args) throws Exception {
-
- Job job = Job.getInstance(getConf());
- job.setJarByClass(TimelineServicePerformanceV2.class);
- job.setMapperClass(SimpleEntityWriter.class);
- job.setInputFormatClass(SleepInputFormat.class);
- job.setOutputFormatClass(NullOutputFormat.class);
- job.setNumReduceTasks(0);
- if (!parseArgs(args, job)) {
- return -1;
- }
-
- Date startTime = new Date();
- System.out.println("Job started: " + startTime);
- int ret = job.waitForCompletion(true) ? 0 : 1;
- org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
- long writetime =
- counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).getValue();
- long writecounts =
- counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).getValue();
- long writesize =
- counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue();
- double transacrate = writecounts * 1000 / (double)writetime;
- double iorate = writesize * 1000 / (double)writetime;
- int numMaps =
- Integer.parseInt(job.getConfiguration().get(MRJobConfig.NUM_MAPS));
-
- System.out.println("TRANSACTION RATE (per mapper): " + transacrate +
- " ops/s");
- System.out.println("IO RATE (per mapper): " + iorate + " KB/s");
-
- System.out.println("TRANSACTION RATE (total): " + transacrate*numMaps +
- " ops/s");
- System.out.println("IO RATE (total): " + iorate*numMaps + " KB/s");
-
- return ret;
- }
-
- public static void main(String[] args) throws Exception {
- int res =
- ToolRunner.run(new Configuration(), new TimelineServicePerformanceV2(),
- args);
- System.exit(res);
- }
-
- /**
- * To ensure that the compression really gets exercised, generate a
- * random alphanumeric fixed length payload
- */
- static final char[] alphaNums = new char[] { 'a', 'b', 'c', 'd', 'e', 'f',
- 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
- 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
- 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
- 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
- '3', '4', '5', '6', '7', '8', '9', '0', ' ' };
-
- /**
- * Base mapper for writing entities to the timeline service. Subclasses
- * override {@link #writeEntities(Configuration, TimelineCollectorManager,
- * org.apache.hadoop.mapreduce.Mapper.Context)} to create and write entities
- * to the timeline service.
- */
- public static abstract class EntityWriter
- extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
- @Override
- public void map(IntWritable key, IntWritable val, Context context)
- throws IOException {
-
- // create the timeline collector manager wired with the writer
- Configuration tlConf = new YarnConfiguration();
- TimelineCollectorManager manager = new TimelineCollectorManager("test");
- manager.init(tlConf);
- manager.start();
- try {
- // invoke the method to have the subclass write entities
- writeEntities(tlConf, manager, context);
- } finally {
- manager.close();
- }
- }
-
- protected abstract void writeEntities(Configuration tlConf,
- TimelineCollectorManager manager, Context context) throws IOException;
- }
-}
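
Both deleted drivers converge on the updated org.apache.hadoop.mapreduce.TimelineServicePerformance (see the diffstat). A minimal launch sketch, assuming the consolidated tool keeps the Tool interface and the -m/-mtype/-s/-t flags documented in the printUsage() above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TimelineServicePerformance;
    import org.apache.hadoop.util.ToolRunner;

    public class RunTimelinePerfTest {
      public static void main(String[] args) throws Exception {
        // -m 4: four mappers; -mtype 1: simple entity writer;
        // -s 8: 8 KB per put; -t 100: 100 puts per mapper
        int res = ToolRunner.run(new Configuration(),
            new TimelineServicePerformance(),
            new String[] {"-m", "4", "-mtype", "1", "-s", "8", "-t", "100"});
        System.exit(res);
      }
    }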
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java
new file mode 100644
index 0000000..f5d95c3
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
+
+/**
+ * Base mapper for writing entities to the timeline service. Subclasses
+ * override {@link #writeEntities(Configuration, TimelineCollectorManager,
+ * org.apache.hadoop.mapreduce.Mapper.Context)} to create and write entities
+ * to the timeline service.
+ */
+abstract class EntityWriterV2
+ extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
+ @Override
+ public void map(IntWritable key, IntWritable val, Context context)
+ throws IOException {
+
+ // create the timeline collector manager wired with the writer
+ Configuration tlConf = new YarnConfiguration();
+ TimelineCollectorManager manager = new TimelineCollectorManager("test");
+ manager.init(tlConf);
+ manager.start();
+ try {
+ // invoke the method to have the subclass write entities
+ writeEntities(tlConf, manager, context);
+ } finally {
+ manager.close();
+ }
+ }
+
+ protected abstract void writeEntities(Configuration tlConf,
+ TimelineCollectorManager manager, Context context) throws IOException;
+}
\ No newline at end of file
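
EntityWriterV2 wires up a fresh TimelineCollectorManager per map() call and closes it in a finally block, so a subclass only manages its own app-level collector inside writeEntities(). A hypothetical minimal subclass (SingleEntityWriterV2 is invented here for illustration; the collector calls mirror those in the mappers of this commit):

    package org.apache.hadoop.mapreduce;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
    import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
    import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;

    class SingleEntityWriterV2 extends EntityWriterV2 {
      @Override
      protected void writeEntities(Configuration tlConf,
          TimelineCollectorManager manager, Context context) throws IOException {
        // simulate an app id from the task id, as SimpleEntityWriterV2 does
        ApplicationId appId = ApplicationId.newInstance(
            System.currentTimeMillis(),
            context.getTaskAttemptID().getTaskID().getId());
        AppLevelTimelineCollector collector =
            new AppLevelTimelineCollector(appId);
        manager.putIfAbsent(appId, collector);
        try {
          TimelineEntity entity = new TimelineEntity();
          entity.setId("example_" + appId);
          entity.setType("EXAMPLE");
          TimelineEntities entities = new TimelineEntities();
          entities.addEntity(entity);
          collector.putEntities(entities,
              UserGroupInformation.getCurrentUser());
        } finally {
          manager.remove(appId);
        }
      }
    }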
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java
index 5e10662..447ea4e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java
@@ -20,33 +20,21 @@ package org.apache.hadoop.mapreduce;
import java.io.IOException;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
-import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper;
import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper.JobFiles;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java
new file mode 100644
index 0000000..6a9a878
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper.JobFiles;
+import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
+
+/**
+ * Mapper for TimelineServicePerformance that replays job history files to the
+ * timeline service v.2.
+ *
+ */
+class JobHistoryFileReplayMapperV2 extends EntityWriterV2 {
+ private static final Log LOG =
+ LogFactory.getLog(JobHistoryFileReplayMapperV2.class);
+
+ @Override
+ protected void writeEntities(Configuration tlConf,
+ TimelineCollectorManager manager, Context context) throws IOException {
+ JobHistoryFileReplayHelper helper = new JobHistoryFileReplayHelper(context);
+ int replayMode = helper.getReplayMode();
+ JobHistoryFileParser parser = helper.getParser();
+ TimelineEntityConverterV2 converter = new TimelineEntityConverterV2();
+
+ // collect the apps it needs to process
+ Collection<JobFiles> jobs = helper.getJobFiles();
+ if (jobs.isEmpty()) {
+ LOG.info(context.getTaskAttemptID().getTaskID() +
+ " will process no jobs");
+ } else {
+ LOG.info(context.getTaskAttemptID().getTaskID() + " will process " +
+ jobs.size() + " jobs");
+ }
+ for (JobFiles job: jobs) {
+ // process each job
+ String jobIdStr = job.getJobId();
+ // skip if either of the files is missing
+ if (job.getJobConfFilePath() == null ||
+ job.getJobHistoryFilePath() == null) {
+ LOG.info(jobIdStr + " missing either the job history file or the " +
+ "configuration file. Skipping.");
+ continue;
+ }
+ LOG.info("processing " + jobIdStr + "...");
+ JobId jobId = TypeConverter.toYarn(JobID.forName(jobIdStr));
+ ApplicationId appId = jobId.getAppId();
+
+ // create the app level timeline collector and start it
+ AppLevelTimelineCollector collector =
+ new AppLevelTimelineCollector(appId);
+ manager.putIfAbsent(appId, collector);
+ try {
+ // parse the job info and configuration
+ JobInfo jobInfo =
+ parser.parseHistoryFile(job.getJobHistoryFilePath());
+ Configuration jobConf =
+ parser.parseConfiguration(job.getJobConfFilePath());
+ LOG.info("parsed the job history file and the configuration file for job"
+ + jobIdStr);
+
+ // set the context
+ // flow id: job name, flow run id: timestamp, user id
+ TimelineCollectorContext tlContext =
+ collector.getTimelineEntityContext();
+ tlContext.setFlowName(jobInfo.getJobname());
+ tlContext.setFlowRunId(jobInfo.getSubmitTime());
+ tlContext.setUserId(jobInfo.getUsername());
+
+ // create entities from job history and write them
+ long totalTime = 0;
+ List<TimelineEntity> entitySet =
+ converter.createTimelineEntities(jobInfo, jobConf);
+ LOG.info("converted them into timeline entities for job " + jobIdStr);
+ // use the current user for this purpose
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ long startWrite = System.nanoTime();
+ try {
+ switch (replayMode) {
+ case JobHistoryFileReplayHelper.WRITE_ALL_AT_ONCE:
+ writeAllEntities(collector, entitySet, ugi);
+ break;
+ case JobHistoryFileReplayHelper.WRITE_PER_ENTITY:
+ writePerEntity(collector, entitySet, ugi);
+ break;
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
+ increment(1);
+ LOG.error("writing to the timeline service failed", e);
+ }
+ long endWrite = System.nanoTime();
+ totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
+ int numEntities = entitySet.size();
+ LOG.info("wrote " + numEntities + " entities in " + totalTime + " ms");
+
+ context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
+ increment(totalTime);
+ context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
+ increment(numEntities);
+ } finally {
+ manager.remove(appId);
+ context.progress(); // move it along
+ }
+ }
+ }
+
+ private void writeAllEntities(AppLevelTimelineCollector collector,
+ List<TimelineEntity> entitySet, UserGroupInformation ugi)
+ throws IOException {
+ TimelineEntities entities = new TimelineEntities();
+ entities.setEntities(entitySet);
+ collector.putEntities(entities, ugi);
+ }
+
+ private void writePerEntity(AppLevelTimelineCollector collector,
+ List<TimelineEntity> entitySet, UserGroupInformation ugi)
+ throws IOException {
+ for (TimelineEntity entity : entitySet) {
+ TimelineEntities entities = new TimelineEntities();
+ entities.addEntity(entity);
+ collector.putEntities(entities, ugi);
+ LOG.info("wrote entity " + entity.getId());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java
new file mode 100644
index 0000000..b89d0e8
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+/**
+ * Constants for simple entity writers.
+ */
+interface SimpleEntityWriterConstants {
+ // constants for mtype = 1
+ String KBS_SENT = "kbs sent";
+ int KBS_SENT_DEFAULT = 1;
+ String TEST_TIMES = "testtimes";
+ int TEST_TIMES_DEFAULT = 100;
+ String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
+ "timeline.server.performance.run.id";
+
+ /**
+ * To ensure that the compression really gets exercised, generate a
+ * random alphanumeric fixed length payload
+ */
+ char[] ALPHA_NUMS = new char[] { 'a', 'b', 'c', 'd', 'e', 'f',
+ 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
+ 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
+ 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
+ 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
+ '3', '4', '5', '6', '7', '8', '9', '0', ' ' };
+}
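
Both simple writers now share these constants (V1 implements the interface, per the next hunk). The ALPHA_NUMS table feeds the payload generator; a short sketch of that loop, lifted from the writers for illustration (PayloadSketch is a name invented here; same package assumed, since the interface is package-private):

    package org.apache.hadoop.mapreduce;

    import java.util.Random;

    class PayloadSketch {
      // Fill a kbs * 1024 character payload with random alphanumerics so the
      // storage layer's compression has realistic work to do.
      static char[] randomPayload(int kbs) {
        Random rand = new Random();
        char[] payLoad = new char[kbs * 1024];
        for (int xx = 0; xx < payLoad.length; xx++) {
          payLoad[xx] = SimpleEntityWriterConstants.ALPHA_NUMS[
              rand.nextInt(SimpleEntityWriterConstants.ALPHA_NUMS.length)];
        }
        return payLoad;
      }
    }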
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java
index 2c851e9..b10ae04 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java
@@ -27,44 +27,22 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* Adds simple entities with random string payload, events, metrics, and
* configuration.
*/
-class SimpleEntityWriterV1 extends
- org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
+class SimpleEntityWriterV1
+ extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable>
+ implements SimpleEntityWriterConstants {
private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV1.class);
- // constants for mtype = 1
- static final String KBS_SENT = "kbs sent";
- static final int KBS_SENT_DEFAULT = 1;
- static final String TEST_TIMES = "testtimes";
- static final int TEST_TIMES_DEFAULT = 100;
- static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
- "timeline.server.performance.run.id";
- /**
- * To ensure that the compression really gets exercised, generate a
- * random alphanumeric fixed length payload
- */
- private static char[] ALPHA_NUMS = new char[] { 'a', 'b', 'c', 'd', 'e', 'f',
- 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
- 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
- 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
- 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
- '3', '4', '5', '6', '7', '8', '9', '0', ' ' };
-
public void map(IntWritable key, IntWritable val, Context context) throws IOException {
TimelineClient tlc = new TimelineClientImpl();
Configuration conf = context.getConfiguration();
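The V1 writer goes through the ATS v1 client API, as the hunk above shows.
A minimal sketch of that write path, assuming a YarnConfiguration with the
timeline service enabled; the mapper above instantiates TimelineClientImpl
directly, while this sketch uses the equivalent public factory, and the
entity id is illustrative:

    import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
    import org.apache.hadoop.yarn.client.api.TimelineClient;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class V1WriteSketch {
      public static void main(String[] args) throws Exception {
        TimelineClient client = TimelineClient.createTimelineClient();
        client.init(new YarnConfiguration());
        client.start();
        try {
          TimelineEntity entity = new TimelineEntity();
          entity.setEntityId("test_entity_1");   // illustrative id
          entity.setEntityType("FOO_ATTEMPT");
          // synchronous put to the timeline server over REST
          client.putEntities(entity);
        } finally {
          client.stop();
        }
      }
    }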
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java
new file mode 100644
index 0000000..d66deb0
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
+
+/**
+ * Adds simple entities with random string payload, events, metrics, and
+ * configuration.
+ */
+class SimpleEntityWriterV2 extends EntityWriterV2
+ implements SimpleEntityWriterConstants {
+ private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV2.class);
+
+ protected void writeEntities(Configuration tlConf,
+ TimelineCollectorManager manager, Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ // simulate the app id with the task id
+ int taskId = context.getTaskAttemptID().getTaskID().getId();
+ long timestamp = conf.getLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID, 0);
+ ApplicationId appId = ApplicationId.newInstance(timestamp, taskId);
+
+ // create the app level timeline collector
+ AppLevelTimelineCollector collector =
+ new AppLevelTimelineCollector(appId);
+ manager.putIfAbsent(appId, collector);
+
+ try {
+ // set the context
+ // flow id: job name, flow run id: timestamp, user id
+ TimelineCollectorContext tlContext =
+ collector.getTimelineEntityContext();
+ tlContext.setFlowName(context.getJobName());
+ tlContext.setFlowRunId(timestamp);
+ tlContext.setUserId(context.getUser());
+
+ final int kbs = conf.getInt(KBS_SENT, KBS_SENT_DEFAULT);
+
+ long totalTime = 0;
+ final int testtimes = conf.getInt(TEST_TIMES, TEST_TIMES_DEFAULT);
+ final Random rand = new Random();
+ final TaskAttemptID taskAttemptId = context.getTaskAttemptID();
+ final char[] payLoad = new char[kbs * 1024];
+
+ for (int i = 0; i < testtimes; i++) {
+ // Generate a fixed-length random payload
+ for (int xx = 0; xx < kbs * 1024; xx++) {
+ int alphaNumIdx =
+ rand.nextInt(ALPHA_NUMS.length);
+ payLoad[xx] = ALPHA_NUMS[alphaNumIdx];
+ }
+ String entId = taskAttemptId + "_" + Integer.toString(i);
+ final TimelineEntity entity = new TimelineEntity();
+ entity.setId(entId);
+ entity.setType("FOO_ATTEMPT");
+ entity.addInfo("PERF_TEST", payLoad);
+ // add an event
+ TimelineEvent event = new TimelineEvent();
+ event.setId("foo_event_id");
+ event.setTimestamp(System.currentTimeMillis());
+ event.addInfo("foo_event", "test");
+ entity.addEvent(event);
+ // add a metric
+ TimelineMetric metric = new TimelineMetric();
+ metric.setId("foo_metric");
+ metric.addValue(System.currentTimeMillis(), 123456789L);
+ entity.addMetric(metric);
+ // add a config
+ entity.addConfig("foo", "bar");
+
+ TimelineEntities entities = new TimelineEntities();
+ entities.addEntity(entity);
+ // use the current user for this purpose
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ long startWrite = System.nanoTime();
+ try {
+ collector.putEntities(entities, ugi);
+ } catch (Exception e) {
+ context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
+ increment(1);
+ LOG.error("writing to the timeline service failed", e);
+ }
+ long endWrite = System.nanoTime();
+ totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
+ }
+ LOG.info("wrote " + testtimes + " entities (" + kbs*testtimes +
+ " kB) in " + totalTime + " ms");
+ context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
+ increment(totalTime);
+ context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
+ increment(testtimes);
+ context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).
+ increment(kbs*testtimes);
+ } finally {
+ // clean up
+ manager.remove(appId);
+ }
+ }
+}
\ No newline at end of file
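The V2 writer reads its workload from the job configuration through the shared
constant keys above. A sketch of how a driver might set them; the key strings
and defaults come from SimpleEntityWriterConstants, while the surrounding
setup and chosen values are illustrative:

    import org.apache.hadoop.conf.Configuration;

    public class PerfTestConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // payload size per entity, in kilobytes ("kbs sent", default 1)
        conf.setInt("kbs sent", 10);
        // number of entities each mapper writes ("testtimes", default 100)
        conf.setInt("testtimes", 1000);
        // the run id doubles as the flow run id and the simulated app timestamp
        conf.setLong("timeline.server.performance.run.id",
            System.currentTimeMillis());
      }
    }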
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java
index 79d123e..4d8b74b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java
@@ -25,11 +25,6 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV2.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV2.java
new file mode 100644
index 0000000..79633d2
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV2.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+
+class TimelineEntityConverterV2 {
+ private static final Log LOG =
+ LogFactory.getLog(TimelineEntityConverterV2.class);
+
+ static final String JOB = "MAPREDUCE_JOB";
+ static final String TASK = "MAPREDUCE_TASK";
+ static final String TASK_ATTEMPT = "MAPREDUCE_TASK_ATTEMPT";
+
+ /**
+ * Creates job, task, and task attempt entities based on the job history info
+ * and configuration.
+ *
+ * Note: currently these are plain timeline entities created for mapreduce
+ * types. These are not meant to be the complete and accurate entity set-up
+ * for mapreduce jobs. We do not leverage hierarchical timeline entities. If
+ * we create canonical mapreduce hierarchical timeline entities with proper
+ * parent-child relationships, we could modify this to use that instead.
+ *
+ * Note that we also do not add info to the YARN application entity, which
+ * would be needed for aggregation.
+ */
+ public List<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
+ Configuration conf) {
+ List<TimelineEntity> entities = new ArrayList<>();
+
+ // create the job entity
+ TimelineEntity job = createJobEntity(jobInfo, conf);
+ entities.add(job);
+
+ // create the task and task attempt entities
+ List<TimelineEntity> tasksAndAttempts =
+ createTaskAndTaskAttemptEntities(jobInfo);
+ entities.addAll(tasksAndAttempts);
+
+ return entities;
+ }
+
+ private TimelineEntity createJobEntity(JobInfo jobInfo, Configuration conf) {
+ TimelineEntity job = new TimelineEntity();
+ job.setType(JOB);
+ job.setId(jobInfo.getJobId().toString());
+ job.setCreatedTime(jobInfo.getSubmitTime());
+
+ job.addInfo("JOBNAME", jobInfo.getJobname());
+ job.addInfo("USERNAME", jobInfo.getUsername());
+ job.addInfo("JOB_QUEUE_NAME", jobInfo.getJobQueueName());
+ job.addInfo("SUBMIT_TIME", jobInfo.getSubmitTime());
+ job.addInfo("LAUNCH_TIME", jobInfo.getLaunchTime());
+ job.addInfo("FINISH_TIME", jobInfo.getFinishTime());
+ job.addInfo("JOB_STATUS", jobInfo.getJobStatus());
+ job.addInfo("PRIORITY", jobInfo.getPriority());
+ job.addInfo("TOTAL_MAPS", jobInfo.getTotalMaps());
+ job.addInfo("TOTAL_REDUCES", jobInfo.getTotalReduces());
+ job.addInfo("UBERIZED", jobInfo.getUberized());
+ job.addInfo("ERROR_INFO", jobInfo.getErrorInfo());
+
+ // add metrics from total counters
+ // we omit the map counters and reduce counters for now as it's kind of
+ // awkward to put them (map/reduce/total counters are really a group of
+ // related counters)
+ Counters totalCounters = jobInfo.getTotalCounters();
+ if (totalCounters != null) {
+ addMetrics(job, totalCounters);
+ }
+ // finally add configuration to the job
+ addConfiguration(job, conf);
+ LOG.info("converted job " + jobInfo.getJobId() + " to a timeline entity");
+ return job;
+ }
+
+ private void addConfiguration(TimelineEntity job, Configuration conf) {
+ for (Map.Entry<String,String> e: conf) {
+ job.addConfig(e.getKey(), e.getValue());
+ }
+ }
+
+ private void addMetrics(TimelineEntity entity, Counters counters) {
+ for (CounterGroup g: counters) {
+ String groupName = g.getName();
+ for (Counter c: g) {
+ String name = groupName + ":" + c.getName();
+ TimelineMetric metric = new TimelineMetric();
+ metric.setId(name);
+ metric.addValue(System.currentTimeMillis(), c.getValue());
+ entity.addMetric(metric);
+ }
+ }
+ }
+
+ private List<TimelineEntity> createTaskAndTaskAttemptEntities(
+ JobInfo jobInfo) {
+ List<TimelineEntity> entities = new ArrayList<>();
+ Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks();
+ LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() +
+ " tasks");
+ for (TaskInfo taskInfo: taskInfoMap.values()) {
+ TimelineEntity task = createTaskEntity(taskInfo);
+ entities.add(task);
+ // add the task attempts from this task
+ Set<TimelineEntity> taskAttempts = createTaskAttemptEntities(taskInfo);
+ entities.addAll(taskAttempts);
+ }
+ return entities;
+ }
+
+ private TimelineEntity createTaskEntity(TaskInfo taskInfo) {
+ TimelineEntity task = new TimelineEntity();
+ task.setType(TASK);
+ task.setId(taskInfo.getTaskId().toString());
+ task.setCreatedTime(taskInfo.getStartTime());
+
+ task.addInfo("START_TIME", taskInfo.getStartTime());
+ task.addInfo("FINISH_TIME", taskInfo.getFinishTime());
+ task.addInfo("TASK_TYPE", taskInfo.getTaskType());
+ task.addInfo("TASK_STATUS", taskInfo.getTaskStatus());
+ task.addInfo("ERROR_INFO", taskInfo.getError());
+
+ // add metrics from counters
+ Counters counters = taskInfo.getCounters();
+ if (counters != null) {
+ addMetrics(task, counters);
+ }
+ LOG.info("converted task " + taskInfo.getTaskId() +
+ " to a timeline entity");
+ return task;
+ }
+
+ private Set<TimelineEntity> createTaskAttemptEntities(TaskInfo taskInfo) {
+ Set<TimelineEntity> taskAttempts = new HashSet<TimelineEntity>();
+ Map<TaskAttemptID,TaskAttemptInfo> taskAttemptInfoMap =
+ taskInfo.getAllTaskAttempts();
+ LOG.info("task " + taskInfo.getTaskId() + " has " +
+ taskAttemptInfoMap.size() + " task attempts");
+ for (TaskAttemptInfo taskAttemptInfo: taskAttemptInfoMap.values()) {
+ TimelineEntity taskAttempt = createTaskAttemptEntity(taskAttemptInfo);
+ taskAttempts.add(taskAttempt);
+ }
+ return taskAttempts;
+ }
+
+ private TimelineEntity createTaskAttemptEntity(
+ TaskAttemptInfo taskAttemptInfo) {
+ TimelineEntity taskAttempt = new TimelineEntity();
+ taskAttempt.setType(TASK_ATTEMPT);
+ taskAttempt.setId(taskAttemptInfo.getAttemptId().toString());
+ taskAttempt.setCreatedTime(taskAttemptInfo.getStartTime());
+
+ taskAttempt.addInfo("START_TIME", taskAttemptInfo.getStartTime());
+ taskAttempt.addInfo("FINISH_TIME", taskAttemptInfo.getFinishTime());
+ taskAttempt.addInfo("MAP_FINISH_TIME",
+ taskAttemptInfo.getMapFinishTime());
+ taskAttempt.addInfo("SHUFFLE_FINISH_TIME",
+ taskAttemptInfo.getShuffleFinishTime());
+ taskAttempt.addInfo("SORT_FINISH_TIME",
+ taskAttemptInfo.getSortFinishTime());
+ taskAttempt.addInfo("TASK_STATUS", taskAttemptInfo.getTaskStatus());
+ taskAttempt.addInfo("STATE", taskAttemptInfo.getState());
+ taskAttempt.addInfo("ERROR", taskAttemptInfo.getError());
+ taskAttempt.addInfo("CONTAINER_ID",
+ taskAttemptInfo.getContainerId().toString());
+
+ // add metrics from counters
+ Counters counters = taskAttemptInfo.getCounters();
+ if (counters != null) {
+ addMetrics(taskAttempt, counters);
+ }
+ LOG.info("converted task attempt " + taskAttemptInfo.getAttemptId() +
+ " to a timeline entity");
+ return taskAttempt;
+ }
+}
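This converter is driven by JobHistoryFileReplayMapperV2 with a JobInfo parsed
from a job history file. A minimal standalone sketch of that flow, assuming it
lives in org.apache.hadoop.mapreduce (the converter class is package-private)
and that args[0] points at a .jhist file; the class name is illustrative:

    package org.apache.hadoop.mapreduce;

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;

    public class ConverterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // parse a job history (.jhist) file into a JobInfo
        JobHistoryParser parser = new JobHistoryParser(fs, new Path(args[0]));
        JobInfo jobInfo = parser.parse();
        // convert it into job, task, and task attempt timeline entities
        List<TimelineEntity> entities =
            new TimelineEntityConverterV2().createTimelineEntities(jobInfo, conf);
        System.out.println("created " + entities.size() + " entities");
      }
    }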