Posted to commits@hive.apache.org by st...@apache.org on 2018/04/06 04:42:47 UTC

hive git commit: HIVE-18651: Expose additional Spark metrics (Sahil Takiar, reviewed by Vihang Karajgaonkar)

Repository: hive
Updated Branches:
  refs/heads/master 2f802e908 -> eb40ea57e


HIVE-18651: Expose additional Spark metrics (Sahil Takiar, reviewed by Vihang Karajgaonkar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eb40ea57
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eb40ea57
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eb40ea57

Branch: refs/heads/master
Commit: eb40ea57eac4c3ff46f638cf4ab83bec71b5eda5
Parents: 2f802e9
Author: Sahil Takiar <ta...@gmail.com>
Authored: Fri Apr 6 11:42:23 2018 +0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Fri Apr 6 11:42:23 2018 +0700

----------------------------------------------------------------------
 .../hive/ql/exec/spark/TestSparkStatistics.java | 100 +++++++++++++++++++
 .../hadoop/hive/ql/exec/spark/SparkTask.java    |  18 +++-
 .../spark/Statistic/SparkStatisticsNames.java   |   4 +-
 .../spark/status/impl/SparkMetricsUtils.java    |   5 +-
 .../hive/ql/exec/spark/TestSparkTask.java       |  14 +++
 .../hive/spark/client/MetricsCollection.java    |   6 ++
 .../hive/spark/client/metrics/Metrics.java      |  13 ++-
 .../spark/client/TestMetricsCollection.java     |  17 ++--
 8 files changed, 162 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
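
In short, this change exposes two additional per-task CPU metrics from Spark,
ExecutorCpuTime and ExecutorDeserializeCpuTime, and threads them through the
spark-client Metrics class into Hive's SparkStatistics. A minimal sketch of how
a caller might read the new values once a SparkTask has finished, using only
the APIs exercised by the new TestSparkStatistics below (the task setup is
assumed to already exist, and getSparkStatistics() can be null if no job ran):

    SparkStatistics stats = sparkTask.getSparkStatistics();
    SparkStatisticGroup group =
        stats.getStatisticGroup(SparkStatisticsNames.SPARK_GROUP_NAME);
    Iterator<SparkStatistic> statistics = group.getStatistics();
    while (statistics.hasNext()) {
      SparkStatistic statistic = statistics.next();
      if (SparkStatisticsNames.EXECUTOR_CPU_TIME.equals(statistic.getName())) {
        // Values are stored as strings; this one is in milliseconds
        // (see the Metrics.java hunk below for the unit conversion).
        long executorCpuMillis = Long.parseLong(statistic.getValue());
      }
    }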


http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
new file mode 100644
index 0000000..be3b501
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class TestSparkStatistics {
+
+  @Test
+  public void testSparkStatistics() {
+    HiveConf conf = new HiveConf();
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+            SQLStdHiveAuthorizerFactory.class.getName());
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
+    conf.set("spark.master", "local-cluster[1,2,1024]");
+
+    SessionState.start(conf);
+
+    Driver driver = null;
+
+    try {
+      driver = new Driver(new QueryState.Builder()
+              .withGenerateNewQueryId(true)
+              .withHiveConf(conf).build(),
+              null, null);
+
+      Assert.assertEquals(0, driver.run("create table test (col int)").getResponseCode());
+      Assert.assertEquals(0, driver.compile("select * from test order by col"));
+
+      List<SparkTask> sparkTasks = Utilities.getSparkTasks(driver.getPlan().getRootTasks());
+      Assert.assertEquals(1, sparkTasks.size());
+
+      SparkTask sparkTask = sparkTasks.get(0);
+
+      DriverContext driverCxt = new DriverContext(driver.getContext());
+      driverCxt.prepare(driver.getPlan());
+
+      sparkTask.initialize(driver.getQueryState(), driver.getPlan(), driverCxt, driver.getContext()
+              .getOpContext());
+      Assert.assertEquals(0, sparkTask.execute(driverCxt));
+
+      Assert.assertNotNull(sparkTask.getSparkStatistics());
+
+      List<SparkStatistic> sparkStats = Lists.newArrayList(sparkTask.getSparkStatistics()
+              .getStatisticGroup(SparkStatisticsNames.SPARK_GROUP_NAME).getStatistics());
+
+      Assert.assertEquals(18, sparkStats.size());
+
+      Map<String, String> statsMap = sparkStats.stream().collect(
+              Collectors.toMap(SparkStatistic::getName, SparkStatistic::getValue));
+
+      Assert.assertTrue(Long.parseLong(statsMap.get(SparkStatisticsNames.TASK_DURATION_TIME)) > 0);
+      Assert.assertTrue(Long.parseLong(statsMap.get(SparkStatisticsNames.EXECUTOR_CPU_TIME)) > 0);
+      Assert.assertTrue(
+              Long.parseLong(statsMap.get(SparkStatisticsNames.EXECUTOR_DESERIALIZE_CPU_TIME)) > 0);
+      Assert.assertTrue(
+              Long.parseLong(statsMap.get(SparkStatisticsNames.EXECUTOR_DESERIALIZE_TIME)) > 0);
+      Assert.assertTrue(Long.parseLong(statsMap.get(SparkStatisticsNames.EXECUTOR_RUN_TIME)) > 0);
+    } finally {
+      if (driver != null) {
+        Assert.assertEquals(0, driver.run("drop table if exists test").getResponseCode());
+        driver.destroy();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index c240884..3083e30 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
@@ -158,8 +159,7 @@ public class SparkTask extends Task<SparkWork> {
         sparkStatistics = sparkJobStatus.getSparkStatistics();
         printExcessiveGCWarning();
         if (LOG.isInfoEnabled() && sparkStatistics != null) {
-          LOG.info(String.format("=====Spark Job[%s] statistics=====", sparkJobID));
-          logSparkStatistic(sparkStatistics);
+          LOG.info(sparkStatisticsToString(sparkStatistics, sparkJobID));
         }
         LOG.info("Successfully completed Spark job[" + sparkJobID + "] with application ID " +
                 jobID + " and task ID " + getId());
@@ -250,17 +250,25 @@ public class SparkTask extends Task<SparkWork> {
     }
   }
 
-  private void logSparkStatistic(SparkStatistics sparkStatistic) {
+  @VisibleForTesting
+  static String sparkStatisticsToString(SparkStatistics sparkStatistic, int sparkJobID) {
+    StringBuilder sparkStatsString = new StringBuilder();
+    sparkStatsString.append("\n\n");
+    sparkStatsString.append(String.format("=====Spark Job[%d] Statistics=====", sparkJobID));
+    sparkStatsString.append("\n\n");
+
     Iterator<SparkStatisticGroup> groupIterator = sparkStatistic.getStatisticGroups();
     while (groupIterator.hasNext()) {
       SparkStatisticGroup group = groupIterator.next();
-      LOG.info(group.getGroupName());
+      sparkStatsString.append(group.getGroupName()).append("\n");
       Iterator<SparkStatistic> statisticIterator = group.getStatistics();
       while (statisticIterator.hasNext()) {
         SparkStatistic statistic = statisticIterator.next();
-        LOG.info("\t" + statistic.getName() + ": " + statistic.getValue());
+        sparkStatsString.append("\t").append(statistic.getName()).append(": ").append(
+                statistic.getValue()).append("\n");
       }
     }
+    return sparkStatsString.toString();
   }
 
   /**

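The rewritten sparkStatisticsToString builds the whole statistics table as one
string, so it reaches the log in a single LOG.info() call instead of one call
per statistic and cannot be interleaved with unrelated log lines. With
hypothetical values (and assuming the group is named "SPARK"), the output
looks roughly like:

    =====Spark Job[2] Statistics=====

    SPARK
        ExecutorDeserializeCpuTime: 12
        ExecutorCpuTime: 3456
        ...
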
http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
index ca93a80..68e4f9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
@@ -23,11 +23,13 @@ package org.apache.hadoop.hive.ql.exec.spark.Statistic;
 public class SparkStatisticsNames {
 
   public static final String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime";
+  public static final String EXECUTOR_DESERIALIZE_CPU_TIME = "ExecutorDeserializeCpuTime";
   public static final String EXECUTOR_RUN_TIME = "ExecutorRunTime";
+  public static final String EXECUTOR_CPU_TIME = "ExecutorCpuTime";
   public static final String RESULT_SIZE = "ResultSize";
   public static final String JVM_GC_TIME = "JvmGCTime";
   public static final String RESULT_SERIALIZATION_TIME = "ResultSerializationTime";
-  public static final String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled";
+  public static final String MEMORY_BYTES_SPILLED = "MemoryBytesSpilled";
   public static final String DISK_BYTES_SPILLED = "DiskBytesSpilled";
   public static final String BYTES_READ = "BytesRead";
   public static final String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched";

http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
index f72407e..fab5422 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
@@ -31,11 +31,14 @@ final class SparkMetricsUtils {
   static Map<String, Long> collectMetrics(Metrics allMetrics) {
     Map<String, Long> results = new LinkedHashMap<String, Long>();
     results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime);
+    results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_CPU_TIME,
+            allMetrics.executorDeserializeCpuTime);
     results.put(SparkStatisticsNames.EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
+    results.put(SparkStatisticsNames.EXECUTOR_CPU_TIME, allMetrics.executorCpuTime);
     results.put(SparkStatisticsNames.RESULT_SIZE, allMetrics.resultSize);
     results.put(SparkStatisticsNames.JVM_GC_TIME, allMetrics.jvmGCTime);
     results.put(SparkStatisticsNames.RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime);
-    results.put(SparkStatisticsNames.MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled);
+    results.put(SparkStatisticsNames.MEMORY_BYTES_SPILLED, allMetrics.memoryBytesSpilled);
     results.put(SparkStatisticsNames.DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled);
     results.put(SparkStatisticsNames.TASK_DURATION_TIME, allMetrics.taskDurationTime);
     if (allMetrics.inputMetrics != null) {

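collectMetrics() flattens an aggregated Metrics object into an ordered
name-to-value map, so each new CPU counter simply becomes one extra entry. A
hypothetical use from within the same package (the class and method are
package-private), assuming allMetrics is an already-aggregated Metrics
instance:

    Map<String, Long> results = SparkMetricsUtils.collectMetrics(allMetrics);
    // Both of the new values are in milliseconds.
    long cpuTime = results.get(SparkStatisticsNames.EXECUTOR_CPU_TIME);
    long deserializeCpuTime =
        results.get(SparkStatisticsNames.EXECUTOR_DESERIALIZE_CPU_TIME);
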
http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
index 435c6b6..75b4151 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -96,6 +97,19 @@ public class TestSparkTask {
     Assert.assertEquals(remoteSparkJobMonitor.startMonitor(), 3);
   }
 
+  @Test
+  public void testSparkStatisticsToString() {
+    SparkStatisticsBuilder statsBuilder = new SparkStatisticsBuilder();
+    statsBuilder.add("TEST", "stat1", "1");
+    statsBuilder.add("TEST", "stat2", "1");
+    String statsString = SparkTask.sparkStatisticsToString(statsBuilder.build(), 10);
+
+    Assert.assertTrue(statsString.contains("10"));
+    Assert.assertTrue(statsString.contains("TEST"));
+    Assert.assertTrue(statsString.contains("stat1"));
+    Assert.assertTrue(statsString.contains("stat2"));
+    Assert.assertTrue(statsString.contains("1"));
+  }
 
   private boolean isEmptySparkWork(SparkWork sparkWork) {
     List<BaseWork> allWorks = sparkWork.getAllWork();

http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index 526aefd..2f3c026 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -141,7 +141,9 @@ public class MetricsCollection {
     try {
       // Task metrics.
       long executorDeserializeTime = 0L;
+      long executorDeserializeCpuTime = 0L;
       long executorRunTime = 0L;
+      long executorCpuTime = 0L;
       long resultSize = 0L;
       long jvmGCTime = 0L;
       long resultSerializationTime = 0L;
@@ -167,7 +169,9 @@ public class MetricsCollection {
       for (TaskInfo info : Collections2.filter(taskMetrics, filter)) {
         Metrics m = info.metrics;
         executorDeserializeTime += m.executorDeserializeTime;
+        executorDeserializeCpuTime += m.executorDeserializeCpuTime;
         executorRunTime += m.executorRunTime;
+        executorCpuTime += m.executorCpuTime;
         resultSize += m.resultSize;
         jvmGCTime += m.jvmGCTime;
         resultSerializationTime += m.resultSerializationTime;
@@ -217,7 +221,9 @@ public class MetricsCollection {
 
       return new Metrics(
         executorDeserializeTime,
+        executorDeserializeCpuTime,
         executorRunTime,
+        executorCpuTime,
         resultSize,
         jvmGCTime,
         resultSerializationTime,

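Aggregation treats the two new fields like every other long counter: they are
summed across whichever tasks the filter selects. A minimal sketch of feeding
the collection through the widened 13-argument constructor, exactly as the
updated tests below do (the values are made up, and the trailing nulls stand
for the optional input/shuffle metrics):

    MetricsCollection collection = new MetricsCollection();
    Metrics m = new Metrics(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L,
        null, null, null);
    collection.addMetrics(1 /* jobId */, 1 /* stageId */, 1L /* taskId */, m);
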
http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index 9da0116..b718b3b 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -18,6 +18,7 @@
 package org.apache.hive.spark.client.metrics;
 
 import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 
@@ -35,8 +36,12 @@ public class Metrics implements Serializable {
 
   /** Time taken on the executor to deserialize tasks. */
   public final long executorDeserializeTime;
+  /** CPU time taken on the executor to deserialize tasks. */
+  public final long executorDeserializeCpuTime;
   /** Time the executor spends actually running the task (including fetching shuffle data). */
   public final long executorRunTime;
+  /** CPU time the executor spends running the task (including fetching shuffle data). */
+  public final long executorCpuTime;
   /** The number of bytes sent back to the driver by tasks. */
   public final long resultSize;
   /** Amount of time the JVM spent in garbage collection while executing tasks. */
@@ -61,12 +66,14 @@ public class Metrics implements Serializable {
 
   private Metrics() {
     // For Serialization only.
-    this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null);
+    this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null);
   }
 
   public Metrics(
       long executorDeserializeTime,
+      long executorDeserializeCpuTime,
       long executorRunTime,
+      long executorCpuTime,
       long resultSize,
       long jvmGCTime,
       long resultSerializationTime,
@@ -77,7 +84,9 @@ public class Metrics implements Serializable {
       ShuffleReadMetrics shuffleReadMetrics,
       ShuffleWriteMetrics shuffleWriteMetrics) {
     this.executorDeserializeTime = executorDeserializeTime;
+    this.executorDeserializeCpuTime = executorDeserializeCpuTime;
     this.executorRunTime = executorRunTime;
+    this.executorCpuTime = executorCpuTime;
     this.resultSize = resultSize;
     this.jvmGCTime = jvmGCTime;
     this.resultSerializationTime = resultSerializationTime;
@@ -92,7 +101,9 @@ public class Metrics implements Serializable {
   public Metrics(TaskMetrics metrics, TaskInfo taskInfo) {
     this(
       metrics.executorDeserializeTime(),
+      TimeUnit.NANOSECONDS.toMillis(metrics.executorDeserializeCpuTime()),
       metrics.executorRunTime(),
+      TimeUnit.NANOSECONDS.toMillis(metrics.executorCpuTime()),
       metrics.resultSize(),
       metrics.jvmGCTime(),
       metrics.resultSerializationTime(),

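One subtlety in the TaskMetrics copy constructor: Spark reports the CPU
counters in nanoseconds, while the pre-existing wall-clock fields
(executorDeserializeTime, executorRunTime) are in milliseconds, so the new
values are normalized with TimeUnit before being stored. The same conversion
in isolation:

    import java.util.concurrent.TimeUnit;

    long cpuTimeNanos = 1_500_000_000L;  // as reported by Spark's TaskMetrics
    // 1500 ms, now in the same unit as executorRunTime.
    long cpuTimeMillis = TimeUnit.NANOSECONDS.toMillis(cpuTimeNanos);
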
http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index 87b460d..c5884cf 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -66,8 +66,8 @@ public class TestMetricsCollection {
   @Test
   public void testOptionalMetrics() {
     long value = taskValue(1, 1, 1L);
-    Metrics metrics = new Metrics(value, value, value, value, value, value, value, value,
-        null, null, null);
+    Metrics metrics = new Metrics(value, value, value, value, value, value, value, value, value,
+            value, null, null, null);
 
     MetricsCollection collection = new MetricsCollection();
     for (int i : Arrays.asList(1, 2)) {
@@ -94,10 +94,11 @@ public class TestMetricsCollection {
     MetricsCollection collection = new MetricsCollection();
 
     long value = taskValue(1, 1, 1);
-    Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value,
-            new InputMetrics(value), null, null);
-    Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value,
-            new InputMetrics(value), null, null);
+
+    Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value, value,
+            value, new InputMetrics(value), null, null);
+    Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value, value,
+            value, new InputMetrics(value), null, null);
 
     collection.addMetrics(1, 1, 1, metrics1);
     collection.addMetrics(1, 1, 2, metrics2);
@@ -108,7 +109,7 @@ public class TestMetricsCollection {
 
   private Metrics makeMetrics(int jobId, int stageId, long taskId) {
     long value = 1000000 * jobId + 1000 * stageId + taskId;
-    return new Metrics(value, value, value, value, value, value, value, value,
+    return new Metrics(value, value, value, value, value, value, value, value, value, value,
       new InputMetrics(value),
       new ShuffleReadMetrics((int) value, (int) value, value, value),
       new ShuffleWriteMetrics(value, value));
@@ -148,7 +149,9 @@ public class TestMetricsCollection {
 
   private void checkMetrics(Metrics metrics, long expected) {
     assertEquals(expected, metrics.executorDeserializeTime);
+    assertEquals(expected, metrics.executorDeserializeCpuTime);
     assertEquals(expected, metrics.executorRunTime);
+    assertEquals(expected, metrics.executorCpuTime);
     assertEquals(expected, metrics.resultSize);
     assertEquals(expected, metrics.jvmGCTime);
     assertEquals(expected, metrics.resultSerializationTime);