You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2016/03/09 06:51:32 UTC

[2/2] hadoop git commit: MAPREDUCE-6546. reconcile the two versions of the timeline service performance tests. (Sangjin Lee via Naganarasimha G R)

MAPREDUCE-6546. reconcile the two versions of the timeline service performance tests. (Sangjin Lee via Naganarasimha G R)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c6f4c513
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c6f4c513
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c6f4c513

Branch: refs/heads/YARN-2928
Commit: c6f4c51360d93f02714fa05980b8c4dd9274ff1d
Parents: 85513ea
Author: naganarasimha <na...@apache.com>
Authored: Wed Mar 9 11:20:32 2016 +0530
Committer: naganarasimha <na...@apache.com>
Committed: Wed Mar 9 11:20:32 2016 +0530

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../hadoop/mapred/JobHistoryFileParser.java     |  53 ----
 .../mapred/JobHistoryFileReplayMapper.java      | 301 -------------------
 .../hadoop/mapred/SimpleEntityWriter.java       | 140 ---------
 .../hadoop/mapred/TimelineEntityConverter.java  | 211 -------------
 .../mapred/TimelineServicePerformanceV2.java    | 229 --------------
 .../apache/hadoop/mapreduce/EntityWriterV2.java |  56 ++++
 .../mapreduce/JobHistoryFileReplayMapperV1.java |  14 +-
 .../mapreduce/JobHistoryFileReplayMapperV2.java | 161 ++++++++++
 .../mapreduce/SimpleEntityWriterConstants.java  |  43 +++
 .../hadoop/mapreduce/SimpleEntityWriterV1.java  |  28 +-
 .../hadoop/mapreduce/SimpleEntityWriterV2.java  | 131 ++++++++
 .../mapreduce/TimelineEntityConverterV1.java    |   5 -
 .../mapreduce/TimelineEntityConverterV2.java    | 211 +++++++++++++
 .../mapreduce/TimelineServicePerformance.java   | 127 +++++---
 .../apache/hadoop/test/MapredTestDriver.java    |  35 +--
 16 files changed, 706 insertions(+), 1042 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 923751f..67a4a8a 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -17,6 +17,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
 
   IMPROVEMENTS
 
+    MAPREDUCE-6546. reconcile the two versions of the timeline service
+    performance tests. (Sangjin Lee via Naganarasimha G R)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java
deleted file mode 100644
index 9d051df..0000000
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
-
-class JobHistoryFileParser {
-  private static final Log LOG = LogFactory.getLog(JobHistoryFileParser.class);
-
-  private final FileSystem fs;
-
-  public JobHistoryFileParser(FileSystem fs) {
-    LOG.info("JobHistoryFileParser created with " + fs);
-    this.fs = fs;
-  }
-
-  public JobInfo parseHistoryFile(Path path) throws IOException {
-    LOG.info("parsing job history file " + path);
-    JobHistoryParser parser = new JobHistoryParser(fs, path);
-    return parser.parse();
-  }
-
-  public Configuration parseConfiguration(Path path) throws IOException {
-    LOG.info("parsing job configuration file " + path);
-    Configuration conf = new Configuration(false);
-    conf.addResource(fs.open(path));
-    return conf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
deleted file mode 100644
index 4fb5308..0000000
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.mapred.TimelineServicePerformanceV2.EntityWriter;
-import org.apache.hadoop.mapred.TimelineServicePerformanceV2.PerfCounters;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
-import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
-
-/**
- * Mapper for TimelineServicePerformanceV2 that replays job history files to the
- * timeline service.
- *
- */
-class JobHistoryFileReplayMapper extends EntityWriter {
-  private static final Log LOG =
-      LogFactory.getLog(JobHistoryFileReplayMapper.class);
-
-  static final String PROCESSING_PATH = "processing path";
-  static final String REPLAY_MODE = "replay mode";
-  static final int WRITE_ALL_AT_ONCE = 1;
-  static final int WRITE_PER_ENTITY = 2;
-  static final int REPLAY_MODE_DEFAULT = WRITE_ALL_AT_ONCE;
-
-  private static final Pattern JOB_ID_PARSER =
-      Pattern.compile("^(job_[0-9]+_([0-9]+)).*");
-
-  public static class JobFiles {
-    private final String jobId;
-    private Path jobHistoryFilePath;
-    private Path jobConfFilePath;
-
-    public JobFiles(String jobId) {
-      this.jobId = jobId;
-    }
-
-    public String getJobId() {
-      return jobId;
-    }
-
-    public Path getJobHistoryFilePath() {
-      return jobHistoryFilePath;
-    }
-
-    public void setJobHistoryFilePath(Path jobHistoryFilePath) {
-      this.jobHistoryFilePath = jobHistoryFilePath;
-    }
-
-    public Path getJobConfFilePath() {
-      return jobConfFilePath;
-    }
-
-    public void setJobConfFilePath(Path jobConfFilePath) {
-      this.jobConfFilePath = jobConfFilePath;
-    }
-
-    @Override
-    public int hashCode() {
-      return jobId.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-      if (obj == null) {
-        return false;
-      }
-      if (getClass() != obj.getClass()) {
-        return false;
-      }
-      JobFiles other = (JobFiles) obj;
-      return jobId.equals(other.jobId);
-    }
-  }
-
-  private enum FileType { JOB_HISTORY_FILE, JOB_CONF_FILE, UNKNOWN }
-
-
-  @Override
-  protected void writeEntities(Configuration tlConf,
-      TimelineCollectorManager manager, Context context) throws IOException {
-    // collect the apps it needs to process
-    Configuration conf = context.getConfiguration();
-    int taskId = context.getTaskAttemptID().getTaskID().getId();
-    int size = conf.getInt(MRJobConfig.NUM_MAPS,
-        TimelineServicePerformanceV2.NUM_MAPS_DEFAULT);
-    String processingDir =
-        conf.get(JobHistoryFileReplayMapper.PROCESSING_PATH);
-    int replayMode =
-        conf.getInt(JobHistoryFileReplayMapper.REPLAY_MODE,
-        JobHistoryFileReplayMapper.REPLAY_MODE_DEFAULT);
-    Path processingPath = new Path(processingDir);
-    FileSystem processingFs = processingPath.getFileSystem(conf);
-    JobHistoryFileParser parser = new JobHistoryFileParser(processingFs);
-    TimelineEntityConverter converter = new TimelineEntityConverter();
-
-    Collection<JobFiles> jobs =
-        selectJobFiles(processingFs, processingPath, taskId, size);
-    if (jobs.isEmpty()) {
-      LOG.info(context.getTaskAttemptID().getTaskID() +
-          " will process no jobs");
-    } else {
-      LOG.info(context.getTaskAttemptID().getTaskID() + " will process " +
-          jobs.size() + " jobs");
-    }
-    for (JobFiles job: jobs) {
-      // process each job
-      String jobIdStr = job.getJobId();
-      LOG.info("processing " + jobIdStr + "...");
-      JobId jobId = TypeConverter.toYarn(JobID.forName(jobIdStr));
-      ApplicationId appId = jobId.getAppId();
-
-      // create the app level timeline collector and start it
-      AppLevelTimelineCollector collector =
-          new AppLevelTimelineCollector(appId);
-      manager.putIfAbsent(appId, collector);
-      try {
-        // parse the job info and configuration
-        JobInfo jobInfo =
-            parser.parseHistoryFile(job.getJobHistoryFilePath());
-        Configuration jobConf =
-            parser.parseConfiguration(job.getJobConfFilePath());
-        LOG.info("parsed the job history file and the configuration file for job"
-            + jobIdStr);
-
-        // set the context
-        // flow id: job name, flow run id: timestamp, user id
-        TimelineCollectorContext tlContext =
-            collector.getTimelineEntityContext();
-        tlContext.setFlowName(jobInfo.getJobname());
-        tlContext.setFlowRunId(jobInfo.getSubmitTime());
-        tlContext.setUserId(jobInfo.getUsername());
-
-        // create entities from job history and write them
-        long totalTime = 0;
-        List<TimelineEntity> entitySet =
-            converter.createTimelineEntities(jobInfo, jobConf);
-        LOG.info("converted them into timeline entities for job " + jobIdStr);
-        // use the current user for this purpose
-        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-        long startWrite = System.nanoTime();
-        try {
-          switch (replayMode) {
-          case JobHistoryFileReplayMapper.WRITE_ALL_AT_ONCE:
-            writeAllEntities(collector, entitySet, ugi);
-            break;
-          case JobHistoryFileReplayMapper.WRITE_PER_ENTITY:
-            writePerEntity(collector, entitySet, ugi);
-            break;
-          default:
-            break;
-          }
-        } catch (Exception e) {
-          context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
-              increment(1);
-          LOG.error("writing to the timeline service failed", e);
-        }
-        long endWrite = System.nanoTime();
-        totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
-        int numEntities = entitySet.size();
-        LOG.info("wrote " + numEntities + " entities in " + totalTime + " ms");
-
-        context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
-            increment(totalTime);
-        context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
-            increment(numEntities);
-      } finally {
-        manager.remove(appId);
-        context.progress(); // move it along
-      }
-    }
-  }
-
-  private void writeAllEntities(AppLevelTimelineCollector collector,
-      List<TimelineEntity> entitySet, UserGroupInformation ugi)
-      throws IOException {
-    TimelineEntities entities = new TimelineEntities();
-    entities.setEntities(entitySet);
-    collector.putEntities(entities, ugi);
-  }
-
-  private void writePerEntity(AppLevelTimelineCollector collector,
-      List<TimelineEntity> entitySet, UserGroupInformation ugi)
-      throws IOException {
-    for (TimelineEntity entity : entitySet) {
-      TimelineEntities entities = new TimelineEntities();
-      entities.addEntity(entity);
-      collector.putEntities(entities, ugi);
-      LOG.info("wrote entity " + entity.getId());
-    }
-  }
-
-  private Collection<JobFiles> selectJobFiles(FileSystem fs,
-      Path processingRoot, int i, int size) throws IOException {
-    Map<String,JobFiles> jobs = new HashMap<>();
-    RemoteIterator<LocatedFileStatus> it = fs.listFiles(processingRoot, true);
-    while (it.hasNext()) {
-      LocatedFileStatus status = it.next();
-      Path path = status.getPath();
-      String fileName = path.getName();
-      Matcher m = JOB_ID_PARSER.matcher(fileName);
-      if (!m.matches()) {
-        continue;
-      }
-      String jobId = m.group(1);
-      int lastId = Integer.parseInt(m.group(2));
-      int mod = lastId % size;
-      if (mod != i) {
-        continue;
-      }
-      LOG.info("this mapper will process file " + fileName);
-      // it's mine
-      JobFiles jobFiles = jobs.get(jobId);
-      if (jobFiles == null) {
-        jobFiles = new JobFiles(jobId);
-        jobs.put(jobId, jobFiles);
-      }
-      setFilePath(fileName, path, jobFiles);
-    }
-    return jobs.values();
-  }
-
-  private void setFilePath(String fileName, Path path,
-      JobFiles jobFiles) {
-    // determine if we're dealing with a job history file or a job conf file
-    FileType type = getFileType(fileName);
-    switch (type) {
-    case JOB_HISTORY_FILE:
-      if (jobFiles.getJobHistoryFilePath() == null) {
-        jobFiles.setJobHistoryFilePath(path);
-      } else {
-        LOG.warn("we already have the job history file " +
-            jobFiles.getJobHistoryFilePath() + ": skipping " + path);
-      }
-      break;
-    case JOB_CONF_FILE:
-      if (jobFiles.getJobConfFilePath() == null) {
-        jobFiles.setJobConfFilePath(path);
-      } else {
-        LOG.warn("we already have the job conf file " +
-            jobFiles.getJobConfFilePath() + ": skipping " + path);
-      }
-      break;
-    case UNKNOWN:
-      LOG.warn("unknown type: " + path);
-    }
-  }
-
-  private FileType getFileType(String fileName) {
-    if (fileName.endsWith(".jhist")) {
-      return FileType.JOB_HISTORY_FILE;
-    }
-    if (fileName.endsWith("_conf.xml")) {
-      return FileType.JOB_CONF_FILE;
-    }
-    return FileType.UNKNOWN;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java
deleted file mode 100644
index 625c32a..0000000
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.TimelineServicePerformanceV2.EntityWriter;
-import org.apache.hadoop.mapred.TimelineServicePerformanceV2.PerfCounters;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
-
-/**
-   * Adds simple entities with random string payload, events, metrics, and
-   * configuration.
-   */
-class SimpleEntityWriter extends EntityWriter {
-  private static final Log LOG = LogFactory.getLog(SimpleEntityWriter.class);
-
-  // constants for mtype = 1
-  static final String KBS_SENT = "kbs sent";
-  static final int KBS_SENT_DEFAULT = 1;
-  static final String TEST_TIMES = "testtimes";
-  static final int TEST_TIMES_DEFAULT = 100;
-  static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
-      "timeline.server.performance.run.id";
-
-  protected void writeEntities(Configuration tlConf,
-      TimelineCollectorManager manager, Context context) throws IOException {
-    Configuration conf = context.getConfiguration();
-    // simulate the app id with the task id
-    int taskId = context.getTaskAttemptID().getTaskID().getId();
-    long timestamp = conf.getLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID, 0);
-    ApplicationId appId = ApplicationId.newInstance(timestamp, taskId);
-
-    // create the app level timeline collector
-    AppLevelTimelineCollector collector =
-        new AppLevelTimelineCollector(appId);
-    manager.putIfAbsent(appId, collector);
-
-    try {
-      // set the context
-      // flow id: job name, flow run id: timestamp, user id
-      TimelineCollectorContext tlContext =
-          collector.getTimelineEntityContext();
-      tlContext.setFlowName(context.getJobName());
-      tlContext.setFlowRunId(timestamp);
-      tlContext.setUserId(context.getUser());
-
-      final int kbs = conf.getInt(KBS_SENT, KBS_SENT_DEFAULT);
-
-      long totalTime = 0;
-      final int testtimes = conf.getInt(TEST_TIMES, TEST_TIMES_DEFAULT);
-      final Random rand = new Random();
-      final TaskAttemptID taskAttemptId = context.getTaskAttemptID();
-      final char[] payLoad = new char[kbs * 1024];
-
-      for (int i = 0; i < testtimes; i++) {
-        // Generate a fixed length random payload
-        for (int xx = 0; xx < kbs * 1024; xx++) {
-          int alphaNumIdx =
-              rand.nextInt(TimelineServicePerformanceV2.alphaNums.length);
-          payLoad[xx] = TimelineServicePerformanceV2.alphaNums[alphaNumIdx];
-        }
-        String entId = taskAttemptId + "_" + Integer.toString(i);
-        final TimelineEntity entity = new TimelineEntity();
-        entity.setId(entId);
-        entity.setType("FOO_ATTEMPT");
-        entity.addInfo("PERF_TEST", payLoad);
-        // add an event
-        TimelineEvent event = new TimelineEvent();
-        event.setId("foo_event_id");
-        event.setTimestamp(System.currentTimeMillis());
-        event.addInfo("foo_event", "test");
-        entity.addEvent(event);
-        // add a metric
-        TimelineMetric metric = new TimelineMetric();
-        metric.setId("foo_metric");
-        metric.addValue(System.currentTimeMillis(), 123456789L);
-        entity.addMetric(metric);
-        // add a config
-        entity.addConfig("foo", "bar");
-
-        TimelineEntities entities = new TimelineEntities();
-        entities.addEntity(entity);
-        // use the current user for this purpose
-        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-        long startWrite = System.nanoTime();
-        try {
-          collector.putEntities(entities, ugi);
-        } catch (Exception e) {
-          context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
-              increment(1);
-          LOG.error("writing to the timeline service failed", e);
-        }
-        long endWrite = System.nanoTime();
-        totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
-      }
-      LOG.info("wrote " + testtimes + " entities (" + kbs*testtimes +
-          " kB) in " + totalTime + " ms");
-      context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
-          increment(totalTime);
-      context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
-          increment(testtimes);
-      context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).
-          increment(kbs*testtimes);
-    } finally {
-      // clean up
-      manager.remove(appId);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
deleted file mode 100644
index 0e2eb72..0000000
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-
-class TimelineEntityConverter {
-  private static final Log LOG =
-      LogFactory.getLog(TimelineEntityConverter.class);
-
-  static final String JOB = "MAPREDUCE_JOB";
-  static final String TASK = "MAPREDUCE_TASK";
-  static final String TASK_ATTEMPT = "MAPREDUCE_TASK_ATTEMPT";
-
-  /**
-   * Creates job, task, and task attempt entities based on the job history info
-   * and configuration.
-   *
-   * Note: currently these are plan timeline entities created for mapreduce
-   * types. These are not meant to be the complete and accurate entity set-up
-   * for mapreduce jobs. We do not leverage hierarchical timeline entities. If
-   * we create canonical mapreduce hierarchical timeline entities with proper
-   * parent-child relationship, we could modify this to use that instead.
-   *
-   * Note that we also do not add info to the YARN application entity, which
-   * would be needed for aggregation.
-   */
-  public List<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
-      Configuration conf) {
-    List<TimelineEntity> entities = new ArrayList<>();
-
-    // create the job entity
-    TimelineEntity job = createJobEntity(jobInfo, conf);
-    entities.add(job);
-
-    // create the task and task attempt entities
-    List<TimelineEntity> tasksAndAttempts =
-        createTaskAndTaskAttemptEntities(jobInfo);
-    entities.addAll(tasksAndAttempts);
-
-    return entities;
-  }
-
-  private TimelineEntity createJobEntity(JobInfo jobInfo, Configuration conf) {
-    TimelineEntity job = new TimelineEntity();
-    job.setType(JOB);
-    job.setId(jobInfo.getJobId().toString());
-    job.setCreatedTime(jobInfo.getSubmitTime());
-
-    job.addInfo("JOBNAME", jobInfo.getJobname());
-    job.addInfo("USERNAME", jobInfo.getUsername());
-    job.addInfo("JOB_QUEUE_NAME", jobInfo.getJobQueueName());
-    job.addInfo("SUBMIT_TIME", jobInfo.getSubmitTime());
-    job.addInfo("LAUNCH_TIME", jobInfo.getLaunchTime());
-    job.addInfo("FINISH_TIME", jobInfo.getFinishTime());
-    job.addInfo("JOB_STATUS", jobInfo.getJobStatus());
-    job.addInfo("PRIORITY", jobInfo.getPriority());
-    job.addInfo("TOTAL_MAPS", jobInfo.getTotalMaps());
-    job.addInfo("TOTAL_REDUCES", jobInfo.getTotalReduces());
-    job.addInfo("UBERIZED", jobInfo.getUberized());
-    job.addInfo("ERROR_INFO", jobInfo.getErrorInfo());
-
-    // add metrics from total counters
-    // we omit the map counters and reduce counters for now as it's kind of
-    // awkward to put them (map/reduce/total counters are really a group of
-    // related counters)
-    Counters totalCounters = jobInfo.getTotalCounters();
-    if (totalCounters != null) {
-      addMetrics(job, totalCounters);
-    }
-    // finally add configuration to the job
-    addConfiguration(job, conf);
-    LOG.info("converted job " + jobInfo.getJobId() + " to a timeline entity");
-    return job;
-  }
-
-  private void addConfiguration(TimelineEntity job, Configuration conf) {
-    for (Map.Entry<String,String> e: conf) {
-      job.addConfig(e.getKey(), e.getValue());
-    }
-  }
-
-  private void addMetrics(TimelineEntity entity, Counters counters) {
-    for (CounterGroup g: counters) {
-      String groupName = g.getName();
-      for (Counter c: g) {
-        String name = groupName + ":" + c.getName();
-        TimelineMetric metric = new TimelineMetric();
-        metric.setId(name);
-        metric.addValue(System.currentTimeMillis(), c.getValue());
-        entity.addMetric(metric);
-      }
-    }
-  }
-
-  private List<TimelineEntity> createTaskAndTaskAttemptEntities(
-      JobInfo jobInfo) {
-    List<TimelineEntity> entities = new ArrayList<>();
-    Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks();
-    LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() +
-        " tasks");
-    for (TaskInfo taskInfo: taskInfoMap.values()) {
-      TimelineEntity task = createTaskEntity(taskInfo);
-      entities.add(task);
-      // add the task attempts from this task
-      Set<TimelineEntity> taskAttempts = createTaskAttemptEntities(taskInfo);
-      entities.addAll(taskAttempts);
-    }
-    return entities;
-  }
-
-  private TimelineEntity createTaskEntity(TaskInfo taskInfo) {
-    TimelineEntity task = new TimelineEntity();
-    task.setType(TASK);
-    task.setId(taskInfo.getTaskId().toString());
-    task.setCreatedTime(taskInfo.getStartTime());
-
-    task.addInfo("START_TIME", taskInfo.getStartTime());
-    task.addInfo("FINISH_TIME", taskInfo.getFinishTime());
-    task.addInfo("TASK_TYPE", taskInfo.getTaskType());
-    task.addInfo("TASK_STATUS", taskInfo.getTaskStatus());
-    task.addInfo("ERROR_INFO", taskInfo.getError());
-
-    // add metrics from counters
-    Counters counters = taskInfo.getCounters();
-    if (counters != null) {
-      addMetrics(task, counters);
-    }
-    LOG.info("converted task " + taskInfo.getTaskId() +
-        " to a timeline entity");
-    return task;
-  }
-
-  private Set<TimelineEntity> createTaskAttemptEntities(TaskInfo taskInfo) {
-    Set<TimelineEntity> taskAttempts = new HashSet<TimelineEntity>();
-    Map<TaskAttemptID,TaskAttemptInfo> taskAttemptInfoMap =
-        taskInfo.getAllTaskAttempts();
-    LOG.info("task " + taskInfo.getTaskId() + " has " +
-        taskAttemptInfoMap.size() + " task attempts");
-    for (TaskAttemptInfo taskAttemptInfo: taskAttemptInfoMap.values()) {
-      TimelineEntity taskAttempt = createTaskAttemptEntity(taskAttemptInfo);
-      taskAttempts.add(taskAttempt);
-    }
-    return taskAttempts;
-  }
-
-  private TimelineEntity createTaskAttemptEntity(
-      TaskAttemptInfo taskAttemptInfo) {
-    TimelineEntity taskAttempt = new TimelineEntity();
-    taskAttempt.setType(TASK_ATTEMPT);
-    taskAttempt.setId(taskAttemptInfo.getAttemptId().toString());
-    taskAttempt.setCreatedTime(taskAttemptInfo.getStartTime());
-
-    taskAttempt.addInfo("START_TIME", taskAttemptInfo.getStartTime());
-    taskAttempt.addInfo("FINISH_TIME", taskAttemptInfo.getFinishTime());
-    taskAttempt.addInfo("MAP_FINISH_TIME",
-        taskAttemptInfo.getMapFinishTime());
-    taskAttempt.addInfo("SHUFFLE_FINISH_TIME",
-        taskAttemptInfo.getShuffleFinishTime());
-    taskAttempt.addInfo("SORT_FINISH_TIME",
-        taskAttemptInfo.getSortFinishTime());
-    taskAttempt.addInfo("TASK_STATUS", taskAttemptInfo.getTaskStatus());
-    taskAttempt.addInfo("STATE", taskAttemptInfo.getState());
-    taskAttempt.addInfo("ERROR", taskAttemptInfo.getError());
-    taskAttempt.addInfo("CONTAINER_ID",
-        taskAttemptInfo.getContainerId().toString());
-
-    // add metrics from counters
-    Counters counters = taskAttemptInfo.getCounters();
-    if (counters != null) {
-      addMetrics(taskAttempt, counters);
-    }
-    LOG.info("converted task attempt " + taskAttemptInfo.getAttemptId() +
-        " to a timeline entity");
-    return taskAttempt;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java
deleted file mode 100644
index f674ae1..0000000
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-import java.util.Date;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
-
-public class TimelineServicePerformanceV2 extends Configured implements Tool {
-  static final int NUM_MAPS_DEFAULT = 1;
-
-  static final int SIMPLE_ENTITY_WRITER = 1;
-  static final int JOB_HISTORY_FILE_REPLAY_MAPPER = 2;
-  static int mapperType = SIMPLE_ENTITY_WRITER;
-
-  protected static int printUsage() {
-    System.err.println(
-        "Usage: [-m <maps>] number of mappers (default: " + NUM_MAPS_DEFAULT +
-            ")\n" +
-        "     [-mtype <mapper type in integer>]\n" +
-        "          1. simple entity write mapper\n" +
-        "          2. job history file replay mapper\n" +
-        "     [-s <(KBs)test>] number of KB per put (mtype=1, default: " +
-             SimpleEntityWriter.KBS_SENT_DEFAULT + " KB)\n" +
-        "     [-t] package sending iterations per mapper (mtype=1, default: " +
-             SimpleEntityWriter.TEST_TIMES_DEFAULT + ")\n" +
-        "     [-d <path>] root path of job history files (mtype=2)\n" +
-        "     [-r <replay mode>] (mtype=2)\n" +
-        "          1. write all entities for a job in one put (default)\n" +
-        "          2. write one entity at a time\n");
-    GenericOptionsParser.printGenericCommandUsage(System.err);
-    return -1;
-  }
-
-  /**
-   * Configure a job given argv.
-   */
-  public static boolean parseArgs(String[] args, Job job) throws IOException {
-    // set the common defaults
-    Configuration conf = job.getConfiguration();
-    conf.setInt(MRJobConfig.NUM_MAPS, NUM_MAPS_DEFAULT);
-
-    for (int i = 0; i < args.length; i++) {
-      if (args.length == i + 1) {
-        System.out.println("ERROR: Required parameter missing from " + args[i]);
-        return printUsage() == 0;
-      }
-      try {
-        if ("-m".equals(args[i])) {
-          if (Integer.parseInt(args[++i]) > 0) {
-            job.getConfiguration()
-                .setInt(MRJobConfig.NUM_MAPS, Integer.parseInt(args[i]));
-          }
-        } else if ("-mtype".equals(args[i])) {
-          mapperType = Integer.parseInt(args[++i]);
-        } else if ("-s".equals(args[i])) {
-          if (Integer.parseInt(args[++i]) > 0) {
-            conf.setInt(SimpleEntityWriter.KBS_SENT, Integer.parseInt(args[i]));
-          }
-        } else if ("-t".equals(args[i])) {
-          if (Integer.parseInt(args[++i]) > 0) {
-            conf.setInt(SimpleEntityWriter.TEST_TIMES,
-                Integer.parseInt(args[i]));
-          }
-        } else if ("-d".equals(args[i])) {
-          conf.set(JobHistoryFileReplayMapper.PROCESSING_PATH, args[++i]);
-        } else if ("-r".equals(args[i])) {
-          conf.setInt(JobHistoryFileReplayMapper.REPLAY_MODE,
-              Integer.parseInt(args[++i]));
-        } else {
-          System.out.println("Unexpected argument: " + args[i]);
-          return printUsage() == 0;
-        }
-      } catch (NumberFormatException except) {
-        System.out.println("ERROR: Integer expected instead of " + args[i]);
-        return printUsage() == 0;
-      } catch (Exception e) {
-        throw (IOException)new IOException().initCause(e);
-      }
-    }
-
-    // handle mapper-specific settings
-    switch (mapperType) {
-    case JOB_HISTORY_FILE_REPLAY_MAPPER:
-      job.setMapperClass(JobHistoryFileReplayMapper.class);
-      String processingPath =
-          conf.get(JobHistoryFileReplayMapper.PROCESSING_PATH);
-      if (processingPath == null || processingPath.isEmpty()) {
-        System.out.println("processing path is missing while mtype = 2");
-        return printUsage() == 0;
-      }
-      break;
-    case SIMPLE_ENTITY_WRITER:
-    default:
-      job.setMapperClass(SimpleEntityWriter.class);
-      // use the current timestamp as the "run id" of the test: this will
-      // be used as simulating the cluster timestamp for apps
-      conf.setLong(SimpleEntityWriter.TIMELINE_SERVICE_PERFORMANCE_RUN_ID,
-          System.currentTimeMillis());
-      break;
-    }
-
-    return true;
-  }
-
-  /**
-   * TimelineServer Performance counters
-   */
-  static enum PerfCounters {
-    TIMELINE_SERVICE_WRITE_TIME,
-    TIMELINE_SERVICE_WRITE_COUNTER,
-    TIMELINE_SERVICE_WRITE_FAILURES,
-    TIMELINE_SERVICE_WRITE_KBS,
-  }
-
-  public int run(String[] args) throws Exception {
-
-    Job job = Job.getInstance(getConf());
-    job.setJarByClass(TimelineServicePerformanceV2.class);
-    job.setMapperClass(SimpleEntityWriter.class);
-    job.setInputFormatClass(SleepInputFormat.class);
-    job.setOutputFormatClass(NullOutputFormat.class);
-    job.setNumReduceTasks(0);
-    if (!parseArgs(args, job)) {
-      return -1;
-    }
-
-    Date startTime = new Date();
-    System.out.println("Job started: " + startTime);
-    int ret = job.waitForCompletion(true) ? 0 : 1;
-    org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
-    long writetime =
-        counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).getValue();
-    long writecounts =
-        counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).getValue();
-    long writesize =
-        counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue();
-    double transacrate = writecounts * 1000 / (double)writetime;
-    double iorate = writesize * 1000 / (double)writetime;
-    int numMaps =
-        Integer.parseInt(job.getConfiguration().get(MRJobConfig.NUM_MAPS));
-
-    System.out.println("TRANSACTION RATE (per mapper): " + transacrate +
-        " ops/s");
-    System.out.println("IO RATE (per mapper): " + iorate + " KB/s");
-
-    System.out.println("TRANSACTION RATE (total): " + transacrate*numMaps +
-        " ops/s");
-    System.out.println("IO RATE (total): " + iorate*numMaps + " KB/s");
-
-    return ret;
-  }
-
-  public static void main(String[] args) throws Exception {
-    int res =
-        ToolRunner.run(new Configuration(), new TimelineServicePerformanceV2(),
-            args);
-    System.exit(res);
-  }
-
-  /**
-   *  To ensure that the compression really gets exercised, generate a
-   *  random alphanumeric fixed length payload
-   */
-  static final char[] alphaNums = new char[] { 'a', 'b', 'c', 'd', 'e', 'f',
-    'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
-    's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
-    'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
-    'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
-    '3', '4', '5', '6', '7', '8', '9', '0', ' ' };
-
-  /**
-   * Base mapper for writing entities to the timeline service. Subclasses
-   * override {@link #writeEntities(Configuration, TimelineCollectorManager,
-   * org.apache.hadoop.mapreduce.Mapper.Context)} to create and write entities
-   * to the timeline service.
-   */
-  public static abstract class EntityWriter
-      extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
-    @Override
-    public void map(IntWritable key, IntWritable val, Context context)
-        throws IOException {
-
-      // create the timeline collector manager wired with the writer
-      Configuration tlConf = new YarnConfiguration();
-      TimelineCollectorManager manager = new TimelineCollectorManager("test");
-      manager.init(tlConf);
-      manager.start();
-      try {
-        // invoke the method to have the subclass write entities
-        writeEntities(tlConf, manager, context);
-      } finally {
-        manager.close();
-      }
-    }
-
-    protected abstract void writeEntities(Configuration tlConf,
-        TimelineCollectorManager manager, Context context) throws IOException;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java
new file mode 100644
index 0000000..f5d95c3
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
+
+/**
+ * Base mapper for writing entities to the timeline service. Each map() call
+ * starts a new TimelineCollectorManager, passes it to the subclass-provided
+ * {@link #writeEntities(Configuration, TimelineCollectorManager,
+ * org.apache.hadoop.mapreduce.Mapper.Context)}, then closes the manager.
+ */
+abstract class EntityWriterV2
+    extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
+  @Override
+  public void map(IntWritable key, IntWritable val, Context context)
+      throws IOException {
+
+    // build and start the collector manager from a fresh YarnConfiguration
+    Configuration tlConf = new YarnConfiguration();
+    TimelineCollectorManager manager = new TimelineCollectorManager("test");
+    manager.init(tlConf);
+    manager.start();
+    try {
+      // the subclass writes its entities; finally always closes the manager
+      writeEntities(tlConf, manager, context);
+    } finally {
+      manager.close();
+    }
+  }
+
+  protected abstract void writeEntities(Configuration tlConf,
+      TimelineCollectorManager manager, Context context) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java
index 5e10662..447ea4e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java
@@ -20,33 +20,21 @@ package org.apache.hadoop.mapreduce;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
-import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper;
 import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper.JobFiles;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java
new file mode 100644
index 0000000..6a9a878
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper.JobFiles;
+import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
+
+/**
+ * Mapper for TimelineServicePerformance that replays job history files to the
+ * timeline service v.2. Each job is written through its own app-level
+ * collector; write time, entity count and failures go to PerfCounters.
+ */
+class JobHistoryFileReplayMapperV2 extends EntityWriterV2 {
+  private static final Log LOG =
+      LogFactory.getLog(JobHistoryFileReplayMapperV2.class);
+
+  @Override
+  protected void writeEntities(Configuration tlConf,
+      TimelineCollectorManager manager, Context context) throws IOException {
+    JobHistoryFileReplayHelper helper = new JobHistoryFileReplayHelper(context);
+    int replayMode = helper.getReplayMode();
+    JobHistoryFileParser parser = helper.getParser();
+    TimelineEntityConverterV2 converter = new TimelineEntityConverterV2();
+
+    // collect the jobs this task needs to process
+    Collection<JobFiles> jobs = helper.getJobFiles();
+    if (jobs.isEmpty()) {
+      LOG.info(context.getTaskAttemptID().getTaskID() +
+          " will process no jobs");
+    } else {
+      LOG.info(context.getTaskAttemptID().getTaskID() + " will process " +
+          jobs.size() + " jobs");
+    }
+    for (JobFiles job: jobs) {
+      // process each job
+      String jobIdStr = job.getJobId();
+      // skip if either of the files is missing
+      if (job.getJobConfFilePath() == null ||
+          job.getJobHistoryFilePath() == null) {
+        LOG.info(jobIdStr + " missing either the job history file or the " +
+            "configuration file. Skipping.");
+        continue;
+      }
+      LOG.info("processing " + jobIdStr + "...");
+      JobId jobId = TypeConverter.toYarn(JobID.forName(jobIdStr));
+      ApplicationId appId = jobId.getAppId();
+
+      // create the app level collector and register it with the manager
+      AppLevelTimelineCollector collector =
+          new AppLevelTimelineCollector(appId);
+      manager.putIfAbsent(appId, collector);
+      try {
+        // parse the job info and configuration
+        JobInfo jobInfo = parser.parseHistoryFile(job.getJobHistoryFilePath());
+        Configuration jobConf =
+            parser.parseConfiguration(job.getJobConfFilePath());
+        LOG.info("parsed the job history file and the configuration file for " +
+            "job " + jobIdStr);
+
+        // set the context
+        // flow name: job name, flow run id: job submit time, user id: job user
+        TimelineCollectorContext tlContext =
+            collector.getTimelineEntityContext();
+        tlContext.setFlowName(jobInfo.getJobname());
+        tlContext.setFlowRunId(jobInfo.getSubmitTime());
+        tlContext.setUserId(jobInfo.getUsername());
+
+        // create entities from job history and write them
+        List<TimelineEntity> entitySet =
+            converter.createTimelineEntities(jobInfo, jobConf);
+        LOG.info("converted them into timeline entities for job " + jobIdStr);
+        // use the current user for this purpose
+        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+        long startWrite = System.nanoTime();
+        try {
+          switch (replayMode) {
+          case JobHistoryFileReplayHelper.WRITE_ALL_AT_ONCE:
+            writeAllEntities(collector, entitySet, ugi);
+            break;
+          case JobHistoryFileReplayHelper.WRITE_PER_ENTITY:
+            writePerEntity(collector, entitySet, ugi);
+            break;
+          default:
+            break;
+          }
+        } catch (Exception e) {
+          context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
+              increment(1);
+          LOG.error("writing to the timeline service failed", e);
+        }
+        long endWrite = System.nanoTime();
+        long totalTime = TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
+        int numEntities = entitySet.size();
+        LOG.info("wrote " + numEntities + " entities in " + totalTime + " ms");
+
+        context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
+            increment(totalTime);
+        context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
+            increment(numEntities);
+      } finally {
+        manager.remove(appId);
+        context.progress(); // move it along
+      }
+    }
+  }
+
+  /** Writes all entities of a job with a single putEntities call. */
+  private void writeAllEntities(AppLevelTimelineCollector collector,
+      List<TimelineEntity> entitySet, UserGroupInformation ugi)
+      throws IOException {
+    TimelineEntities entities = new TimelineEntities();
+    entities.setEntities(entitySet);
+    collector.putEntities(entities, ugi);
+  }
+
+  /** Writes the entities one at a time, one putEntities call per entity. */
+  private void writePerEntity(AppLevelTimelineCollector collector,
+      List<TimelineEntity> entitySet, UserGroupInformation ugi)
+      throws IOException {
+    for (TimelineEntity entity : entitySet) {
+      TimelineEntities entities = new TimelineEntities();
+      entities.addEntity(entity);
+      collector.putEntities(entities, ugi);
+      LOG.info("wrote entity " + entity.getId());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java
new file mode 100644
index 0000000..b89d0e8
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+/**
+ * Constants shared by the simple entity writers (see SimpleEntityWriterV1).
+ */
+interface SimpleEntityWriterConstants {
+  // configuration keys and their default values for mtype = 1
+  String KBS_SENT = "kbs sent";
+  int KBS_SENT_DEFAULT = 1;
+  String TEST_TIMES = "testtimes";
+  int TEST_TIMES_DEFAULT = 100;
+  String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
+      "timeline.server.performance.run.id";
+
+  /**
+   *  Alphabet (a-z, A-Z, 0-9 and space) for generating a random fixed-length
+   *  payload, so that the compression really gets exercised.
+   */
+  char[] ALPHA_NUMS = new char[] { 'a', 'b', 'c', 'd', 'e', 'f',
+    'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
+    's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
+    'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
+    'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
+    '3', '4', '5', '6', '7', '8', '9', '0', ' ' };
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java
index 2c851e9..b10ae04 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java
@@ -27,44 +27,22 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 /**
    * Adds simple entities with random string payload, events, metrics, and
    * configuration.
    */
-class SimpleEntityWriterV1 extends
-    org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
+class SimpleEntityWriterV1
+    extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable>
+    implements SimpleEntityWriterConstants {
   private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV1.class);
 
-  // constants for mtype = 1
-  static final String KBS_SENT = "kbs sent";
-  static final int KBS_SENT_DEFAULT = 1;
-  static final String TEST_TIMES = "testtimes";
-  static final int TEST_TIMES_DEFAULT = 100;
-  static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
-      "timeline.server.performance.run.id";
-  /**
-   *  To ensure that the compression really gets exercised, generate a
-   *  random alphanumeric fixed length payload
-   */
-  private static char[] ALPHA_NUMS = new char[] { 'a', 'b', 'c', 'd', 'e', 'f',
-    'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
-    's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
-    'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
-    'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
-    '3', '4', '5', '6', '7', '8', '9', '0', ' ' };
-
   public void map(IntWritable key, IntWritable val, Context context) throws IOException {
     TimelineClient tlc = new TimelineClientImpl();
     Configuration conf = context.getConfiguration();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java
new file mode 100644
index 0000000..d66deb0
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
+
+/**
+ * Adds simple entities with random string payload, events, metrics, and
+ * configuration.
+ */
+class SimpleEntityWriterV2 extends EntityWriterV2
+    implements SimpleEntityWriterConstants {
+  private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV2.class);
+
+  protected void writeEntities(Configuration tlConf,
+      TimelineCollectorManager manager, Context context) throws IOException {
+    Configuration conf = context.getConfiguration();
+    // simulate the app id with the task id
+    int taskId = context.getTaskAttemptID().getTaskID().getId();
+    long timestamp = conf.getLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID, 0);
+    ApplicationId appId = ApplicationId.newInstance(timestamp, taskId);
+
+    // create the app level timeline collector
+    AppLevelTimelineCollector collector =
+        new AppLevelTimelineCollector(appId);
+    manager.putIfAbsent(appId, collector);
+
+    try {
+      // set the context
+      // flow name: job name, flow run id: run timestamp, user id: job user
+      TimelineCollectorContext tlContext =
+          collector.getTimelineEntityContext();
+      tlContext.setFlowName(context.getJobName());
+      tlContext.setFlowRunId(timestamp);
+      tlContext.setUserId(context.getUser());
+
+      final int kbs = conf.getInt(KBS_SENT, KBS_SENT_DEFAULT);
+
+      long totalTime = 0;
+      final int testtimes = conf.getInt(TEST_TIMES, TEST_TIMES_DEFAULT);
+      final Random rand = new Random();
+      final TaskAttemptID taskAttemptId = context.getTaskAttemptID();
+      final char[] payLoad = new char[kbs * 1024];
+
+      for (int i = 0; i < testtimes; i++) {
+        // Generate a fixed length random payload
+        for (int xx = 0; xx < kbs * 1024; xx++) {
+          int alphaNumIdx =
+              rand.nextInt(ALPHA_NUMS.length);
+          payLoad[xx] = ALPHA_NUMS[alphaNumIdx];
+        }
+        String entId = taskAttemptId + "_" + Integer.toString(i);
+        final TimelineEntity entity = new TimelineEntity();
+        entity.setId(entId);
+        entity.setType("FOO_ATTEMPT");
+        entity.addInfo("PERF_TEST", payLoad);
+        // add an event
+        TimelineEvent event = new TimelineEvent();
+        event.setId("foo_event_id");
+        event.setTimestamp(System.currentTimeMillis());
+        event.addInfo("foo_event", "test");
+        entity.addEvent(event);
+        // add a metric
+        TimelineMetric metric = new TimelineMetric();
+        metric.setId("foo_metric");
+        metric.addValue(System.currentTimeMillis(), 123456789L);
+        entity.addMetric(metric);
+        // add a config
+        entity.addConfig("foo", "bar");
+
+        TimelineEntities entities = new TimelineEntities();
+        entities.addEntity(entity);
+        // use the current user for this purpose
+        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+        long startWrite = System.nanoTime();
+        try {
+          collector.putEntities(entities, ugi);
+        } catch (Exception e) {
+          context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
+              increment(1);
+          LOG.error("writing to the timeline service failed", e);
+        }
+        long endWrite = System.nanoTime();
+        totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
+      }
+      LOG.info("wrote " + testtimes + " entities (" + kbs*testtimes +
+          " kB) in " + totalTime + " ms");
+      context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
+          increment(totalTime);
+      context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
+          increment(testtimes);
+      context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).
+          increment(kbs*testtimes);
+    } finally {
+      // clean up
+      manager.remove(appId);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java
index 79d123e..4d8b74b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java
@@ -25,11 +25,6 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6f4c513/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV2.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV2.java
new file mode 100644
index 0000000..79633d2
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV2.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+
+class TimelineEntityConverterV2 {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineEntityConverterV2.class);
+
+  static final String JOB = "MAPREDUCE_JOB";
+  static final String TASK = "MAPREDUCE_TASK";
+  static final String TASK_ATTEMPT = "MAPREDUCE_TASK_ATTEMPT";
+
+  /**
+   * Creates job, task, and task attempt entities based on the job history info
+   * and configuration.
+   *
+   * Note: currently these are plain timeline entities created for mapreduce
+   * types. These are not meant to be the complete and accurate entity set-up
+   * for mapreduce jobs. We do not leverage hierarchical timeline entities. If
+   * we create canonical mapreduce hierarchical timeline entities with proper
+   * parent-child relationship, we could modify this to use that instead.
+   *
+   * Note that we also do not add info to the YARN application entity, which
+   * would be needed for aggregation.
+   */
+  public List<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
+      Configuration conf) {
+    List<TimelineEntity> entities = new ArrayList<>();
+
+    // create the job entity
+    TimelineEntity job = createJobEntity(jobInfo, conf);
+    entities.add(job);
+
+    // create the task and task attempt entities
+    List<TimelineEntity> tasksAndAttempts =
+        createTaskAndTaskAttemptEntities(jobInfo);
+    entities.addAll(tasksAndAttempts);
+
+    return entities;
+  }
+
+  private TimelineEntity createJobEntity(JobInfo jobInfo, Configuration conf) {
+    TimelineEntity job = new TimelineEntity();
+    job.setType(JOB);
+    job.setId(jobInfo.getJobId().toString());
+    job.setCreatedTime(jobInfo.getSubmitTime());
+
+    job.addInfo("JOBNAME", jobInfo.getJobname());
+    job.addInfo("USERNAME", jobInfo.getUsername());
+    job.addInfo("JOB_QUEUE_NAME", jobInfo.getJobQueueName());
+    job.addInfo("SUBMIT_TIME", jobInfo.getSubmitTime());
+    job.addInfo("LAUNCH_TIME", jobInfo.getLaunchTime());
+    job.addInfo("FINISH_TIME", jobInfo.getFinishTime());
+    job.addInfo("JOB_STATUS", jobInfo.getJobStatus());
+    job.addInfo("PRIORITY", jobInfo.getPriority());
+    job.addInfo("TOTAL_MAPS", jobInfo.getTotalMaps());
+    job.addInfo("TOTAL_REDUCES", jobInfo.getTotalReduces());
+    job.addInfo("UBERIZED", jobInfo.getUberized());
+    job.addInfo("ERROR_INFO", jobInfo.getErrorInfo());
+
+    // add metrics from total counters
+    // we omit the map counters and reduce counters for now as it's kind of
+    // awkward to put them (map/reduce/total counters are really a group of
+    // related counters)
+    Counters totalCounters = jobInfo.getTotalCounters();
+    if (totalCounters != null) {
+      addMetrics(job, totalCounters);
+    }
+    // finally add configuration to the job
+    addConfiguration(job, conf);
+    LOG.info("converted job " + jobInfo.getJobId() + " to a timeline entity");
+    return job;
+  }
+
+  private void addConfiguration(TimelineEntity job, Configuration conf) {
+    for (Map.Entry<String,String> e: conf) {
+      job.addConfig(e.getKey(), e.getValue());
+    }
+  }
+
+  private void addMetrics(TimelineEntity entity, Counters counters) {
+    for (CounterGroup g: counters) {
+      String groupName = g.getName();
+      for (Counter c: g) {
+        String name = groupName + ":" + c.getName();
+        TimelineMetric metric = new TimelineMetric();
+        metric.setId(name);
+        metric.addValue(System.currentTimeMillis(), c.getValue());
+        entity.addMetric(metric);
+      }
+    }
+  }
+
+  private List<TimelineEntity> createTaskAndTaskAttemptEntities(
+      JobInfo jobInfo) {
+    List<TimelineEntity> entities = new ArrayList<>();
+    Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks();
+    LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() +
+        " tasks");
+    for (TaskInfo taskInfo: taskInfoMap.values()) {
+      TimelineEntity task = createTaskEntity(taskInfo);
+      entities.add(task);
+      // add the task attempts from this task
+      Set<TimelineEntity> taskAttempts = createTaskAttemptEntities(taskInfo);
+      entities.addAll(taskAttempts);
+    }
+    return entities;
+  }
+
+  private TimelineEntity createTaskEntity(TaskInfo taskInfo) {
+    TimelineEntity task = new TimelineEntity();
+    task.setType(TASK);
+    task.setId(taskInfo.getTaskId().toString());
+    task.setCreatedTime(taskInfo.getStartTime());
+
+    task.addInfo("START_TIME", taskInfo.getStartTime());
+    task.addInfo("FINISH_TIME", taskInfo.getFinishTime());
+    task.addInfo("TASK_TYPE", taskInfo.getTaskType());
+    task.addInfo("TASK_STATUS", taskInfo.getTaskStatus());
+    task.addInfo("ERROR_INFO", taskInfo.getError());
+
+    // add metrics from counters
+    Counters counters = taskInfo.getCounters();
+    if (counters != null) {
+      addMetrics(task, counters);
+    }
+    LOG.info("converted task " + taskInfo.getTaskId() +
+        " to a timeline entity");
+    return task;
+  }
+
+  private Set<TimelineEntity> createTaskAttemptEntities(TaskInfo taskInfo) {
+    Set<TimelineEntity> taskAttempts = new HashSet<TimelineEntity>();
+    Map<TaskAttemptID,TaskAttemptInfo> taskAttemptInfoMap =
+        taskInfo.getAllTaskAttempts();
+    LOG.info("task " + taskInfo.getTaskId() + " has " +
+        taskAttemptInfoMap.size() + " task attempts");
+    for (TaskAttemptInfo taskAttemptInfo: taskAttemptInfoMap.values()) {
+      TimelineEntity taskAttempt = createTaskAttemptEntity(taskAttemptInfo);
+      taskAttempts.add(taskAttempt);
+    }
+    return taskAttempts;
+  }
+
+  private TimelineEntity createTaskAttemptEntity(
+      TaskAttemptInfo taskAttemptInfo) {
+    TimelineEntity taskAttempt = new TimelineEntity();
+    taskAttempt.setType(TASK_ATTEMPT);
+    taskAttempt.setId(taskAttemptInfo.getAttemptId().toString());
+    taskAttempt.setCreatedTime(taskAttemptInfo.getStartTime());
+
+    taskAttempt.addInfo("START_TIME", taskAttemptInfo.getStartTime());
+    taskAttempt.addInfo("FINISH_TIME", taskAttemptInfo.getFinishTime());
+    taskAttempt.addInfo("MAP_FINISH_TIME",
+        taskAttemptInfo.getMapFinishTime());
+    taskAttempt.addInfo("SHUFFLE_FINISH_TIME",
+        taskAttemptInfo.getShuffleFinishTime());
+    taskAttempt.addInfo("SORT_FINISH_TIME",
+        taskAttemptInfo.getSortFinishTime());
+    taskAttempt.addInfo("TASK_STATUS", taskAttemptInfo.getTaskStatus());
+    taskAttempt.addInfo("STATE", taskAttemptInfo.getState());
+    taskAttempt.addInfo("ERROR", taskAttemptInfo.getError());
+    taskAttempt.addInfo("CONTAINER_ID",
+        taskAttemptInfo.getContainerId().toString());
+
+    // add metrics from counters
+    Counters counters = taskAttemptInfo.getCounters();
+    if (counters != null) {
+      addMetrics(taskAttempt, counters);
+    }
+    LOG.info("converted task attempt " + taskAttemptInfo.getAttemptId() +
+        " to a timeline entity");
+    return taskAttempt;
+  }
+}