You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by st...@apache.org on 2018/04/06 04:42:47 UTC
hive git commit: HIVE-18651: Expose additional Spark metrics (Sahil Takiar, reviewed by Vihang Karajgaonkar)
Repository: hive
Updated Branches:
refs/heads/master 2f802e908 -> eb40ea57e
HIVE-18651: Expose additional Spark metrics (Sahil Takiar, reviewed by Vihang Karajgaonkar)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eb40ea57
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eb40ea57
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eb40ea57
Branch: refs/heads/master
Commit: eb40ea57eac4c3ff46f638cf4ab83bec71b5eda5
Parents: 2f802e9
Author: Sahil Takiar <ta...@gmail.com>
Authored: Fri Apr 6 11:42:23 2018 +0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Fri Apr 6 11:42:23 2018 +0700
----------------------------------------------------------------------
.../hive/ql/exec/spark/TestSparkStatistics.java | 100 +++++++++++++++++++
.../hadoop/hive/ql/exec/spark/SparkTask.java | 18 +++-
.../spark/Statistic/SparkStatisticsNames.java | 4 +-
.../spark/status/impl/SparkMetricsUtils.java | 5 +-
.../hive/ql/exec/spark/TestSparkTask.java | 14 +++
.../hive/spark/client/MetricsCollection.java | 6 ++
.../hive/spark/client/metrics/Metrics.java | 13 ++-
.../spark/client/TestMetricsCollection.java | 17 ++--
8 files changed, 162 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
new file mode 100644
index 0000000..be3b501
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class TestSparkStatistics {
+
+ @Test
+ public void testSparkStatistics() {
+ HiveConf conf = new HiveConf();
+ conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+ SQLStdHiveAuthorizerFactory.class.getName());
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
+ conf.set("spark.master", "local-cluster[1,2,1024]");
+
+ SessionState.start(conf);
+
+ Driver driver = null;
+
+ try {
+ driver = new Driver(new QueryState.Builder()
+ .withGenerateNewQueryId(true)
+ .withHiveConf(conf).build(),
+ null, null);
+
+ Assert.assertEquals(0, driver.run("create table test (col int)").getResponseCode());
+ Assert.assertEquals(0, driver.compile("select * from test order by col"));
+
+ List<SparkTask> sparkTasks = Utilities.getSparkTasks(driver.getPlan().getRootTasks());
+ Assert.assertEquals(1, sparkTasks.size());
+
+ SparkTask sparkTask = sparkTasks.get(0);
+
+ DriverContext driverCxt = new DriverContext(driver.getContext());
+ driverCxt.prepare(driver.getPlan());
+
+ sparkTask.initialize(driver.getQueryState(), driver.getPlan(), driverCxt, driver.getContext()
+ .getOpContext());
+ Assert.assertEquals(0, sparkTask.execute(driverCxt));
+
+ Assert.assertNotNull(sparkTask.getSparkStatistics());
+
+ List<SparkStatistic> sparkStats = Lists.newArrayList(sparkTask.getSparkStatistics()
+ .getStatisticGroup(SparkStatisticsNames.SPARK_GROUP_NAME).getStatistics());
+
+ Assert.assertEquals(18, sparkStats.size());
+
+ Map<String, String> statsMap = sparkStats.stream().collect(
+ Collectors.toMap(SparkStatistic::getName, SparkStatistic::getValue));
+
+ Assert.assertTrue(Long.parseLong(statsMap.get(SparkStatisticsNames.TASK_DURATION_TIME)) > 0);
+ Assert.assertTrue(Long.parseLong(statsMap.get(SparkStatisticsNames.EXECUTOR_CPU_TIME)) > 0);
+ Assert.assertTrue(
+ Long.parseLong(statsMap.get(SparkStatisticsNames.EXECUTOR_DESERIALIZE_CPU_TIME)) > 0);
+ Assert.assertTrue(
+ Long.parseLong(statsMap.get(SparkStatisticsNames.EXECUTOR_DESERIALIZE_TIME)) > 0);
+ Assert.assertTrue(Long.parseLong(statsMap.get(SparkStatisticsNames.EXECUTOR_RUN_TIME)) > 0);
+ } finally {
+ if (driver != null) {
+ Assert.assertEquals(0, driver.run("drop table if exists test").getResponseCode());
+ driver.destroy();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index c240884..3083e30 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
@@ -158,8 +159,7 @@ public class SparkTask extends Task<SparkWork> {
sparkStatistics = sparkJobStatus.getSparkStatistics();
printExcessiveGCWarning();
if (LOG.isInfoEnabled() && sparkStatistics != null) {
- LOG.info(String.format("=====Spark Job[%s] statistics=====", sparkJobID));
- logSparkStatistic(sparkStatistics);
+ LOG.info(sparkStatisticsToString(sparkStatistics, sparkJobID));
}
LOG.info("Successfully completed Spark job[" + sparkJobID + "] with application ID " +
jobID + " and task ID " + getId());
@@ -250,17 +250,25 @@ public class SparkTask extends Task<SparkWork> {
}
}
- private void logSparkStatistic(SparkStatistics sparkStatistic) {
+ @VisibleForTesting
+ static String sparkStatisticsToString(SparkStatistics sparkStatistic, int sparkJobID) {
+ StringBuilder sparkStatsString = new StringBuilder();
+ sparkStatsString.append("\n\n");
+ sparkStatsString.append(String.format("=====Spark Job[%d] Statistics=====", sparkJobID));
+ sparkStatsString.append("\n\n");
+
Iterator<SparkStatisticGroup> groupIterator = sparkStatistic.getStatisticGroups();
while (groupIterator.hasNext()) {
SparkStatisticGroup group = groupIterator.next();
- LOG.info(group.getGroupName());
+ sparkStatsString.append(group.getGroupName()).append("\n");
Iterator<SparkStatistic> statisticIterator = group.getStatistics();
while (statisticIterator.hasNext()) {
SparkStatistic statistic = statisticIterator.next();
- LOG.info("\t" + statistic.getName() + ": " + statistic.getValue());
+ sparkStatsString.append("\t").append(statistic.getName()).append(": ").append(
+ statistic.getValue()).append("\n");
}
}
+ return sparkStatsString.toString();
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
index ca93a80..68e4f9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
@@ -23,11 +23,13 @@ package org.apache.hadoop.hive.ql.exec.spark.Statistic;
public class SparkStatisticsNames {
public static final String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime";
+ public static final String EXECUTOR_DESERIALIZE_CPU_TIME = "ExecutorDeserializeCpuTime";
public static final String EXECUTOR_RUN_TIME = "ExecutorRunTime";
+ public static final String EXECUTOR_CPU_TIME = "ExecutorCpuTime";
public static final String RESULT_SIZE = "ResultSize";
public static final String JVM_GC_TIME = "JvmGCTime";
public static final String RESULT_SERIALIZATION_TIME = "ResultSerializationTime";
- public static final String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled";
+ public static final String MEMORY_BYTES_SPILLED = "MemoryBytesSpilled";
public static final String DISK_BYTES_SPILLED = "DiskBytesSpilled";
public static final String BYTES_READ = "BytesRead";
public static final String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched";
http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
index f72407e..fab5422 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
@@ -31,11 +31,14 @@ final class SparkMetricsUtils {
static Map<String, Long> collectMetrics(Metrics allMetrics) {
Map<String, Long> results = new LinkedHashMap<String, Long>();
results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime);
+ results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_CPU_TIME,
+ allMetrics.executorDeserializeCpuTime);
results.put(SparkStatisticsNames.EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
+ results.put(SparkStatisticsNames.EXECUTOR_CPU_TIME, allMetrics.executorCpuTime);
results.put(SparkStatisticsNames.RESULT_SIZE, allMetrics.resultSize);
results.put(SparkStatisticsNames.JVM_GC_TIME, allMetrics.jvmGCTime);
results.put(SparkStatisticsNames.RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime);
- results.put(SparkStatisticsNames.MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled);
+ results.put(SparkStatisticsNames.MEMORY_BYTES_SPILLED, allMetrics.memoryBytesSpilled);
results.put(SparkStatisticsNames.DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled);
results.put(SparkStatisticsNames.TASK_DURATION_TIME, allMetrics.taskDurationTime);
if (allMetrics.inputMetrics != null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
index 435c6b6..75b4151 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
import org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor;
import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -96,6 +97,19 @@ public class TestSparkTask {
Assert.assertEquals(remoteSparkJobMonitor.startMonitor(), 3);
}
+ @Test
+ public void testSparkStatisticsToString() {
+ SparkStatisticsBuilder statsBuilder = new SparkStatisticsBuilder();
+ statsBuilder.add("TEST", "stat1", "1");
+ statsBuilder.add("TEST", "stat2", "1");
+ String statsString = SparkTask.sparkStatisticsToString(statsBuilder.build(), 10);
+
+ Assert.assertTrue(statsString.contains("10"));
+ Assert.assertTrue(statsString.contains("TEST"));
+ Assert.assertTrue(statsString.contains("stat1"));
+ Assert.assertTrue(statsString.contains("stat2"));
+ Assert.assertTrue(statsString.contains("1"));
+ }
private boolean isEmptySparkWork(SparkWork sparkWork) {
List<BaseWork> allWorks = sparkWork.getAllWork();
http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index 526aefd..2f3c026 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -141,7 +141,9 @@ public class MetricsCollection {
try {
// Task metrics.
long executorDeserializeTime = 0L;
+ long executorDeserializeCpuTime = 0L;
long executorRunTime = 0L;
+ long executorCpuTime = 0L;
long resultSize = 0L;
long jvmGCTime = 0L;
long resultSerializationTime = 0L;
@@ -167,7 +169,9 @@ public class MetricsCollection {
for (TaskInfo info : Collections2.filter(taskMetrics, filter)) {
Metrics m = info.metrics;
executorDeserializeTime += m.executorDeserializeTime;
+ executorDeserializeCpuTime += m.executorDeserializeCpuTime;
executorRunTime += m.executorRunTime;
+ executorCpuTime += m.executorCpuTime;
resultSize += m.resultSize;
jvmGCTime += m.jvmGCTime;
resultSerializationTime += m.resultSerializationTime;
@@ -217,7 +221,9 @@ public class MetricsCollection {
return new Metrics(
executorDeserializeTime,
+ executorDeserializeCpuTime,
executorRunTime,
+ executorCpuTime,
resultSize,
jvmGCTime,
resultSerializationTime,
http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index 9da0116..b718b3b 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -18,6 +18,7 @@
package org.apache.hive.spark.client.metrics;
import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
@@ -35,8 +36,12 @@ public class Metrics implements Serializable {
/** Time taken on the executor to deserialize tasks. */
public final long executorDeserializeTime;
+ /** CPU time taken on the executor to deserialize tasks. */
+ public final long executorDeserializeCpuTime;
/** Time the executor spends actually running the task (including fetching shuffle data). */
public final long executorRunTime;
+ /** CPU time the executor spends running the task (including fetching shuffle data). */
+ public final long executorCpuTime;
/** The number of bytes sent back to the driver by tasks. */
public final long resultSize;
/** Amount of time the JVM spent in garbage collection while executing tasks. */
@@ -61,12 +66,14 @@ public class Metrics implements Serializable {
private Metrics() {
// For Serialization only.
- this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null);
+ this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null);
}
public Metrics(
long executorDeserializeTime,
+ long executorDeserializeCpuTime,
long executorRunTime,
+ long executorCpuTime,
long resultSize,
long jvmGCTime,
long resultSerializationTime,
@@ -77,7 +84,9 @@ public class Metrics implements Serializable {
ShuffleReadMetrics shuffleReadMetrics,
ShuffleWriteMetrics shuffleWriteMetrics) {
this.executorDeserializeTime = executorDeserializeTime;
+ this.executorDeserializeCpuTime = executorDeserializeCpuTime;
this.executorRunTime = executorRunTime;
+ this.executorCpuTime = executorCpuTime;
this.resultSize = resultSize;
this.jvmGCTime = jvmGCTime;
this.resultSerializationTime = resultSerializationTime;
@@ -92,7 +101,9 @@ public class Metrics implements Serializable {
public Metrics(TaskMetrics metrics, TaskInfo taskInfo) {
this(
metrics.executorDeserializeTime(),
+ TimeUnit.NANOSECONDS.toMillis(metrics.executorDeserializeCpuTime()),
metrics.executorRunTime(),
+ TimeUnit.NANOSECONDS.toMillis(metrics.executorCpuTime()),
metrics.resultSize(),
metrics.jvmGCTime(),
metrics.resultSerializationTime(),
http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index 87b460d..c5884cf 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -66,8 +66,8 @@ public class TestMetricsCollection {
@Test
public void testOptionalMetrics() {
long value = taskValue(1, 1, 1L);
- Metrics metrics = new Metrics(value, value, value, value, value, value, value, value,
- null, null, null);
+ Metrics metrics = new Metrics(value, value, value, value, value, value, value, value, value,
+ value, null, null, null);
MetricsCollection collection = new MetricsCollection();
for (int i : Arrays.asList(1, 2)) {
@@ -94,10 +94,11 @@ public class TestMetricsCollection {
MetricsCollection collection = new MetricsCollection();
long value = taskValue(1, 1, 1);
- Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value,
- new InputMetrics(value), null, null);
- Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value,
- new InputMetrics(value), null, null);
+
+ Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value, value,
+ value, new InputMetrics(value), null, null);
+ Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value, value,
+ value, new InputMetrics(value), null, null);
collection.addMetrics(1, 1, 1, metrics1);
collection.addMetrics(1, 1, 2, metrics2);
@@ -108,7 +109,7 @@ public class TestMetricsCollection {
private Metrics makeMetrics(int jobId, int stageId, long taskId) {
long value = 1000000 * jobId + 1000 * stageId + taskId;
- return new Metrics(value, value, value, value, value, value, value, value,
+ return new Metrics(value, value, value, value, value, value, value, value, value, value,
new InputMetrics(value),
new ShuffleReadMetrics((int) value, (int) value, value, value),
new ShuffleWriteMetrics(value, value));
@@ -148,7 +149,9 @@ public class TestMetricsCollection {
private void checkMetrics(Metrics metrics, long expected) {
assertEquals(expected, metrics.executorDeserializeTime);
+ assertEquals(expected, metrics.executorDeserializeCpuTime);
assertEquals(expected, metrics.executorRunTime);
+ assertEquals(expected, metrics.executorCpuTime);
assertEquals(expected, metrics.resultSize);
assertEquals(expected, metrics.jvmGCTime);
assertEquals(expected, metrics.resultSerializationTime);