Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/06 20:59:28 UTC

svn commit: r1637199 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: SparkTask.java status/SparkJobStatus.java status/impl/JobStateListener.java status/impl/SimpleSparkJobStatus.java

Author: xuefu
Date: Thu Nov  6 19:59:27 2014
New Revision: 1637199

URL: http://svn.apache.org/r1637199
Log:
HIVE-8726: Collect Spark TaskMetrics and build job statistics [Spark Branch] (Chengxiang via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1637199&r1=1637198&r2=1637199&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Thu Nov  6 19:59:27 2014
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.exec.sp
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -104,13 +105,16 @@ public class SparkTask extends Task<Spar
       }
 
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
-      sparkCounters = jobRef.getSparkJobStatus().getCounter();
-      SparkJobMonitor monitor = new SparkJobMonitor(jobRef.getSparkJobStatus());
+      SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
+      sparkCounters = sparkJobStatus.getCounter();
+      SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
       monitor.startMonitor();
-      SparkStatistics sparkStatistics = jobRef.getSparkJobStatus().getSparkStatistics();
+      SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
       if (LOG.isInfoEnabled() && sparkStatistics != null) {
+        LOG.info(String.format("=====Spark Job[%d] statistics=====", jobRef.getJobId()));
         logSparkStatistic(sparkStatistics);
       }
+      sparkJobStatus.cleanup();
       rc = 0;
     } catch (Exception e) {
       LOG.error("Failed to execute spark task.", e);

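For context, the net effect of this change in SparkTask is that a single SparkJobStatus reference now drives counters, monitoring, statistics and the final cleanup. A condensed sketch of that lifecycle, with error handling and the surrounding Task plumbing omitted (illustrative only, not the exact committed code; sparkSession, driverContext and sparkWork are the variables already present in SparkTask):

    import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
    import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
    import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor;
    import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
    import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;

    // inside SparkTask.execute(), after the Spark session has been obtained
    SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
    SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();

    SparkCounters sparkCounters = sparkJobStatus.getCounter();      // Hive operator-level counters
    SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
    monitor.startMonitor();                                         // returns once the job finishes

    SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
    if (sparkStatistics != null) {
      // log the collected statistics, as logSparkStatistic() does above
    }
    sparkJobStatus.cleanup();   // release the per-job state held by the listener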
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java?rev=1637199&r1=1637198&r2=1637199&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java Thu Nov  6 19:59:27 2014
@@ -39,4 +39,5 @@ public interface SparkJobStatus {
 
   public SparkStatistics getSparkStatistics();
 
+  public void cleanup();
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java?rev=1637199&r1=1637198&r2=1637199&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java Thu Nov  6 19:59:27 2014
@@ -17,11 +17,16 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
-import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
+import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.scheduler.JobSucceeded;
 import org.apache.spark.scheduler.SparkListener;
 import org.apache.spark.scheduler.SparkListenerApplicationEnd;
@@ -39,12 +44,14 @@ import org.apache.spark.scheduler.SparkL
 import org.apache.spark.scheduler.SparkListenerTaskStart;
 import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
 
-import scala.collection.JavaConversions;
-
 public class JobStateListener implements SparkListener {
 
-  private Map<Integer, SparkJobState> jobIdToStates = new HashMap<Integer, SparkJobState>();
-  private Map<Integer, int[]> jobIdToStageId = new HashMap<Integer, int[]>();
+  private static final Log LOG = LogFactory.getLog(JobStateListener.class);
+
+  private final Map<Integer, SparkJobState> jobIdToStates = Maps.newHashMap();
+  private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
+  private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
+  private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
 
   @Override
   public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
@@ -67,19 +74,40 @@ public class JobStateListener implements
   }
 
   @Override
-  public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
-
+  public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+    int stageId = taskEnd.stageId();
+    int stageAttemptId = taskEnd.stageAttemptId();
+    String stageIdentifier = stageId + "_" + stageAttemptId;
+    Integer jobId = stageIdToJobId.get(stageId);
+    if (jobId == null) {
+      LOG.warn("Cannot find job id for stage[" + stageId + "].");
+    } else {
+      Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
+      if (jobMetrics == null) {
+        jobMetrics = Maps.newHashMap();
+        allJobMetrics.put(jobId, jobMetrics);
+      }
+      List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+      if (stageMetrics == null) {
+        stageMetrics = Lists.newLinkedList();
+        jobMetrics.put(stageIdentifier, stageMetrics);
+      }
+      stageMetrics.add(taskEnd.taskMetrics());
+    }
   }
 
   @Override
   public synchronized void onJobStart(SparkListenerJobStart jobStart) {
-    jobIdToStates.put(jobStart.jobId(), SparkJobState.RUNNING);
-    List<Object> ids = JavaConversions.asJavaList(jobStart.stageIds());
-    int[] intStageIds = new int[ids.size()];
-    for(int i=0; i<ids.size(); i++) {
-      intStageIds[i] = (Integer)ids.get(i);
+    int jobId = jobStart.jobId();
+    jobIdToStates.put(jobId, SparkJobState.RUNNING);
+    int size = jobStart.stageIds().size();
+    int[] intStageIds = new int[size];
+    for (int i = 0; i < size; i++) {
+      Integer stageId = (Integer) jobStart.stageIds().apply(i);
+      intStageIds[i] = stageId;
+      stageIdToJobId.put(stageId, jobId);
     }
-    jobIdToStageId.put(jobStart.jobId(), intStageIds);
+    jobIdToStageId.put(jobId, intStageIds);
   }
 
   @Override
@@ -134,4 +162,21 @@ public class JobStateListener implements
   public synchronized int[] getStageIds(int jobId) {
     return jobIdToStageId.get(jobId);
   }
+
+  public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
+    return allJobMetrics.get(jobId);
+  }
+
+  public synchronized void cleanup(int jobId) {
+    allJobMetrics.remove(jobId);
+    jobIdToStates.remove(jobId);
+    jobIdToStageId.remove(jobId);
+    Iterator<Map.Entry<Integer, Integer>> iterator = stageIdToJobId.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<Integer, Integer> entry = iterator.next();
+      if (entry.getValue() == jobId) {
+        iterator.remove();
+      }
+    }
+  }
 }

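The listener now records task metrics per job, keyed by a "stageId_stageAttemptId" string, and getJobMetric(jobId) hands that map back to the status implementation. A minimal sketch of how a consumer (SimpleSparkJobStatus below is the real one) can walk the returned structure; the jobStateListener and jobId variables are assumed to be in scope:

    import java.util.List;
    import java.util.Map;
    import org.apache.spark.executor.TaskMetrics;

    Map<String, List<TaskMetrics>> jobMetric = jobStateListener.getJobMetric(jobId);
    if (jobMetric != null) {                        // null once cleanup(jobId) has run
      long totalRunTime = 0L;
      for (Map.Entry<String, List<TaskMetrics>> stage : jobMetric.entrySet()) {
        // key looks like "3_0": stage id 3, stage attempt 0
        for (TaskMetrics task : stage.getValue()) {
          totalRunTime += task.executorRunTime();
        }
      }
      // totalRunTime now holds the summed executor run time across all stages of the job
    }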
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java?rev=1637199&r1=1637198&r2=1637199&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java Thu Nov  6 19:59:27 2014
@@ -22,12 +22,17 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.spark.executor.InputMetrics;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.scheduler.StageInfo;
 import org.apache.spark.ui.jobs.JobProgressListener;
 import org.apache.spark.ui.jobs.UIData;
@@ -124,7 +129,98 @@ public class SimpleSparkJobStatus implem
 
   @Override
   public SparkStatistics getSparkStatistics() {
-    return new SparkStatisticsBuilder().add(sparkCounters).build();
+    SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder();
+    // add Hive operator level statistics.
+    sparkStatisticsBuilder.add(sparkCounters);
+    // add spark job metrics.
+    String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
+    Map<String, List<TaskMetrics>> jobMetric = jobStateListener.getJobMetric(jobId);
+    Map<String, Long> flatJobMetric = combineJobLevelMetrics(jobMetric);
+    for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
+      sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
+    }
+
+    return sparkStatisticsBuilder.build();
+  }
+
+  @Override
+  public void cleanup() {
+    jobStateListener.cleanup(jobId);
+  }
+
+  private Map<String, Long> combineJobLevelMetrics(Map<String, List<TaskMetrics>> jobMetric) {
+    Map<String, Long> results = Maps.newLinkedHashMap();
+
+    long executorDeserializeTime = 0;
+    long executorRunTime = 0;
+    long resultSize = 0;
+    long jvmGCTime = 0;
+    long resultSerializationTime = 0;
+    long memoryBytesSpilled = 0;
+    long diskBytesSpilled = 0;
+    long bytesRead = 0;
+    long remoteBlocksFetched = 0;
+    long localBlocksFetched = 0;
+    long fetchWaitTime = 0;
+    long remoteBytesRead = 0;
+    long shuffleBytesWritten = 0;
+    long shuffleWriteTime = 0;
+    boolean inputMetricExist = false;
+    boolean shuffleReadMetricExist = false;
+    boolean shuffleWriteMetricExist = false;
+
+    for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+      for (TaskMetrics taskMetrics : stageMetric) {
+        executorDeserializeTime += taskMetrics.executorDeserializeTime();
+        executorRunTime += taskMetrics.executorRunTime();
+        resultSize += taskMetrics.resultSize();
+        jvmGCTime += taskMetrics.jvmGCTime();
+        resultSerializationTime += taskMetrics.resultSerializationTime();
+        memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+        diskBytesSpilled += taskMetrics.diskBytesSpilled();
+        if (!taskMetrics.inputMetrics().isEmpty()) {
+          inputMetricExist = true;
+          bytesRead += taskMetrics.inputMetrics().get().bytesRead();
+        }
+        Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+        if (!shuffleReadMetricsOption.isEmpty()) {
+          shuffleReadMetricExist = true;
+          remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
+          localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
+          fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
+          remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
+        }
+        Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+        if (!shuffleWriteMetricsOption.isEmpty()) {
+          shuffleWriteMetricExist = true;
+          shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
+          shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+        }
+      }
+    }
+
+    results.put("EexcutorDeserializeTime", executorDeserializeTime);
+    results.put("ExecutorRunTime", executorRunTime);
+    results.put("ResultSize", resultSize);
+    results.put("JvmGCTime", jvmGCTime);
+    results.put("ResultSerializationTime", resultSerializationTime);
+    results.put("MemoryBytesSpilled", memoryBytesSpilled);
+    results.put("DiskBytesSpilled", diskBytesSpilled);
+    if (inputMetricExist) {
+      results.put("BytesRead", bytesRead);
+    }
+    if (shuffleReadMetricExist) {
+      results.put("RemoteBlocksFetched", remoteBlocksFetched);
+      results.put("LocalBlocksFetched", localBlocksFetched);
+      results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
+      results.put("FetchWaitTime", fetchWaitTime);
+      results.put("RemoteBytesRead", remoteBytesRead);
+    }
+    if (shuffleWriteMetricExist) {
+      results.put("ShuffleBytesWritten", shuffleBytesWritten);
+      results.put("ShuffleWriteTime", shuffleWriteTime);
+    }
+    return results;
   }
 
   private List<StageInfo> getStageInfo(int stageId) {
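In combineJobLevelMetrics above, the input and shuffle metric groups are scala.Option values, so they are only aggregated when at least one task actually reported them. A small standalone sketch of that Option-handling pattern as seen from Java (assumes scala-library on the classpath; the value is made up for illustration):

    import scala.Option;

    public class OptionGuardSketch {
      public static void main(String[] args) {
        Option<Long> maybeBytesRead = Option.apply(1024L);   // hypothetical per-task value
        boolean inputMetricExist = false;
        long bytesRead = 0L;
        if (!maybeBytesRead.isEmpty()) {   // same guard as combineJobLevelMetrics uses
          inputMetricExist = true;
          bytesRead += maybeBytesRead.get();
        }
        System.out.println(inputMetricExist ? "BytesRead=" + bytesRead : "no input metrics reported");
      }
    }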