Posted to commits@hive.apache.org by st...@apache.org on 2018/03/14 17:03:10 UTC
hive git commit: HIVE-18034: Improving logging when HoS executors spend lots of time in GC (Sahil Takiar, reviewed by Rui Li)
Repository: hive
Updated Branches:
refs/heads/master 9cdc08580 -> 57a1ec211
HIVE-18034: Improving logging when HoS executors spend lots of time in GC (Sahil Takiar, reviewed by Rui Li)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/57a1ec21
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/57a1ec21
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/57a1ec21
Branch: refs/heads/master
Commit: 57a1ec211039d5a5c0eb309adb991283b112520e
Parents: 9cdc085
Author: Sahil Takiar <ta...@gmail.com>
Authored: Wed Mar 14 09:49:02 2018 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Wed Mar 14 09:49:02 2018 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/spark/SparkTask.java | 73 +++++++++++++++++---
.../spark/Statistic/SparkStatisticGroup.java | 18 +++--
.../exec/spark/Statistic/SparkStatistics.java | 21 ++++--
.../spark/Statistic/SparkStatisticsBuilder.java | 8 +--
.../spark/Statistic/SparkStatisticsNames.java | 43 ++++++++++++
.../spark/status/impl/JobMetricsListener.java | 24 +++----
.../spark/status/impl/LocalSparkJobStatus.java | 19 ++---
.../spark/status/impl/RemoteSparkJobStatus.java | 21 +++---
.../spark/status/impl/SparkMetricsUtils.java | 48 +++++--------
.../hadoop/hive/ql/history/HiveHistory.java | 1 +
.../hive/spark/client/MetricsCollection.java | 4 +-
.../apache/hive/spark/client/RemoteDriver.java | 2 +-
.../hive/spark/client/metrics/Metrics.java | 14 ++--
.../spark/client/TestMetricsCollection.java | 13 ++--
14 files changed, 212 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 76f6ecc..c240884 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -27,9 +27,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -77,7 +79,10 @@ public class SparkTask extends Task<SparkWork> {
private static final LogHelper console = new LogHelper(LOG);
private PerfLogger perfLogger;
private static final long serialVersionUID = 1L;
+ // The id of the actual Spark job
private transient int sparkJobID;
+ // The id of the JobHandle used to track the actual Spark job
+ private transient String sparkJobHandleId;
private transient SparkStatistics sparkStatistics;
private transient long submitTime;
private transient long startTime;
@@ -111,36 +116,60 @@ public class SparkTask extends Task<SparkWork> {
SparkWork sparkWork = getWork();
sparkWork.setRequiredCounterPrefix(getOperatorCounters());
+ // Submit the Spark job
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB);
jobRef = sparkSession.submit(driverContext, sparkWork);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
+ // If the driver context has been shut down (due to query cancellation), kill the Spark job
if (driverContext.isShutdown()) {
LOG.warn("Killing Spark job");
killJob();
throw new HiveException("Operation is cancelled.");
}
- addToHistory(jobRef);
- this.jobID = jobRef.getSparkJobStatus().getAppID();
+ // Get the Job Handle id associated with the Spark job
+ sparkJobHandleId = jobRef.getJobId();
+
+ // Add Spark job handle id to the Hive History
+ addToHistory(Keys.SPARK_JOB_HANDLE_ID, jobRef.getJobId());
+
+ LOG.debug("Starting Spark job with job handle id " + sparkJobHandleId);
+
+ // Get the application id of the Spark app
+ jobID = jobRef.getSparkJobStatus().getAppID();
+
+ // Start monitoring the Spark job, returns when the Spark job has completed / failed, or if
+ // a timeout occurs
rc = jobRef.monitorJob();
+
+ // Get the id of the Spark job that was launched; returns -1 if no Spark job was launched
+ sparkJobID = jobRef.getSparkJobStatus().getJobId();
+
+ // Add Spark job id to the Hive History
+ addToHistory(Keys.SPARK_JOB_ID, Integer.toString(sparkJobID));
+
+ // Get the final state of the Spark job and parse its job info
SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
- sparkJobID = sparkJobStatus.getJobId();
getSparkJobInfo(sparkJobStatus, rc);
+
if (rc == 0) {
sparkStatistics = sparkJobStatus.getSparkStatistics();
+ printExcessiveGCWarning();
if (LOG.isInfoEnabled() && sparkStatistics != null) {
LOG.info(String.format("=====Spark Job[%s] statistics=====", sparkJobID));
logSparkStatistic(sparkStatistics);
}
- LOG.info("Successfully completed Spark Job " + sparkJobID + " with application ID " +
+ LOG.info("Successfully completed Spark job[" + sparkJobID + "] with application ID " +
jobID + " and task ID " + getId());
} else if (rc == 2) { // Cancel job if the monitor found job submission timeout.
// TODO: If the timeout is because of lack of resources in the cluster, we should
// ideally also cancel the app request here. But w/o facilities from Spark or YARN,
// it's difficult to do it on hive side alone. See HIVE-12650.
- LOG.info("Failed to submit Spark job " + sparkJobID);
+ LOG.debug("Failed to submit Spark job with job handle id " + sparkJobHandleId);
+ LOG.info("Failed to submit Spark job for application id " + (Strings.isNullOrEmpty(jobID)
+ ? "UNKNOWN" : jobID));
killJob();
} else if (rc == 4) {
LOG.info("The spark job or one stage of it has too many tasks" +
@@ -189,12 +218,35 @@ public class SparkTask extends Task<SparkWork> {
return rc;
}
- private void addToHistory(SparkJobRef jobRef) {
- console.printInfo("Starting Spark Job = " + jobRef.getJobId());
+ /**
+ * Use the Spark metrics and calculate how much task execution time was spent performing GC
+ * operations. If more than a defined threshold of time is spent, print out a warning on the
+ * console.
+ */
+ private void printExcessiveGCWarning() {
+ SparkStatisticGroup sparkStatisticGroup = sparkStatistics.getStatisticGroup(
+ SparkStatisticsNames.SPARK_GROUP_NAME);
+ if (sparkStatisticGroup != null) {
+ long taskDurationTime = Long.parseLong(sparkStatisticGroup.getSparkStatistic(
+ SparkStatisticsNames.TASK_DURATION_TIME).getValue());
+ long jvmGCTime = Long.parseLong(sparkStatisticGroup.getSparkStatistic(
+ SparkStatisticsNames.JVM_GC_TIME).getValue());
+
+ // Threshold percentage to trigger the GC warning
+ double threshold = 0.1;
+
+ if (jvmGCTime > taskDurationTime * threshold) {
+ long percentGcTime = Math.round((double) jvmGCTime / taskDurationTime * 100);
+ String gcWarning = String.format("WARNING: Spark Job[%s] Spent %s%% (%s ms / %s ms) of " +
+ "task time in GC", sparkJobID, percentGcTime, jvmGCTime, taskDurationTime);
+ console.printInfo(gcWarning);
+ }
+ }
+ }
+
+ private void addToHistory(Keys key, String value) {
if (SessionState.get() != null) {
- SessionState.get().getHiveHistory()
- .setQueryProperty(queryState.getQueryId(), Keys.SPARK_JOB_ID,
- Integer.toString(jobRef.getSparkJobStatus().getJobId()));
+ SessionState.get().getHiveHistory().setQueryProperty(queryState.getQueryId(), key, value);
}
}
@@ -327,6 +379,7 @@ public class SparkTask extends Task<SparkWork> {
}
private void killJob() {
+ LOG.debug("Killing Spark job with job handle id " + sparkJobHandleId);
boolean needToKillJob = false;
if (jobRef != null && !jobKilled) {
synchronized (this) {
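
The new printExcessiveGCWarning() method above warns on the console once JVM GC time exceeds 10% of total task time for the job. A minimal standalone sketch of that check, using hypothetical literal values in place of the SPARK statistic group lookups:

    public class GcWarningCheck {
      public static void main(String[] args) {
        // Hypothetical values; in SparkTask these come from the SPARK statistic group
        // (SparkStatisticsNames.TASK_DURATION_TIME and SparkStatisticsNames.JVM_GC_TIME).
        long taskDurationTime = 120000L; // total task time across the job, in ms
        long jvmGCTime = 18000L;         // time those tasks spent in GC, in ms

        double threshold = 0.1;          // warn once GC exceeds 10% of task time
        if (jvmGCTime > taskDurationTime * threshold) {
          long percentGcTime = Math.round((double) jvmGCTime / taskDurationTime * 100);
          System.out.println(String.format(
              "WARNING: Spark Job[%s] Spent %s%% (%s ms / %s ms) of task time in GC",
              1, percentGcTime, jvmGCTime, taskDurationTime));
        }
      }
    }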
http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
index 5ab4d16..e1006e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
@@ -17,17 +17,20 @@
*/
package org.apache.hadoop.hive.ql.exec.spark.Statistic;
-import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
public class SparkStatisticGroup {
private final String groupName;
- private final List<SparkStatistic> statisticList;
+ private final Map<String, SparkStatistic> statistics = new LinkedHashMap<>();
SparkStatisticGroup(String groupName, List<SparkStatistic> statisticList) {
this.groupName = groupName;
- this.statisticList = Collections.unmodifiableList(statisticList);
+ for (SparkStatistic sparkStatistic : statisticList) {
+ this.statistics.put(sparkStatistic.getName(), sparkStatistic);
+ }
}
public String getGroupName() {
@@ -35,6 +38,13 @@ public class SparkStatisticGroup {
}
public Iterator<SparkStatistic> getStatistics() {
- return this.statisticList.iterator();
+ return this.statistics.values().iterator();
+ }
+
+ /**
+ * Get a {@link SparkStatistic} by its given name
+ */
+ public SparkStatistic getSparkStatistic(String name) {
+ return this.statistics.get(name);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java
index 584e8bf..946cadc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java
@@ -17,19 +17,26 @@
*/
package org.apache.hadoop.hive.ql.exec.spark.Statistic;
-
-import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
public class SparkStatistics {
- private final List<SparkStatisticGroup> statisticGroups;
- SparkStatistics(List<SparkStatisticGroup> statisticGroups) {
- this.statisticGroups = Collections.unmodifiableList(statisticGroups);
+ private final Map<String, SparkStatisticGroup> statisticGroups = new LinkedHashMap<>();
+
+ SparkStatistics(List<SparkStatisticGroup> statisticGroupsList) {
+ for (SparkStatisticGroup group : statisticGroupsList) {
+ statisticGroups.put(group.getGroupName(), group);
+ }
}
public Iterator<SparkStatisticGroup> getStatisticGroups() {
- return this.statisticGroups.iterator();
+ return this.statisticGroups.values().iterator();
+ }
+
+ public SparkStatisticGroup getStatisticGroup(String groupName) {
+ return this.statisticGroups.get(groupName);
}
-}
\ No newline at end of file
+}
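
Replacing the unmodifiable lists with LinkedHashMaps keeps the original iteration order while adding direct lookup by name, which is what SparkTask uses for the GC warning. A rough usage sketch, assuming sparkStatistics was obtained from SparkJobStatus#getSparkStatistics():

    SparkStatisticGroup sparkGroup =
        sparkStatistics.getStatisticGroup(SparkStatisticsNames.SPARK_GROUP_NAME);
    if (sparkGroup != null) {
      // Individual metrics can now be fetched by name instead of iterating the whole group
      long jvmGCTime = Long.parseLong(
          sparkGroup.getSparkStatistic(SparkStatisticsNames.JVM_GC_TIME).getValue());
    }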
http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java
index 6ebc274..d31d60a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java
@@ -21,7 +21,7 @@ import org.apache.hive.spark.counter.SparkCounter;
import org.apache.hive.spark.counter.SparkCounterGroup;
import org.apache.hive.spark.counter.SparkCounters;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -31,15 +31,15 @@ public class SparkStatisticsBuilder {
private Map<String, List<SparkStatistic>> statisticMap;
public SparkStatisticsBuilder() {
- statisticMap = new HashMap<String, List<SparkStatistic>>();
+ statisticMap = new LinkedHashMap<>();
}
public SparkStatistics build() {
List<SparkStatisticGroup> statisticGroups = new LinkedList<SparkStatisticGroup>();
for (Map.Entry<String, List<SparkStatistic>> entry : statisticMap.entrySet()) {
String groupName = entry.getKey();
- List<SparkStatistic> statisitcList = entry.getValue();
- statisticGroups.add(new SparkStatisticGroup(groupName, statisitcList));
+ List<SparkStatistic> statisticList = entry.getValue();
+ statisticGroups.add(new SparkStatisticGroup(groupName, statisticList));
}
return new SparkStatistics(statisticGroups);
http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
new file mode 100644
index 0000000..ca93a80
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.Statistic;
+
+/**
+ * A collection of names that define different {@link SparkStatistic} objects.
+ */
+public class SparkStatisticsNames {
+
+ public static final String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime";
+ public static final String EXECUTOR_RUN_TIME = "ExecutorRunTime";
+ public static final String RESULT_SIZE = "ResultSize";
+ public static final String JVM_GC_TIME = "JvmGCTime";
+ public static final String RESULT_SERIALIZATION_TIME = "ResultSerializationTime";
+ public static final String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled";
+ public static final String DISK_BYTES_SPILLED = "DiskBytesSpilled";
+ public static final String BYTES_READ = "BytesRead";
+ public static final String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched";
+ public static final String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched";
+ public static final String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched";
+ public static final String FETCH_WAIT_TIME = "FetchWaitTime";
+ public static final String REMOTE_BYTES_READ = "RemoteBytesRead";
+ public static final String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten";
+ public static final String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
+ public static final String TASK_DURATION_TIME = "TaskDurationTime";
+
+ public static final String SPARK_GROUP_NAME = "SPARK";
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
index eaeb4dc..773fe97 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+import java.util.AbstractMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -28,6 +29,7 @@ import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.TaskInfo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -38,7 +40,8 @@ public class JobMetricsListener extends SparkListener {
private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
- private final Map<Integer, Map<Integer, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
+ private final Map<Integer, Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>>> allJobMetrics =
+ Maps.newHashMap();
@Override
public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
@@ -47,17 +50,12 @@ public class JobMetricsListener extends SparkListener {
if (jobId == null) {
LOG.warn("Can not find job id for stage[" + stageId + "].");
} else {
- Map<Integer, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
- if (jobMetrics == null) {
- jobMetrics = Maps.newHashMap();
- allJobMetrics.put(jobId, jobMetrics);
- }
- List<TaskMetrics> stageMetrics = jobMetrics.get(stageId);
- if (stageMetrics == null) {
- stageMetrics = Lists.newLinkedList();
- jobMetrics.put(stageId, stageMetrics);
- }
- stageMetrics.add(taskEnd.taskMetrics());
+ Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>> jobMetrics = allJobMetrics.computeIfAbsent(
+ jobId, k -> Maps.newHashMap());
+ List<Map.Entry<TaskMetrics, TaskInfo>> stageMetrics = jobMetrics.computeIfAbsent(stageId,
+ k -> Lists.newLinkedList());
+
+ stageMetrics.add(new AbstractMap.SimpleEntry<>(taskEnd.taskMetrics(), taskEnd.taskInfo()));
}
}
@@ -74,7 +72,7 @@ public class JobMetricsListener extends SparkListener {
jobIdToStageId.put(jobId, intStageIds);
}
- public synchronized Map<Integer, List<TaskMetrics>> getJobMetric(int jobId) {
+ public synchronized Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>> getJobMetric(int jobId) {
return allJobMetrics.get(jobId);
}
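
JobMetricsListener now keeps a (TaskMetrics, TaskInfo) pair per finished task, and the nested-map initialization uses Map#computeIfAbsent instead of the explicit null checks. A small self-contained illustration of that pattern with plain JDK types:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ComputeIfAbsentExample {
      public static void main(String[] args) {
        Map<Integer, List<String>> metricsByStage = new HashMap<>();

        // One call creates the per-stage list on first use, replacing the get/null-check/put dance
        metricsByStage.computeIfAbsent(3, k -> new ArrayList<>()).add("task-0 metrics");
        metricsByStage.computeIfAbsent(3, k -> new ArrayList<>()).add("task-1 metrics");

        System.out.println(metricsByStage); // {3=[task-0 metrics, task-1 metrics]}
      }
    }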
http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index 8b031e7..03f8a0b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -17,28 +17,31 @@
*/
package org.apache.hadoop.hive.ql.exec.spark.status.impl;
-import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
import org.apache.hive.spark.client.MetricsCollection;
import org.apache.hive.spark.client.metrics.Metrics;
import org.apache.hive.spark.counter.SparkCounters;
+
import org.apache.spark.JobExecutionStatus;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.SparkStageInfo;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.TaskInfo;
public class LocalSparkJobStatus implements SparkJobStatus {
@@ -129,8 +132,7 @@ public class LocalSparkJobStatus implements SparkJobStatus {
// add Hive operator level statistics.
sparkStatisticsBuilder.add(sparkCounters);
// add spark job metrics.
- String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
- Map<Integer, List<TaskMetrics>> jobMetric = jobMetricsListener.getJobMetric(jobId);
+ Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>> jobMetric = jobMetricsListener.getJobMetric(jobId);
if (jobMetric == null) {
return null;
}
@@ -138,16 +140,17 @@ public class LocalSparkJobStatus implements SparkJobStatus {
MetricsCollection metricsCollection = new MetricsCollection();
Set<Integer> stageIds = jobMetric.keySet();
for (int stageId : stageIds) {
- List<TaskMetrics> taskMetrics = jobMetric.get(stageId);
- for (TaskMetrics taskMetric : taskMetrics) {
- Metrics metrics = new Metrics(taskMetric);
+ List<Map.Entry<TaskMetrics, TaskInfo>> taskMetrics = jobMetric.get(stageId);
+ for (Map.Entry<TaskMetrics, TaskInfo> taskMetric : taskMetrics) {
+ Metrics metrics = new Metrics(taskMetric.getKey(), taskMetric.getValue());
metricsCollection.addMetrics(jobId, stageId, 0, metrics);
}
}
Map<String, Long> flatJobMetric = SparkMetricsUtils.collectMetrics(metricsCollection
.getAllMetrics());
for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
- sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
+ sparkStatisticsBuilder.add(SparkStatisticsNames.SPARK_GROUP_NAME, entry.getKey(),
+ Long.toString(entry.getValue()));
}
return sparkStatisticsBuilder.build();
http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index ec7ca40..ff969e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -19,28 +19,30 @@
package org.apache.hadoop.hive.ql.exec.spark.status.impl;
import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hive.spark.client.MetricsCollection;
-import org.apache.hive.spark.counter.SparkCounters;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.hive.spark.client.MetricsCollection;
import org.apache.hive.spark.client.Job;
import org.apache.hive.spark.client.JobContext;
import org.apache.hive.spark.client.JobHandle;
import org.apache.hive.spark.client.SparkClient;
+import org.apache.hive.spark.counter.SparkCounters;
+
import org.apache.spark.JobExecutionStatus;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.SparkStageInfo;
import org.apache.spark.api.java.JavaFutureAction;
import java.io.Serializable;
-import java.net.InetAddress;
-import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -125,16 +127,19 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
if (metricsCollection == null || getCounter() == null) {
return null;
}
+
SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder();
- // add Hive operator level statistics.
+
+ // add Hive operator level statistics. - e.g. RECORDS_IN, RECORDS_OUT
sparkStatisticsBuilder.add(getCounter());
- // add spark job metrics.
- String jobIdentifier = "Spark Job[" + getJobId() + "] Metrics";
+ // add spark job metrics. - e.g. metrics collected by Spark itself (JvmGCTime,
+ // ExecutorRunTime, etc.)
Map<String, Long> flatJobMetric = SparkMetricsUtils.collectMetrics(
metricsCollection.getAllMetrics());
for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
- sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
+ sparkStatisticsBuilder.add(SparkStatisticsNames.SPARK_GROUP_NAME, entry.getKey(),
+ Long.toString(entry.getValue()));
}
return sparkStatisticsBuilder.build();
http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
index dd17168..f72407e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
@@ -20,54 +20,40 @@ package org.apache.hadoop.hive.ql.exec.spark.status.impl;
import java.util.LinkedHashMap;
import java.util.Map;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
import org.apache.hive.spark.client.metrics.Metrics;
import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
final class SparkMetricsUtils {
- private final static String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime";
- private final static String EXECUTOR_RUN_TIME = "ExecutorRunTime";
- private final static String RESULT_SIZE = "ResultSize";
- private final static String JVM_GC_TIME = "JvmGCTime";
- private final static String RESULT_SERIALIZATION_TIME = "ResultSerializationTime";
- private final static String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled";
- private final static String DISK_BYTES_SPILLED = "DiskBytesSpilled";
- private final static String BYTES_READ = "BytesRead";
- private final static String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched";
- private final static String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched";
- private final static String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched";
- private final static String FETCH_WAIT_TIME = "FetchWaitTime";
- private final static String REMOTE_BYTES_READ = "RemoteBytesRead";
- private final static String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten";
- private final static String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
-
private SparkMetricsUtils(){}
static Map<String, Long> collectMetrics(Metrics allMetrics) {
Map<String, Long> results = new LinkedHashMap<String, Long>();
- results.put(EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime);
- results.put(EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
- results.put(RESULT_SIZE, allMetrics.resultSize);
- results.put(JVM_GC_TIME, allMetrics.jvmGCTime);
- results.put(RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime);
- results.put(MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled);
- results.put(DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled);
+ results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime);
+ results.put(SparkStatisticsNames.EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
+ results.put(SparkStatisticsNames.RESULT_SIZE, allMetrics.resultSize);
+ results.put(SparkStatisticsNames.JVM_GC_TIME, allMetrics.jvmGCTime);
+ results.put(SparkStatisticsNames.RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime);
+ results.put(SparkStatisticsNames.MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled);
+ results.put(SparkStatisticsNames.DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled);
+ results.put(SparkStatisticsNames.TASK_DURATION_TIME, allMetrics.taskDurationTime);
if (allMetrics.inputMetrics != null) {
- results.put(BYTES_READ, allMetrics.inputMetrics.bytesRead);
+ results.put(SparkStatisticsNames.BYTES_READ, allMetrics.inputMetrics.bytesRead);
}
if (allMetrics.shuffleReadMetrics != null) {
ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics;
long rbf = shuffleReadMetrics.remoteBlocksFetched;
long lbf = shuffleReadMetrics.localBlocksFetched;
- results.put(REMOTE_BLOCKS_FETCHED, rbf);
- results.put(LOCAL_BLOCKS_FETCHED, lbf);
- results.put(TOTAL_BLOCKS_FETCHED, rbf + lbf);
- results.put(FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime);
- results.put(REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead);
+ results.put(SparkStatisticsNames.REMOTE_BLOCKS_FETCHED, rbf);
+ results.put(SparkStatisticsNames.LOCAL_BLOCKS_FETCHED, lbf);
+ results.put(SparkStatisticsNames.TOTAL_BLOCKS_FETCHED, rbf + lbf);
+ results.put(SparkStatisticsNames.FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime);
+ results.put(SparkStatisticsNames.REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead);
}
if (allMetrics.shuffleWriteMetrics != null) {
- results.put(SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
- results.put(SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime);
+ results.put(SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
+ results.put(SparkStatisticsNames.SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime);
}
return results;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
index a5bafbc..327628f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
@@ -69,6 +69,7 @@ public interface HiveHistory {
TASK_NUM_MAPPERS,
TASK_NUM_REDUCERS,
ROWS_INSERTED,
+ SPARK_JOB_HANDLE_ID,
SPARK_JOB_ID
};
http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index 0f03a64..526aefd 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -25,7 +25,6 @@ import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
-import org.apache.hive.spark.client.metrics.DataReadMethod;
import org.apache.hive.spark.client.metrics.InputMetrics;
import org.apache.hive.spark.client.metrics.Metrics;
import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
@@ -148,6 +147,7 @@ public class MetricsCollection {
long resultSerializationTime = 0L;
long memoryBytesSpilled = 0L;
long diskBytesSpilled = 0L;
+ long taskDurationTime = 0L;
// Input metrics.
boolean hasInputMetrics = false;
@@ -173,6 +173,7 @@ public class MetricsCollection {
resultSerializationTime += m.resultSerializationTime;
memoryBytesSpilled += m.memoryBytesSpilled;
diskBytesSpilled += m.diskBytesSpilled;
+ taskDurationTime += m.taskDurationTime;
if (m.inputMetrics != null) {
hasInputMetrics = true;
@@ -222,6 +223,7 @@ public class MetricsCollection {
resultSerializationTime,
memoryBytesSpilled,
diskBytesSpilled,
+ taskDurationTime,
inputMetrics,
shuffleReadMetrics,
shuffleWriteMetrics);
http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index e584cbb..f221d0a 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -478,7 +478,7 @@ public class RemoteDriver {
public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
if (taskEnd.reason() instanceof org.apache.spark.Success$
&& !taskEnd.taskInfo().speculative()) {
- Metrics metrics = new Metrics(taskEnd.taskMetrics());
+ Metrics metrics = new Metrics(taskEnd.taskMetrics(), taskEnd.taskInfo());
Integer jobId;
synchronized (stageToJobId) {
jobId = stageToJobId.get(taskEnd.stageId());
http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index 418d534..9da0116 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -19,10 +19,11 @@ package org.apache.hive.spark.client.metrics;
import java.io.Serializable;
-import org.apache.spark.executor.TaskMetrics;
-
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.TaskInfo;
+
/**
* Metrics tracked during the execution of a job.
*
@@ -46,6 +47,8 @@ public class Metrics implements Serializable {
public final long memoryBytesSpilled;
/** The number of on-disk bytes spilled by tasks. */
public final long diskBytesSpilled;
+ /** Amount of time spent executing tasks. */
+ public final long taskDurationTime;
/** If tasks read from a HadoopRDD or from persisted data, metrics on how much data was read. */
public final InputMetrics inputMetrics;
/**
@@ -58,7 +61,7 @@ public class Metrics implements Serializable {
private Metrics() {
// For Serialization only.
- this(0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null);
+ this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null);
}
public Metrics(
@@ -69,6 +72,7 @@ public class Metrics implements Serializable {
long resultSerializationTime,
long memoryBytesSpilled,
long diskBytesSpilled,
+ long taskDurationTime,
InputMetrics inputMetrics,
ShuffleReadMetrics shuffleReadMetrics,
ShuffleWriteMetrics shuffleWriteMetrics) {
@@ -79,12 +83,13 @@ public class Metrics implements Serializable {
this.resultSerializationTime = resultSerializationTime;
this.memoryBytesSpilled = memoryBytesSpilled;
this.diskBytesSpilled = diskBytesSpilled;
+ this.taskDurationTime = taskDurationTime;
this.inputMetrics = inputMetrics;
this.shuffleReadMetrics = shuffleReadMetrics;
this.shuffleWriteMetrics = shuffleWriteMetrics;
}
- public Metrics(TaskMetrics metrics) {
+ public Metrics(TaskMetrics metrics, TaskInfo taskInfo) {
this(
metrics.executorDeserializeTime(),
metrics.executorRunTime(),
@@ -93,6 +98,7 @@ public class Metrics implements Serializable {
metrics.resultSerializationTime(),
metrics.memoryBytesSpilled(),
metrics.diskBytesSpilled(),
+ taskInfo.duration(),
optionalInputMetric(metrics),
optionalShuffleReadMetric(metrics),
optionalShuffleWriteMetrics(metrics));
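
With the extra TaskInfo argument, Metrics now carries the wall-clock task duration reported by TaskInfo#duration(), aggregated later as TaskDurationTime. A sketch of a listener building the extended object, mirroring the RemoteDriver change above (the class name is illustrative):

    import org.apache.hive.spark.client.metrics.Metrics;
    import org.apache.spark.scheduler.SparkListener;
    import org.apache.spark.scheduler.SparkListenerTaskEnd;

    public class TaskDurationListener extends SparkListener {
      @Override
      public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
        // taskDurationTime is populated from TaskInfo#duration() by the new constructor
        Metrics metrics = new Metrics(taskEnd.taskMetrics(), taskEnd.taskInfo());
        long wallClockMs = metrics.taskDurationTime;
      }
    }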
http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index 8fef66b..87b460d 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -66,7 +66,7 @@ public class TestMetricsCollection {
@Test
public void testOptionalMetrics() {
long value = taskValue(1, 1, 1L);
- Metrics metrics = new Metrics(value, value, value, value, value, value, value,
+ Metrics metrics = new Metrics(value, value, value, value, value, value, value, value,
null, null, null);
MetricsCollection collection = new MetricsCollection();
@@ -94,10 +94,10 @@ public class TestMetricsCollection {
MetricsCollection collection = new MetricsCollection();
long value = taskValue(1, 1, 1);
- Metrics metrics1 = new Metrics(value, value, value, value, value, value, value,
- new InputMetrics(value), null, null);
- Metrics metrics2 = new Metrics(value, value, value, value, value, value, value,
- new InputMetrics(value), null, null);
+ Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value,
+ new InputMetrics(value), null, null);
+ Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value,
+ new InputMetrics(value), null, null);
collection.addMetrics(1, 1, 1, metrics1);
collection.addMetrics(1, 1, 2, metrics2);
@@ -108,7 +108,7 @@ public class TestMetricsCollection {
private Metrics makeMetrics(int jobId, int stageId, long taskId) {
long value = 1000000 * jobId + 1000 * stageId + taskId;
- return new Metrics(value, value, value, value, value, value, value,
+ return new Metrics(value, value, value, value, value, value, value, value,
new InputMetrics(value),
new ShuffleReadMetrics((int) value, (int) value, value, value),
new ShuffleWriteMetrics(value, value));
@@ -154,6 +154,7 @@ public class TestMetricsCollection {
assertEquals(expected, metrics.resultSerializationTime);
assertEquals(expected, metrics.memoryBytesSpilled);
assertEquals(expected, metrics.diskBytesSpilled);
+ assertEquals(expected, metrics.taskDurationTime);
assertEquals(expected, metrics.inputMetrics.bytesRead);