You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by st...@apache.org on 2018/06/07 19:30:33 UTC
hive git commit: HIVE-18690: Integrate with Spark OutputMetrics
(Sahil Takiar, reviewed by Aihua Xu)
Repository: hive
Updated Branches:
refs/heads/master 473dd0462 -> f04eba3ca
HIVE-18690: Integrate with Spark OutputMetrics (Sahil Takiar, reviewed by Aihua Xu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f04eba3c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f04eba3c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f04eba3c
Branch: refs/heads/master
Commit: f04eba3cac3007bfe61ec2f57c92d404f4d40b4c
Parents: 473dd04
Author: Sahil Takiar <ta...@gmail.com>
Authored: Tue Feb 27 20:21:55 2018 -0800
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Thu Jun 7 14:29:59 2018 -0500
----------------------------------------------------------------------
.../hive/ql/exec/spark/TestSparkStatistics.java | 2 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 16 ++++-
.../hive/ql/exec/spark/SparkMetricUtils.java | 63 ++++++++++++++++++++
.../spark/Statistic/SparkStatisticsNames.java | 3 +
.../spark/status/impl/SparkMetricsUtils.java | 4 ++
.../hive/spark/client/MetricsCollection.java | 20 ++++++-
.../hive/spark/client/metrics/Metrics.java | 16 ++++-
.../spark/client/metrics/OutputMetrics.java | 57 ++++++++++++++++++
.../spark/client/TestMetricsCollection.java | 14 +++--
9 files changed, 183 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
index f6c5b17..d383873 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
@@ -81,7 +81,7 @@ public class TestSparkStatistics {
List<SparkStatistic> sparkStats = Lists.newArrayList(sparkTask.getSparkStatistics()
.getStatisticGroup(SparkStatisticsNames.SPARK_GROUP_NAME).getStatistics());
- Assert.assertEquals(24, sparkStats.size());
+ Assert.assertEquals(26, sparkStats.size());
Map<String, String> statsMap = sparkStats.stream().collect(
Collectors.toMap(SparkStatistic::getName, SparkStatistic::getValue));
http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 83b53f4..c2319bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.function.BiFunction;
import com.google.common.collect.Lists;
@@ -37,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities.MissingBucketsContext;
+import org.apache.hadoop.hive.ql.exec.spark.SparkMetricUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -79,16 +82,18 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspecto
import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
import org.apache.hadoop.hive.shims.ShimLoader;
+
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
+
import org.apache.hive.common.util.HiveStringUtils;
-import org.apache.hive.common.util.Murmur3;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.function.BiFunction;
+
/**
* File Sink operator implementation.
**/
@@ -1228,6 +1233,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
row_count.set(numRows);
LOG.info(toString() + ": records written - " + numRows);
+ if ("spark".equalsIgnoreCase(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
+ SparkMetricUtils.updateSparkRecordsWrittenMetrics(runTimeNumRows);
+ }
+
if (!bDynParts && !filesCreated) {
boolean skipFiles = "tez".equalsIgnoreCase(
HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE));
@@ -1303,6 +1312,9 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
if (isNativeTable()) {
fsp.commit(fs, commitPaths);
}
+ if ("spark".equalsIgnoreCase(HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE))) {
+ SparkMetricUtils.updateSparkBytesWrittenMetrics(LOG, fs, fsp.finalPaths);
+ }
}
if (conf.isMmTable()) {
Utilities.writeMmCommitManifest(commitPaths, specPath, fs, taskId,
http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMetricUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMetricUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMetricUtils.java
new file mode 100644
index 0000000..1f856ae
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMetricUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.TaskContext;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Utility class for updating Spark-level metrics.
+ */
+public final class SparkMetricUtils {
+
+ private SparkMetricUtils() {
+ // Do nothing
+ }
+
+ public static void updateSparkRecordsWrittenMetrics(long numRows) {
+ TaskContext taskContext = TaskContext.get();
+ if (taskContext != null && numRows > 0) {
+ taskContext.taskMetrics().outputMetrics().setRecordsWritten(numRows);
+ }
+ }
+
+ public static void updateSparkBytesWrittenMetrics(Logger log, FileSystem fs, Path[]
+ commitPaths) {
+ AtomicLong bytesWritten = new AtomicLong();
+ Arrays.stream(commitPaths).parallel().forEach(path -> {
+ try {
+ bytesWritten.addAndGet(fs.getFileStatus(path).getLen());
+ } catch (IOException e) {
+ log.debug("Unable to collect stats for file: " + path + " output metrics may be inaccurate",
+ e);
+ }
+ });
+ TaskContext taskContext = TaskContext.get();
+ if (taskContext != null && bytesWritten.get() > 0) {
+ taskContext.taskMetrics().outputMetrics().setBytesWritten(bytesWritten.get());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
index 12c3eac..b80a8f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
@@ -54,5 +54,8 @@ public class SparkStatisticsNames {
public static final String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
public static final String SHUFFLE_RECORDS_WRITTEN = "ShuffleRecordsWritten";
+ public static final String RECORDS_WRITTEN = "RecordsWritten";
+ public static final String BYTES_WRITTEN = "BytesWritten";
+
public static final String SPARK_GROUP_NAME = "SPARK";
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
index c73c150..a0a0330 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
@@ -68,6 +68,10 @@ public final class SparkMetricsUtils {
allMetrics.shuffleWriteMetrics.shuffleRecordsWritten);
results.put(SparkStatisticsNames.SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime);
}
+ if (allMetrics.outputMetrics != null) {
+ results.put(SparkStatisticsNames.BYTES_WRITTEN, allMetrics.outputMetrics.bytesWritten);
+ results.put(SparkStatisticsNames.RECORDS_WRITTEN, allMetrics.outputMetrics.recordsWritten);
+ }
return results;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index a0db015..2be19de 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hive.spark.client.metrics.InputMetrics;
import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.client.metrics.OutputMetrics;
import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
import org.apache.hive.spark.client.metrics.ShuffleWriteMetrics;
@@ -171,6 +172,11 @@ public class MetricsCollection {
long shuffleWriteTime = 0L;
long shuffleRecordsWritten = 0L;
+ // Output metrics.
+ boolean hasOuputMetrics = false;
+ long bytesWritten = 0L;
+ long recordsWritten = 0L;
+
for (TaskInfo info : Collections2.filter(taskMetrics, filter)) {
Metrics m = info.metrics;
executorDeserializeTime += m.executorDeserializeTime;
@@ -206,6 +212,12 @@ public class MetricsCollection {
shuffleWriteTime += m.shuffleWriteMetrics.shuffleWriteTime;
shuffleRecordsWritten += m.shuffleWriteMetrics.shuffleRecordsWritten;
}
+
+ if (m.outputMetrics != null) {
+ hasOuputMetrics = true;
+ bytesWritten += m.outputMetrics.bytesWritten;
+ recordsWritten += m.outputMetrics.recordsWritten;
+ }
}
InputMetrics inputMetrics = null;
@@ -233,6 +245,11 @@ public class MetricsCollection {
shuffleRecordsWritten);
}
+ OutputMetrics outputMetrics = null;
+ if (hasOuputMetrics) {
+ outputMetrics = new OutputMetrics(bytesWritten, recordsWritten);
+ }
+
return new Metrics(
executorDeserializeTime,
executorDeserializeCpuTime,
@@ -246,7 +263,8 @@ public class MetricsCollection {
taskDurationTime,
inputMetrics,
shuffleReadMetrics,
- shuffleWriteMetrics);
+ shuffleWriteMetrics,
+ outputMetrics);
} finally {
lock.readLock().unlock();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index cf7a1f6..e09effc 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -63,10 +63,12 @@ public class Metrics implements Serializable {
public final ShuffleReadMetrics shuffleReadMetrics;
/** If tasks wrote to shuffle output, metrics on the written shuffle data. */
public final ShuffleWriteMetrics shuffleWriteMetrics;
+ /** A collection of accumulators that represents metrics about writing data to external systems. */
+ public final OutputMetrics outputMetrics;
private Metrics() {
// For Serialization only.
- this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null);
+ this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null, null);
}
public Metrics(
@@ -82,7 +84,8 @@ public class Metrics implements Serializable {
long taskDurationTime,
InputMetrics inputMetrics,
ShuffleReadMetrics shuffleReadMetrics,
- ShuffleWriteMetrics shuffleWriteMetrics) {
+ ShuffleWriteMetrics shuffleWriteMetrics,
+ OutputMetrics outputMetrics) {
this.executorDeserializeTime = executorDeserializeTime;
this.executorDeserializeCpuTime = executorDeserializeCpuTime;
this.executorRunTime = executorRunTime;
@@ -96,6 +99,7 @@ public class Metrics implements Serializable {
this.inputMetrics = inputMetrics;
this.shuffleReadMetrics = shuffleReadMetrics;
this.shuffleWriteMetrics = shuffleWriteMetrics;
+ this.outputMetrics = outputMetrics;
}
public Metrics(TaskMetrics metrics, TaskInfo taskInfo) {
@@ -112,7 +116,8 @@ public class Metrics implements Serializable {
taskInfo.duration(),
optionalInputMetric(metrics),
optionalShuffleReadMetric(metrics),
- optionalShuffleWriteMetrics(metrics));
+ optionalShuffleWriteMetrics(metrics),
+ optionalOutputMetrics(metrics));
}
private static InputMetrics optionalInputMetric(TaskMetrics metrics) {
@@ -127,6 +132,10 @@ public class Metrics implements Serializable {
return (metrics.shuffleWriteMetrics() != null) ? new ShuffleWriteMetrics(metrics) : null;
}
+ private static OutputMetrics optionalOutputMetrics(TaskMetrics metrics) {
+ return (metrics.outputMetrics() != null) ? new OutputMetrics(metrics) : null;
+ }
+
@Override
public String toString() {
return "Metrics{" +
@@ -143,6 +152,7 @@ public class Metrics implements Serializable {
", inputMetrics=" + inputMetrics +
", shuffleReadMetrics=" + shuffleReadMetrics +
", shuffleWriteMetrics=" + shuffleWriteMetrics +
+ ", outputMetrics=" + outputMetrics +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/spark-client/src/main/java/org/apache/hive/spark/client/metrics/OutputMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/OutputMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/OutputMetrics.java
new file mode 100644
index 0000000..99516ff
--- /dev/null
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/OutputMetrics.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client.metrics;
+
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.spark.executor.TaskMetrics;
+
+import java.io.Serializable;
+
+/**
+ * Metrics pertaining to writing data.
+ */
+@InterfaceAudience.Private
+public class OutputMetrics implements Serializable {
+
+ /** Total number of bytes written. */
+ public final long bytesWritten;
+ /** Total number of records written. */
+ public final long recordsWritten;
+
+ private OutputMetrics() {
+ // For Serialization only.
+ this(0L, 0L);
+ }
+
+ public OutputMetrics(long bytesWritten, long recordsWritten) {
+ this.bytesWritten = bytesWritten;
+ this.recordsWritten = recordsWritten;
+ }
+
+ public OutputMetrics(TaskMetrics metrics) {
+ this(metrics.outputMetrics().bytesWritten(), metrics.outputMetrics().recordsWritten());
+ }
+
+ @Override
+ public String toString() {
+ return "OutputMetrics{" +
+ "bytesWritten=" + bytesWritten +
+ ", recordsWritten=" + recordsWritten +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index 2d4c43d..a5c13ca 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
import org.apache.hive.spark.client.metrics.DataReadMethod;
import org.apache.hive.spark.client.metrics.InputMetrics;
import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.client.metrics.OutputMetrics;
import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
import org.apache.hive.spark.client.metrics.ShuffleWriteMetrics;
import org.junit.Test;
@@ -67,7 +68,7 @@ public class TestMetricsCollection {
public void testOptionalMetrics() {
long value = taskValue(1, 1, 1L);
Metrics metrics = new Metrics(value, value, value, value, value, value, value, value, value,
- value, null, null, null);
+ value, null, null, null, null);
MetricsCollection collection = new MetricsCollection();
for (int i : Arrays.asList(1, 2)) {
@@ -96,9 +97,9 @@ public class TestMetricsCollection {
long value = taskValue(1, 1, 1);
Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value, value,
- value, new InputMetrics(value, value), null, null);
+ value, new InputMetrics(value, value), null, null, null);
Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value, value,
- value, new InputMetrics(value, value), null, null);
+ value, new InputMetrics(value, value), null, null, null);
collection.addMetrics(1, 1, 1, metrics1);
collection.addMetrics(1, 1, 2, metrics2);
@@ -112,7 +113,8 @@ public class TestMetricsCollection {
return new Metrics(value, value, value, value, value, value, value, value, value, value,
new InputMetrics(value, value),
new ShuffleReadMetrics((int) value, (int) value, value, value, value, value, value),
- new ShuffleWriteMetrics(value, value, value));
+ new ShuffleWriteMetrics(value, value, value),
+ new OutputMetrics(value, value));
}
/**
@@ -173,6 +175,8 @@ public class TestMetricsCollection {
assertEquals(expected, metrics.shuffleWriteMetrics.shuffleBytesWritten);
assertEquals(expected, metrics.shuffleWriteMetrics.shuffleWriteTime);
assertEquals(expected, metrics.shuffleWriteMetrics.shuffleRecordsWritten);
- }
+ assertEquals(expected, metrics.outputMetrics.recordsWritten);
+ assertEquals(expected, metrics.outputMetrics.bytesWritten);
+ }
}