You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/06 20:59:28 UTC
svn commit: r1637199 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark:
SparkTask.java status/SparkJobStatus.java status/impl/JobStateListener.java
status/impl/SimpleSparkJobStatus.java
Author: xuefu
Date: Thu Nov 6 19:59:27 2014
New Revision: 1637199
URL: http://svn.apache.org/r1637199
Log:
HIVE-8726: Collect Spark TaskMetrics and build job statistics [Spark Branch] (Chengxiang via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1637199&r1=1637198&r2=1637199&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Thu Nov 6 19:59:27 2014
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.exec.sp
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -104,13 +105,16 @@ public class SparkTask extends Task<Spar
}
SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
- sparkCounters = jobRef.getSparkJobStatus().getCounter();
- SparkJobMonitor monitor = new SparkJobMonitor(jobRef.getSparkJobStatus());
+ SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
+ sparkCounters = sparkJobStatus.getCounter();
+ SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
monitor.startMonitor();
- SparkStatistics sparkStatistics = jobRef.getSparkJobStatus().getSparkStatistics();
+ SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
if (LOG.isInfoEnabled() && sparkStatistics != null) {
+ LOG.info(String.format("=====Spark Job[%d] statistics=====", jobRef.getJobId()));
logSparkStatistic(sparkStatistics);
}
+ sparkJobStatus.cleanup();
rc = 0;
} catch (Exception e) {
LOG.error("Failed to execute spark task.", e);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java?rev=1637199&r1=1637198&r2=1637199&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java Thu Nov 6 19:59:27 2014
@@ -39,4 +39,5 @@ public interface SparkJobStatus {
public SparkStatistics getSparkStatistics();
+ public void cleanup();
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java?rev=1637199&r1=1637198&r2=1637199&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java Thu Nov 6 19:59:27 2014
@@ -17,11 +17,16 @@
*/
package org.apache.hadoop.hive.ql.exec.spark.status.impl;
-import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
+import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.scheduler.JobSucceeded;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
@@ -39,12 +44,14 @@ import org.apache.spark.scheduler.SparkL
import org.apache.spark.scheduler.SparkListenerTaskStart;
import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
-import scala.collection.JavaConversions;
-
public class JobStateListener implements SparkListener {
- private Map<Integer, SparkJobState> jobIdToStates = new HashMap<Integer, SparkJobState>();
- private Map<Integer, int[]> jobIdToStageId = new HashMap<Integer, int[]>();
+ private final static Log LOG = LogFactory.getLog(JobStateListener.class);
+
+ private final Map<Integer, SparkJobState> jobIdToStates = Maps.newHashMap();
+ private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
+ private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
+ private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
@Override
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
@@ -67,19 +74,40 @@ public class JobStateListener implements
}
@Override
- public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
-
+ public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+ int stageId = taskEnd.stageId();
+ int stageAttemptId = taskEnd.stageAttemptId();
+ String stageIdentifier = stageId + "_" + stageAttemptId;
+ Integer jobId = stageIdToJobId.get(stageId);
+ if (jobId == null) {
+ LOG.warn("Can not find job id for stage[" + stageId + "].");
+ } else {
+ Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
+ if (jobMetrics == null) {
+ jobMetrics = Maps.newHashMap();
+ allJobMetrics.put(jobId, jobMetrics);
+ }
+ List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+ if (stageMetrics == null) {
+ stageMetrics = Lists.newLinkedList();
+ jobMetrics.put(stageIdentifier, stageMetrics);
+ }
+ stageMetrics.add(taskEnd.taskMetrics());
+ }
}
@Override
public synchronized void onJobStart(SparkListenerJobStart jobStart) {
- jobIdToStates.put(jobStart.jobId(), SparkJobState.RUNNING);
- List<Object> ids = JavaConversions.asJavaList(jobStart.stageIds());
- int[] intStageIds = new int[ids.size()];
- for(int i=0; i<ids.size(); i++) {
- intStageIds[i] = (Integer)ids.get(i);
+ int jobId = jobStart.jobId();
+ jobIdToStates.put(jobId, SparkJobState.RUNNING);
+ int size = jobStart.stageIds().size();
+ int[] intStageIds = new int[size];
+ for(int i=0; i< size; i++) {
+ Integer stageId = (Integer) jobStart.stageIds().apply(i);
+ intStageIds[i] = stageId;
+ stageIdToJobId.put(stageId, jobId);
}
- jobIdToStageId.put(jobStart.jobId(), intStageIds);
+ jobIdToStageId.put(jobId, intStageIds);
}
@Override
@@ -134,4 +162,21 @@ public class JobStateListener implements
public synchronized int[] getStageIds(int jobId) {
return jobIdToStageId.get(jobId);
}
+
+ public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
+ return allJobMetrics.get(jobId);
+ }
+
+ public synchronized void cleanup(int jobId) {
+ allJobMetrics.remove(jobId);
+ jobIdToStates.remove(jobId);
+ jobIdToStageId.remove(jobId);
+ Iterator<Map.Entry<Integer, Integer>> iterator = stageIdToJobId.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, Integer> entry = iterator.next();
+ if (entry.getValue() == jobId) {
+ iterator.remove();
+ }
+ }
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java?rev=1637199&r1=1637198&r2=1637199&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java Thu Nov 6 19:59:27 2014
@@ -22,12 +22,17 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.Maps;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.spark.executor.InputMetrics;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.scheduler.StageInfo;
import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.spark.ui.jobs.UIData;
@@ -124,7 +129,98 @@ public class SimpleSparkJobStatus implem
@Override
public SparkStatistics getSparkStatistics() {
- return new SparkStatisticsBuilder().add(sparkCounters).build();
+ SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder();
+ // add Hive operator level statistics.
+ sparkStatisticsBuilder.add(sparkCounters);
+ // add spark job metrics.
+ String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
+ Map<String, List<TaskMetrics>> jobMetric = jobStateListener.getJobMetric(jobId);
+ Map<String, Long> flatJobMetric = combineJobLevelMetrics(jobMetric);
+ for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
+ sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
+ }
+
+ return sparkStatisticsBuilder.build();
+ }
+
+ @Override
+ public void cleanup() {
+ jobStateListener.cleanup(jobId);
+ }
+
+ private Map<String, Long> combineJobLevelMetrics(Map<String, List<TaskMetrics>> jobMetric) {
+ Map<String, Long> results = Maps.newLinkedHashMap();
+
+ long executorDeserializeTime = 0;
+ long executorRunTime = 0;
+ long resultSize = 0;
+ long jvmGCTime = 0;
+ long resultSerializationTime = 0;
+ long memoryBytesSpilled = 0;
+ long diskBytesSpilled = 0;
+ long bytesRead = 0;
+ long remoteBlocksFetched = 0;
+ long localBlocksFetched = 0;
+ long fetchWaitTime = 0;
+ long remoteBytesRead = 0;
+ long shuffleBytesWritten = 0;
+ long shuffleWriteTime = 0;
+ boolean inputMetricExist = false;
+ boolean shuffleReadMetricExist = false;
+ boolean shuffleWriteMetricExist = false;
+
+ for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+ for (TaskMetrics taskMetrics : stageMetric) {
+ executorDeserializeTime += taskMetrics.executorDeserializeTime();
+ executorRunTime += taskMetrics.executorRunTime();
+ resultSize += taskMetrics.resultSize();
+ jvmGCTime += taskMetrics.jvmGCTime();
+ resultSerializationTime += taskMetrics.resultSerializationTime();
+ memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+ diskBytesSpilled += taskMetrics.diskBytesSpilled();
+ if (!taskMetrics.inputMetrics().isEmpty()) {
+ inputMetricExist = true;
+ bytesRead += taskMetrics.inputMetrics().get().bytesRead();
+ }
+ Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+ if (!shuffleReadMetricsOption.isEmpty()) {
+ shuffleReadMetricExist = true;
+ remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
+ localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
+ fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
+ remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
+ }
+ Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+ if (!shuffleWriteMetricsOption.isEmpty()) {
+ shuffleWriteMetricExist = true;
+ shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
+ shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+ }
+ }
+ }
+
+ results.put("ExecutorDeserializeTime", executorDeserializeTime);
+ results.put("ExecutorRunTime", executorRunTime);
+ results.put("ResultSize", resultSize);
+ results.put("JvmGCTime", jvmGCTime);
+ results.put("ResultSerializationTime", resultSerializationTime);
+ results.put("MemoryBytesSpilled", memoryBytesSpilled);
+ results.put("DiskBytesSpilled", diskBytesSpilled);
+ if (inputMetricExist) {
+ results.put("BytesRead", bytesRead);
+ }
+ if (shuffleReadMetricExist) {
+ results.put("RemoteBlocksFetched", remoteBlocksFetched);
+ results.put("LocalBlocksFetched", localBlocksFetched);
+ results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
+ results.put("FetchWaitTime", fetchWaitTime);
+ results.put("RemoteBytesRead", remoteBytesRead);
+ }
+ if (shuffleWriteMetricExist) {
+ results.put("ShuffleBytesWritten", shuffleBytesWritten);
+ results.put("ShuffleWriteTime", shuffleWriteTime);
+ }
+ return results;
}
private List<StageInfo> getStageInfo(int stageId) {