Posted to commits@hive.apache.org by st...@apache.org on 2018/03/14 17:03:10 UTC

hive git commit: HIVE-18034: Improving logging when HoS executors spend lots of time in GC (Sahil Takiar, reviewed by Rui Li)

Repository: hive
Updated Branches:
  refs/heads/master 9cdc08580 -> 57a1ec211


HIVE-18034: Improving logging when HoS executors spend lots of time in GC (Sahil Takiar, reviewed by Rui Li)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/57a1ec21
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/57a1ec21
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/57a1ec21

Branch: refs/heads/master
Commit: 57a1ec211039d5a5c0eb309adb991283b112520e
Parents: 9cdc085
Author: Sahil Takiar <ta...@gmail.com>
Authored: Wed Mar 14 09:49:02 2018 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Wed Mar 14 09:49:02 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/spark/SparkTask.java    | 73 +++++++++++++++++---
 .../spark/Statistic/SparkStatisticGroup.java    | 18 +++--
 .../exec/spark/Statistic/SparkStatistics.java   | 21 ++++--
 .../spark/Statistic/SparkStatisticsBuilder.java |  8 +--
 .../spark/Statistic/SparkStatisticsNames.java   | 43 ++++++++++++
 .../spark/status/impl/JobMetricsListener.java   | 24 +++----
 .../spark/status/impl/LocalSparkJobStatus.java  | 19 ++---
 .../spark/status/impl/RemoteSparkJobStatus.java | 21 +++---
 .../spark/status/impl/SparkMetricsUtils.java    | 48 +++++--------
 .../hadoop/hive/ql/history/HiveHistory.java     |  1 +
 .../hive/spark/client/MetricsCollection.java    |  4 +-
 .../apache/hive/spark/client/RemoteDriver.java  |  2 +-
 .../hive/spark/client/metrics/Metrics.java      | 14 ++--
 .../spark/client/TestMetricsCollection.java     | 13 ++--
 14 files changed, 212 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 76f6ecc..c240884 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -27,9 +27,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -77,7 +79,10 @@ public class SparkTask extends Task<SparkWork> {
   private static final LogHelper console = new LogHelper(LOG);
   private PerfLogger perfLogger;
   private static final long serialVersionUID = 1L;
+  // The id of the actual Spark job
   private transient int sparkJobID;
+  // The id of the JobHandle used to track the actual Spark job
+  private transient String sparkJobHandleId;
   private transient SparkStatistics sparkStatistics;
   private transient long submitTime;
   private transient long startTime;
@@ -111,36 +116,60 @@ public class SparkTask extends Task<SparkWork> {
       SparkWork sparkWork = getWork();
       sparkWork.setRequiredCounterPrefix(getOperatorCounters());
 
+      // Submit the Spark job
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
       submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB);
       jobRef = sparkSession.submit(driverContext, sparkWork);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
 
+      // If the driver context has been shut down (due to query cancellation), kill the Spark job
       if (driverContext.isShutdown()) {
         LOG.warn("Killing Spark job");
         killJob();
         throw new HiveException("Operation is cancelled.");
       }
 
-      addToHistory(jobRef);
-      this.jobID = jobRef.getSparkJobStatus().getAppID();
+      // Get the Job Handle id associated with the Spark job
+      sparkJobHandleId = jobRef.getJobId();
+
+      // Add Spark job handle id to the Hive History
+      addToHistory(Keys.SPARK_JOB_HANDLE_ID, jobRef.getJobId());
+
+      LOG.debug("Starting Spark job with job handle id " + sparkJobHandleId);
+
+      // Get the application id of the Spark app
+      jobID = jobRef.getSparkJobStatus().getAppID();
+
+      // Start monitoring the Spark job; returns when the Spark job has completed / failed, or
+      // if a timeout occurs
       rc = jobRef.monitorJob();
+
+      // Get the id of the Spark job that was launched; returns -1 if no Spark job was launched
+      sparkJobID = jobRef.getSparkJobStatus().getJobId();
+
+      // Add Spark job id to the Hive History
+      addToHistory(Keys.SPARK_JOB_ID, Integer.toString(sparkJobID));
+
+      // Get the final state of the Spark job and parse its job info
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
-      sparkJobID = sparkJobStatus.getJobId();
       getSparkJobInfo(sparkJobStatus, rc);
+
       if (rc == 0) {
         sparkStatistics = sparkJobStatus.getSparkStatistics();
+        printExcessiveGCWarning();
         if (LOG.isInfoEnabled() && sparkStatistics != null) {
           LOG.info(String.format("=====Spark Job[%s] statistics=====", sparkJobID));
           logSparkStatistic(sparkStatistics);
         }
-        LOG.info("Successfully completed Spark Job " + sparkJobID + " with application ID " +
+        LOG.info("Successfully completed Spark job[" + sparkJobID + "] with application ID " +
                 jobID + " and task ID " + getId());
       } else if (rc == 2) { // Cancel job if the monitor found job submission timeout.
         // TODO: If the timeout is because of lack of resources in the cluster, we should
         // ideally also cancel the app request here. But w/o facilities from Spark or YARN,
         // it's difficult to do it on hive side alone. See HIVE-12650.
-        LOG.info("Failed to submit Spark job " + sparkJobID);
+        LOG.debug("Failed to submit Spark job with job handle id " + sparkJobHandleId);
+        LOG.info("Failed to submit Spark job for application id " + (Strings.isNullOrEmpty(jobID)
+                ? "UNKNOWN" : jobID));
         killJob();
       } else if (rc == 4) {
         LOG.info("The spark job or one stage of it has too many tasks" +
@@ -189,12 +218,35 @@ public class SparkTask extends Task<SparkWork> {
     return rc;
   }
 
-  private void addToHistory(SparkJobRef jobRef) {
-    console.printInfo("Starting Spark Job = " + jobRef.getJobId());
+  /**
+   * Uses the Spark metrics to calculate how much task execution time was spent performing GC
+   * operations. If more than a defined threshold of time is spent, prints a warning on the
+   * console.
+   */
+  private void printExcessiveGCWarning() {
+    SparkStatisticGroup sparkStatisticGroup = sparkStatistics.getStatisticGroup(
+            SparkStatisticsNames.SPARK_GROUP_NAME);
+    if (sparkStatisticGroup != null) {
+      long taskDurationTime = Long.parseLong(sparkStatisticGroup.getSparkStatistic(
+              SparkStatisticsNames.TASK_DURATION_TIME).getValue());
+      long jvmGCTime = Long.parseLong(sparkStatisticGroup.getSparkStatistic(
+              SparkStatisticsNames.JVM_GC_TIME).getValue());
+
+      // Threshold percentage to trigger the GC warning
+      double threshold = 0.1;
+
+      if (jvmGCTime > taskDurationTime * threshold) {
+        long percentGcTime = Math.round((double) jvmGCTime / taskDurationTime * 100);
+        String gcWarning = String.format("WARNING: Spark Job[%s] Spent %s%% (%s ms / %s ms) of " +
+                "task time in GC", sparkJobID, percentGcTime, jvmGCTime, taskDurationTime);
+        console.printInfo(gcWarning);
+      }
+    }
+  }
+
+  private void addToHistory(Keys key, String value) {
     if (SessionState.get() != null) {
-      SessionState.get().getHiveHistory()
-              .setQueryProperty(queryState.getQueryId(), Keys.SPARK_JOB_ID,
-                      Integer.toString(jobRef.getSparkJobStatus().getJobId()));
+      SessionState.get().getHiveHistory().setQueryProperty(queryState.getQueryId(), key, value);
     }
   }
 
@@ -327,6 +379,7 @@ public class SparkTask extends Task<SparkWork> {
   }
 
   private void killJob() {
+    LOG.debug("Killing Spark job with job handle id " + sparkJobHandleId);
     boolean needToKillJob = false;
     if (jobRef != null && !jobKilled) {
       synchronized (this) {
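
For illustration, the GC check that printExcessiveGCWarning() performs reduces to the
self-contained sketch below. The job id and the two timing values are hypothetical; the
10% threshold and the warning format string are the ones from the patch.

  // A minimal sketch of the GC-warning check added to SparkTask above.
  // The numbers are made up; in SparkTask they come from the "SPARK" statistic
  // group (SparkStatisticsNames.TASK_DURATION_TIME and JVM_GC_TIME, in ms).
  public class GcWarningSketch {
    public static void main(String[] args) {
      int sparkJobID = 2;            // hypothetical Spark job id
      long taskDurationTime = 2000L; // hypothetical total task time, in ms
      long jvmGCTime = 500L;         // hypothetical time spent in GC, in ms

      // Threshold percentage to trigger the GC warning (10%, as in the patch)
      double threshold = 0.1;

      if (jvmGCTime > taskDurationTime * threshold) {
        long percentGcTime = Math.round((double) jvmGCTime / taskDurationTime * 100);
        System.out.println(String.format(
            "WARNING: Spark Job[%s] Spent %s%% (%s ms / %s ms) of task time in GC",
            sparkJobID, percentGcTime, jvmGCTime, taskDurationTime));
      }
    }
  }

With these numbers the check fires (500 > 2000 * 0.1) and prints:

  WARNING: Spark Job[2] Spent 25% (500 ms / 2000 ms) of task time in GC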

http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
index 5ab4d16..e1006e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
@@ -17,17 +17,20 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.Statistic;
 
-import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 public class SparkStatisticGroup {
   private final String groupName;
-  private final List<SparkStatistic> statisticList;
+  private final Map<String, SparkStatistic> statistics = new LinkedHashMap<>();
 
   SparkStatisticGroup(String groupName, List<SparkStatistic> statisticList) {
     this.groupName = groupName;
-    this.statisticList = Collections.unmodifiableList(statisticList);
+    for (SparkStatistic sparkStatistic : statisticList) {
+      this.statistics.put(sparkStatistic.getName(), sparkStatistic);
+    }
   }
 
   public String getGroupName() {
@@ -35,6 +38,13 @@ public class SparkStatisticGroup {
   }
 
   public Iterator<SparkStatistic> getStatistics() {
-    return this.statisticList.iterator();
+    return this.statistics.values().iterator();
+  }
+
+  /**
+   * Get a {@link SparkStatistic} by its name.
+   */
+  public SparkStatistic getSparkStatistic(String name) {
+    return this.statistics.get(name);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java
index 584e8bf..946cadc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java
@@ -17,19 +17,26 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.Statistic;
 
-
-import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 public class SparkStatistics {
-  private final List<SparkStatisticGroup> statisticGroups;
 
-  SparkStatistics(List<SparkStatisticGroup> statisticGroups) {
-    this.statisticGroups = Collections.unmodifiableList(statisticGroups);
+  private final Map<String, SparkStatisticGroup> statisticGroups = new LinkedHashMap<>();
+
+  SparkStatistics(List<SparkStatisticGroup> statisticGroupsList) {
+    for (SparkStatisticGroup group : statisticGroupsList) {
+      statisticGroups.put(group.getGroupName(), group);
+    }
   }
 
   public Iterator<SparkStatisticGroup> getStatisticGroups() {
-    return this.statisticGroups.iterator();
+    return this.statisticGroups.values().iterator();
+  }
+
+  public SparkStatisticGroup getStatisticGroup(String groupName) {
+    return this.statisticGroups.get(groupName);
   }
-}
\ No newline at end of file
+}
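
Keying groups and statistics by name lets consumers fetch a single value directly instead
of walking the iterators. A minimal sketch, assuming a populated SparkStatistics instance
is at hand (the wrapper class and method names are ours, not part of this commit):

  import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
  import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup;
  import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
  import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;

  public class StatisticLookupSketch {
    // Read JvmGCTime from the "SPARK" group, or -1 if it was not collected.
    static long readJvmGcTime(SparkStatistics sparkStatistics) {
      SparkStatisticGroup group =
          sparkStatistics.getStatisticGroup(SparkStatisticsNames.SPARK_GROUP_NAME);
      if (group == null) {
        return -1L; // no Spark-level metrics for this job
      }
      SparkStatistic stat = group.getSparkStatistic(SparkStatisticsNames.JVM_GC_TIME);
      return stat == null ? -1L : Long.parseLong(stat.getValue());
    }
  }

Both getStatisticGroup and getSparkStatistic return null for unknown names, so callers
should guard the result, as printExcessiveGCWarning() does for the group lookup.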

http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java
index 6ebc274..d31d60a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java
@@ -21,7 +21,7 @@ import org.apache.hive.spark.counter.SparkCounter;
 import org.apache.hive.spark.counter.SparkCounterGroup;
 import org.apache.hive.spark.counter.SparkCounters;
 
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -31,15 +31,15 @@ public class SparkStatisticsBuilder {
   private Map<String, List<SparkStatistic>> statisticMap;
 
   public SparkStatisticsBuilder() {
-    statisticMap = new HashMap<String, List<SparkStatistic>>();
+    statisticMap = new LinkedHashMap<>();
   }
 
   public SparkStatistics build() {
     List<SparkStatisticGroup> statisticGroups = new LinkedList<SparkStatisticGroup>();
     for (Map.Entry<String, List<SparkStatistic>> entry : statisticMap.entrySet()) {
       String groupName = entry.getKey();
-      List<SparkStatistic> statisitcList = entry.getValue();
-      statisticGroups.add(new SparkStatisticGroup(groupName, statisitcList));
+      List<SparkStatistic> statisticList = entry.getValue();
+      statisticGroups.add(new SparkStatisticGroup(groupName, statisticList));
     }
 
     return new SparkStatistics(statisticGroups);

http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
new file mode 100644
index 0000000..ca93a80
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.Statistic;
+
+/**
+ * A collection of names that define different {@link SparkStatistic} objects.
+ */
+public class SparkStatisticsNames {
+
+  public static final String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime";
+  public static final String EXECUTOR_RUN_TIME = "ExecutorRunTime";
+  public static final String RESULT_SIZE = "ResultSize";
+  public static final String JVM_GC_TIME = "JvmGCTime";
+  public static final String RESULT_SERIALIZATION_TIME = "ResultSerializationTime";
+  public static final String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled";
+  public static final String DISK_BYTES_SPILLED = "DiskBytesSpilled";
+  public static final String BYTES_READ = "BytesRead";
+  public static final String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched";
+  public static final String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched";
+  public static final String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched";
+  public static final String FETCH_WAIT_TIME = "FetchWaitTime";
+  public static final String REMOTE_BYTES_READ = "RemoteBytesRead";
+  public static final String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten";
+  public static final String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
+  public static final String TASK_DURATION_TIME = "TaskDurationTime";
+
+  public static final String SPARK_GROUP_NAME = "SPARK";
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
index eaeb4dc..773fe97 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
+import java.util.AbstractMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -28,6 +29,7 @@ import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.scheduler.SparkListener;
 import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.TaskInfo;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -38,7 +40,8 @@ public class JobMetricsListener extends SparkListener {
 
   private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
   private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
-  private final Map<Integer, Map<Integer, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
+  private final Map<Integer, Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>>> allJobMetrics =
+          Maps.newHashMap();
 
   @Override
   public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
@@ -47,17 +50,12 @@ public class JobMetricsListener extends SparkListener {
     if (jobId == null) {
       LOG.warn("Can not find job id for stage[" + stageId + "].");
     } else {
-      Map<Integer, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
-      if (jobMetrics == null) {
-        jobMetrics = Maps.newHashMap();
-        allJobMetrics.put(jobId, jobMetrics);
-      }
-      List<TaskMetrics> stageMetrics = jobMetrics.get(stageId);
-      if (stageMetrics == null) {
-        stageMetrics = Lists.newLinkedList();
-        jobMetrics.put(stageId, stageMetrics);
-      }
-      stageMetrics.add(taskEnd.taskMetrics());
+      Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>> jobMetrics = allJobMetrics.computeIfAbsent(
+          jobId, k -> Maps.newHashMap());
+      List<Map.Entry<TaskMetrics, TaskInfo>> stageMetrics = jobMetrics.computeIfAbsent(stageId,
+          k -> Lists.newLinkedList());
+
+      stageMetrics.add(new AbstractMap.SimpleEntry<>(taskEnd.taskMetrics(), taskEnd.taskInfo()));
     }
   }
 
@@ -74,7 +72,7 @@ public class JobMetricsListener extends SparkListener {
     jobIdToStageId.put(jobId, intStageIds);
   }
 
-  public synchronized  Map<Integer, List<TaskMetrics>> getJobMetric(int jobId) {
+  public synchronized Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>> getJobMetric(int jobId) {
     return allJobMetrics.get(jobId);
   }
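
The refactor above swaps the explicit get/null-check/put blocks for Java 8's
Map.computeIfAbsent, and keeps each task's TaskMetrics paired with its TaskInfo as a
Map.Entry. The same pattern with plain JDK collections (the String/Long payload is a
stand-in for TaskMetrics/TaskInfo, which require a running Spark job to obtain):

  import java.util.AbstractMap;
  import java.util.HashMap;
  import java.util.LinkedList;
  import java.util.List;
  import java.util.Map;

  public class ComputeIfAbsentSketch {
    public static void main(String[] args) {
      // jobId -> (stageId -> list of per-task pairs), as in JobMetricsListener
      Map<Integer, Map<Integer, List<Map.Entry<String, Long>>>> allJobMetrics =
          new HashMap<>();

      int jobId = 1;
      int stageId = 7;

      // computeIfAbsent creates the nested map/list on first access; thread
      // safety in the listener still comes from its synchronized methods.
      allJobMetrics.computeIfAbsent(jobId, k -> new HashMap<>())
          .computeIfAbsent(stageId, k -> new LinkedList<>())
          .add(new AbstractMap.SimpleEntry<>("task-0", 123L));

      System.out.println(allJobMetrics); // prints {1={7=[task-0=123]}}
    }
  }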
 

http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index 8b031e7..03f8a0b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -17,28 +17,31 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
-import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
 import org.apache.hive.spark.client.MetricsCollection;
 import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.counter.SparkCounters;
+
 import org.apache.spark.JobExecutionStatus;
 import org.apache.spark.SparkJobInfo;
 import org.apache.spark.SparkStageInfo;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.TaskInfo;
 
 public class LocalSparkJobStatus implements SparkJobStatus {
 
@@ -129,8 +132,7 @@ public class LocalSparkJobStatus implements SparkJobStatus {
     // add Hive operator level statistics.
     sparkStatisticsBuilder.add(sparkCounters);
     // add spark job metrics.
-    String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
-    Map<Integer, List<TaskMetrics>> jobMetric = jobMetricsListener.getJobMetric(jobId);
+    Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>> jobMetric = jobMetricsListener.getJobMetric(jobId);
     if (jobMetric == null) {
       return null;
     }
@@ -138,16 +140,17 @@ public class LocalSparkJobStatus implements SparkJobStatus {
     MetricsCollection metricsCollection = new MetricsCollection();
     Set<Integer> stageIds = jobMetric.keySet();
     for (int stageId : stageIds) {
-      List<TaskMetrics> taskMetrics = jobMetric.get(stageId);
-      for (TaskMetrics taskMetric : taskMetrics) {
-        Metrics metrics = new Metrics(taskMetric);
+      List<Map.Entry<TaskMetrics, TaskInfo>> taskMetrics = jobMetric.get(stageId);
+      for (Map.Entry<TaskMetrics, TaskInfo> taskMetric : taskMetrics) {
+        Metrics metrics = new Metrics(taskMetric.getKey(), taskMetric.getValue());
         metricsCollection.addMetrics(jobId, stageId, 0, metrics);
       }
     }
     Map<String, Long> flatJobMetric = SparkMetricsUtils.collectMetrics(metricsCollection
         .getAllMetrics());
     for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
-      sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
+      sparkStatisticsBuilder.add(SparkStatisticsNames.SPARK_GROUP_NAME, entry.getKey(),
+              Long.toString(entry.getValue()));
     }
 
     return  sparkStatisticsBuilder.build();

http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index ec7ca40..ff969e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -19,28 +19,30 @@
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hive.spark.client.MetricsCollection;
-import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.hive.spark.client.MetricsCollection;
 import org.apache.hive.spark.client.Job;
 import org.apache.hive.spark.client.JobContext;
 import org.apache.hive.spark.client.JobHandle;
 import org.apache.hive.spark.client.SparkClient;
+import org.apache.hive.spark.counter.SparkCounters;
+
 import org.apache.spark.JobExecutionStatus;
 import org.apache.spark.SparkJobInfo;
 import org.apache.spark.SparkStageInfo;
 import org.apache.spark.api.java.JavaFutureAction;
 
 import java.io.Serializable;
-import java.net.InetAddress;
-import java.net.URI;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -125,16 +127,19 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
     if (metricsCollection == null || getCounter() == null) {
       return null;
     }
+
     SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder();
-    // add Hive operator level statistics.
+
+    // add Hive operator level statistics, e.g. RECORDS_IN, RECORDS_OUT
     sparkStatisticsBuilder.add(getCounter());
-    // add spark job metrics.
-    String jobIdentifier = "Spark Job[" + getJobId() + "] Metrics";
 
+    // add Spark job metrics, e.g. metrics collected by Spark itself (JvmGCTime,
+    // ExecutorRunTime, etc.)
     Map<String, Long> flatJobMetric = SparkMetricsUtils.collectMetrics(
         metricsCollection.getAllMetrics());
     for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
-      sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
+      sparkStatisticsBuilder.add(SparkStatisticsNames.SPARK_GROUP_NAME, entry.getKey(),
+              Long.toString(entry.getValue()));
     }
 
     return sparkStatisticsBuilder.build();

http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
index dd17168..f72407e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
@@ -20,54 +20,40 @@ package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
 import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
 
 final class SparkMetricsUtils {
 
-  private final static String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime";
-  private final static String EXECUTOR_RUN_TIME = "ExecutorRunTime";
-  private final static String RESULT_SIZE = "ResultSize";
-  private final static String JVM_GC_TIME = "JvmGCTime";
-  private final static String RESULT_SERIALIZATION_TIME = "ResultSerializationTime";
-  private final static String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled";
-  private final static String DISK_BYTES_SPILLED = "DiskBytesSpilled";
-  private final static String BYTES_READ = "BytesRead";
-  private final static String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched";
-  private final static String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched";
-  private final static String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched";
-  private final static String FETCH_WAIT_TIME = "FetchWaitTime";
-  private final static String REMOTE_BYTES_READ = "RemoteBytesRead";
-  private final static String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten";
-  private final static String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
-
   private SparkMetricsUtils(){}
 
   static Map<String, Long> collectMetrics(Metrics allMetrics) {
     Map<String, Long> results = new LinkedHashMap<String, Long>();
-    results.put(EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime);
-    results.put(EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
-    results.put(RESULT_SIZE, allMetrics.resultSize);
-    results.put(JVM_GC_TIME, allMetrics.jvmGCTime);
-    results.put(RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime);
-    results.put(MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled);
-    results.put(DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled);
+    results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime);
+    results.put(SparkStatisticsNames.EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
+    results.put(SparkStatisticsNames.RESULT_SIZE, allMetrics.resultSize);
+    results.put(SparkStatisticsNames.JVM_GC_TIME, allMetrics.jvmGCTime);
+    results.put(SparkStatisticsNames.RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime);
+    results.put(SparkStatisticsNames.MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled);
+    results.put(SparkStatisticsNames.DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled);
+    results.put(SparkStatisticsNames.TASK_DURATION_TIME, allMetrics.taskDurationTime);
     if (allMetrics.inputMetrics != null) {
-      results.put(BYTES_READ, allMetrics.inputMetrics.bytesRead);
+      results.put(SparkStatisticsNames.BYTES_READ, allMetrics.inputMetrics.bytesRead);
     }
     if (allMetrics.shuffleReadMetrics != null) {
       ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics;
       long rbf = shuffleReadMetrics.remoteBlocksFetched;
       long lbf = shuffleReadMetrics.localBlocksFetched;
-      results.put(REMOTE_BLOCKS_FETCHED, rbf);
-      results.put(LOCAL_BLOCKS_FETCHED, lbf);
-      results.put(TOTAL_BLOCKS_FETCHED, rbf + lbf);
-      results.put(FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime);
-      results.put(REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead);
+      results.put(SparkStatisticsNames.REMOTE_BLOCKS_FETCHED, rbf);
+      results.put(SparkStatisticsNames.LOCAL_BLOCKS_FETCHED, lbf);
+      results.put(SparkStatisticsNames.TOTAL_BLOCKS_FETCHED, rbf + lbf);
+      results.put(SparkStatisticsNames.FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime);
+      results.put(SparkStatisticsNames.REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead);
     }
     if (allMetrics.shuffleWriteMetrics != null) {
-      results.put(SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
-      results.put(SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime);
+      results.put(SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
+      results.put(SparkStatisticsNames.SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime);
     }
     return results;
   }
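
Since collectMetrics now keys its result map with the shared SparkStatisticsNames
constants, producers and consumers agree on metric names by construction. A hedged
sketch of the consumer side; the Metrics values are made up, and because
SparkMetricsUtils is package-private this only compiles from within
org.apache.hadoop.hive.ql.exec.spark.status.impl:

  import java.util.Map;

  import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
  import org.apache.hive.spark.client.metrics.Metrics;

  class CollectMetricsSketch {
    static void demo() {
      // Hypothetical metrics: eight longs (deserialize, run, resultSize, jvmGC,
      // resultSerialization, memorySpilled, diskSpilled, taskDuration), then
      // null input / shuffle-read / shuffle-write metrics.
      Metrics metrics = new Metrics(10L, 900L, 1L, 100L, 5L, 0L, 0L, 1000L, null, null, null);

      Map<String, Long> flat = SparkMetricsUtils.collectMetrics(metrics);
      long gc = flat.get(SparkStatisticsNames.JVM_GC_TIME);              // 100
      long duration = flat.get(SparkStatisticsNames.TASK_DURATION_TIME); // 1000
      System.out.println(gc + " ms GC out of " + duration + " ms task time");
    }
  }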

http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
index a5bafbc..327628f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
@@ -69,6 +69,7 @@ public interface HiveHistory {
     TASK_NUM_MAPPERS,
     TASK_NUM_REDUCERS,
     ROWS_INSERTED,
+    SPARK_JOB_HANDLE_ID,
     SPARK_JOB_ID
   };
 

http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index 0f03a64..526aefd 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -25,7 +25,6 @@ import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
-import org.apache.hive.spark.client.metrics.DataReadMethod;
 import org.apache.hive.spark.client.metrics.InputMetrics;
 import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
@@ -148,6 +147,7 @@ public class MetricsCollection {
       long resultSerializationTime = 0L;
       long memoryBytesSpilled = 0L;
       long diskBytesSpilled = 0L;
+      long taskDurationTime = 0L;
 
       // Input metrics.
       boolean hasInputMetrics = false;
@@ -173,6 +173,7 @@ public class MetricsCollection {
         resultSerializationTime += m.resultSerializationTime;
         memoryBytesSpilled += m.memoryBytesSpilled;
         diskBytesSpilled += m.diskBytesSpilled;
+        taskDurationTime += m.taskDurationTime;
 
         if (m.inputMetrics != null) {
           hasInputMetrics = true;
@@ -222,6 +223,7 @@ public class MetricsCollection {
         resultSerializationTime,
         memoryBytesSpilled,
         diskBytesSpilled,
+        taskDurationTime,
         inputMetrics,
         shuffleReadMetrics,
         shuffleWriteMetrics);

http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index e584cbb..f221d0a 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -478,7 +478,7 @@ public class RemoteDriver {
     public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
       if (taskEnd.reason() instanceof org.apache.spark.Success$
           && !taskEnd.taskInfo().speculative()) {
-        Metrics metrics = new Metrics(taskEnd.taskMetrics());
+        Metrics metrics = new Metrics(taskEnd.taskMetrics(), taskEnd.taskInfo());
         Integer jobId;
         synchronized (stageToJobId) {
           jobId = stageToJobId.get(taskEnd.stageId());

http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index 418d534..9da0116 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -19,10 +19,11 @@ package org.apache.hive.spark.client.metrics;
 
 import java.io.Serializable;
 
-import org.apache.spark.executor.TaskMetrics;
-
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.TaskInfo;
+
 /**
  * Metrics tracked during the execution of a job.
  *
@@ -46,6 +47,8 @@ public class Metrics implements Serializable {
   public final long memoryBytesSpilled;
   /** The number of on-disk bytes spilled by tasks. */
   public final long diskBytesSpilled;
+  /** Amount of time spent executing tasks. */
+  public final long taskDurationTime;
   /** If tasks read from a HadoopRDD or from persisted data, metrics on how much data was read. */
   public final InputMetrics inputMetrics;
   /**
@@ -58,7 +61,7 @@ public class Metrics implements Serializable {
 
   private Metrics() {
     // For Serialization only.
-    this(0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null);
+    this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null);
   }
 
   public Metrics(
@@ -69,6 +72,7 @@ public class Metrics implements Serializable {
       long resultSerializationTime,
       long memoryBytesSpilled,
       long diskBytesSpilled,
+      long taskDurationTime,
       InputMetrics inputMetrics,
       ShuffleReadMetrics shuffleReadMetrics,
       ShuffleWriteMetrics shuffleWriteMetrics) {
@@ -79,12 +83,13 @@ public class Metrics implements Serializable {
     this.resultSerializationTime = resultSerializationTime;
     this.memoryBytesSpilled = memoryBytesSpilled;
     this.diskBytesSpilled = diskBytesSpilled;
+    this.taskDurationTime = taskDurationTime;
     this.inputMetrics = inputMetrics;
     this.shuffleReadMetrics = shuffleReadMetrics;
     this.shuffleWriteMetrics = shuffleWriteMetrics;
   }
 
-  public Metrics(TaskMetrics metrics) {
+  public Metrics(TaskMetrics metrics, TaskInfo taskInfo) {
     this(
       metrics.executorDeserializeTime(),
       metrics.executorRunTime(),
@@ -93,6 +98,7 @@ public class Metrics implements Serializable {
       metrics.resultSerializationTime(),
       metrics.memoryBytesSpilled(),
       metrics.diskBytesSpilled(),
+      taskInfo.duration(),
       optionalInputMetric(metrics),
       optionalShuffleReadMetric(metrics),
       optionalShuffleWriteMetrics(metrics));
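
Note that taskDurationTime widens the long-form Metrics constructor: every existing call
must pass the new value after diskBytesSpilled, which is exactly what the test updates
below do. A hypothetical before/after of a call site:

  // before this commit: seven long-valued metrics + three optional metric objects
  new Metrics(1L, 1L, 1L, 1L, 1L, 1L, 1L, null, null, null);

  // after this commit: taskDurationTime added as the eighth long
  new Metrics(1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, null, null, null);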

http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index 8fef66b..87b460d 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -66,7 +66,7 @@ public class TestMetricsCollection {
   @Test
   public void testOptionalMetrics() {
     long value = taskValue(1, 1, 1L);
-    Metrics metrics = new Metrics(value, value, value, value, value, value, value,
+    Metrics metrics = new Metrics(value, value, value, value, value, value, value, value,
         null, null, null);
 
     MetricsCollection collection = new MetricsCollection();
@@ -94,10 +94,10 @@ public class TestMetricsCollection {
     MetricsCollection collection = new MetricsCollection();
 
     long value = taskValue(1, 1, 1);
-    Metrics metrics1 = new Metrics(value, value, value, value, value, value, value,
-      new InputMetrics(value), null, null);
-    Metrics metrics2 = new Metrics(value, value, value, value, value, value, value,
-      new InputMetrics(value), null, null);
+    Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value,
+            new InputMetrics(value), null, null);
+    Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value,
+            new InputMetrics(value), null, null);
 
     collection.addMetrics(1, 1, 1, metrics1);
     collection.addMetrics(1, 1, 2, metrics2);
@@ -108,7 +108,7 @@ public class TestMetricsCollection {
 
   private Metrics makeMetrics(int jobId, int stageId, long taskId) {
     long value = 1000000 * jobId + 1000 * stageId + taskId;
-    return new Metrics(value, value, value, value, value, value, value,
+    return new Metrics(value, value, value, value, value, value, value, value,
       new InputMetrics(value),
       new ShuffleReadMetrics((int) value, (int) value, value, value),
       new ShuffleWriteMetrics(value, value));
@@ -154,6 +154,7 @@ public class TestMetricsCollection {
     assertEquals(expected, metrics.resultSerializationTime);
     assertEquals(expected, metrics.memoryBytesSpilled);
     assertEquals(expected, metrics.diskBytesSpilled);
+    assertEquals(expected, metrics.taskDurationTime);
 
     assertEquals(expected, metrics.inputMetrics.bytesRead);