Posted to commits@hive.apache.org by st...@apache.org on 2018/07/23 14:13:26 UTC
hive git commit: HIVE-20056: SparkPartitionPruner shouldn't be
triggered by Spark tasks (Sahil Takiar, reviewed by Rui Li)
Repository: hive
Updated Branches:
refs/heads/master 6b1581623 -> bed17e54d
HIVE-20056: SparkPartitionPruner shouldn't be triggered by Spark tasks (Sahil Takiar, reviewed by Rui Li)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bed17e54
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bed17e54
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bed17e54
Branch: refs/heads/master
Commit: bed17e54d8ab465256e41f2b796d69d21a075ea0
Parents: 6b15816
Author: Sahil Takiar <ta...@gmail.com>
Authored: Sat Jul 14 12:51:01 2018 -0500
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Mon Jul 23 09:12:55 2018 -0500
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/log/PerfLogger.java | 2 ++
.../exec/spark/SparkDynamicPartitionPruner.java | 25 ++++++++++++--------
.../hive/ql/exec/spark/SparkPlanGenerator.java | 24 +++++++++++++++++++
.../hadoop/hive/ql/io/HiveInputFormat.java | 12 ----------
4 files changed, 41 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/bed17e54/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index 111e614..0ee41c0 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -85,6 +85,8 @@ public class PerfLogger {
public static final String SPARK_OPTIMIZE_OPERATOR_TREE = "SparkOptimizeOperatorTree";
public static final String SPARK_OPTIMIZE_TASK_TREE = "SparkOptimizeTaskTree";
public static final String SPARK_FLUSH_HASHTABLE = "SparkFlushHashTable.";
+ public static final String SPARK_DYNAMICALLY_PRUNE_PARTITIONS =
+ "SparkDynamicallyPrunePartitions.";
public static final String FILE_MOVES = "FileMoves";
public static final String LOAD_TABLE = "LoadTable";
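The trailing dot in the new constant follows the existing PerfLogger convention (compare SPARK_FLUSH_HASHTABLE above): a per-work-unit name is appended at call time, and the key brackets a timed region via PerfLogBegin/PerfLogEnd. A minimal sketch of that pattern, assuming a Hive session is active and using "Map 1" as an illustrative work-unit name:

    import org.apache.hadoop.hive.ql.log.PerfLogger;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class PruneTimingSketch {
      private static final String CLASS_NAME = PruneTimingSketch.class.getName();

      public static void main(String[] args) {
        PerfLogger perfLogger = SessionState.getPerfLogger();
        String workName = "Map 1"; // illustrative work-unit name
        // Begin/End calls with the same key bracket the timed region.
        perfLogger.PerfLogBegin(CLASS_NAME,
            PerfLogger.SPARK_DYNAMICALLY_PRUNE_PARTITIONS + workName);
        // ... the pruning work being timed would run here ...
        perfLogger.PerfLogEnd(CLASS_NAME,
            PerfLogger.SPARK_DYNAMICALLY_PRUNE_PARTITIONS + workName);
      }
    }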
http://git-wip-us.apache.org/repos/asf/hive/blob/bed17e54/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
index 240fa09..b9285ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
@@ -30,9 +30,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import com.clearspring.analytics.util.Preconditions;
-import javolution.testing.AssertionException;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
@@ -63,7 +64,11 @@ import org.apache.hadoop.util.ReflectionUtils;
* The spark version of DynamicPartitionPruner.
*/
public class SparkDynamicPartitionPruner {
+
private static final Logger LOG = LoggerFactory.getLogger(SparkDynamicPartitionPruner.class);
+ private static final String CLASS_NAME = SparkDynamicPartitionPruner.class.getName();
+
+ private final PerfLogger perfLogger = SessionState.getPerfLogger();
private final Map<String, List<SourceInfo>> sourceInfoMap = new LinkedHashMap<String, List<SourceInfo>>();
private final BytesWritable writable = new BytesWritable();
@@ -74,8 +79,12 @@ public class SparkDynamicPartitionPruner {
// Nothing to prune for this MapWork
return;
}
+ perfLogger.PerfLogBegin(CLASS_NAME,
+ PerfLogger.SPARK_DYNAMICALLY_PRUNE_PARTITIONS + work.getName());
processFiles(work, jobConf);
prunePartitions(work);
+ perfLogger.PerfLogEnd(CLASS_NAME,
+ PerfLogger.SPARK_DYNAMICALLY_PRUNE_PARTITIONS + work.getName());
}
public void initialize(MapWork work, JobConf jobConf) throws SerDeException {
@@ -210,14 +219,11 @@ public class SparkDynamicPartitionPruner {
Path p = it.next();
PartitionDesc desc = work.getPathToPartitionInfo().get(p);
Map<String, String> spec = desc.getPartSpec();
- if (spec == null) {
- throw new AssertionException("No partition spec found in dynamic pruning");
- }
+ Preconditions.checkNotNull(spec, "No partition spec found in dynamic pruning");
String partValueString = spec.get(columnName);
- if (partValueString == null) {
- throw new AssertionException("Could not find partition value for column: " + columnName);
- }
+ Preconditions.checkNotNull(partValueString,
+ "Could not find partition value for column: " + columnName);
Object partValue = converter.convert(partValueString);
if (LOG.isDebugEnabled()) {
@@ -234,8 +240,7 @@ public class SparkDynamicPartitionPruner {
LOG.info("Pruning path: " + p);
it.remove();
work.removePathToAlias(p);
- // HIVE-12244 call currently ineffective
- work.getPartitionDescs().remove(desc);
+ work.removePathToPartitionInfo(p);
}
}
}
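The hunk above also swaps javolution's AssertionException for Guava's Preconditions. Guava's Preconditions.checkNotNull throws a NullPointerException carrying the supplied message when the reference is null, and otherwise returns the reference, so each null check collapses to a single call. A self-contained illustration with a made-up partition spec (the "ds" and "hr" column names are placeholders, not from the patch):

    import com.google.common.base.Preconditions;

    import java.util.HashMap;
    import java.util.Map;

    public class CheckNotNullSketch {
      public static void main(String[] args) {
        Map<String, String> spec = new HashMap<>();
        spec.put("ds", "2018-07-23"); // hypothetical partition spec entry
        // Non-null: the value is returned and can be used inline.
        String ds = Preconditions.checkNotNull(spec.get("ds"),
            "Could not find partition value for column: ds");
        System.out.println(ds);
        // Null: throws NullPointerException with the given message.
        Preconditions.checkNotNull(spec.get("hr"),
            "Could not find partition value for column: hr");
      }
    }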
http://git-wip-us.apache.org/repos/asf/hive/blob/bed17e54/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index d71d705..001d0b0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -109,6 +109,9 @@ public class SparkPlanGenerator {
try {
for (BaseWork work : sparkWork.getAllWork()) {
+ // Run the SparkDynamicPartitionPruner here, rather than inside the InputFormat,
+ // so that pruning doesn't have to run each time a RecordReader is created
+ runDynamicPartitionPruner(work);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
SparkTran tran = generate(work, sparkWork);
SparkTran parentTran = generateParentTran(sparkPlan, sparkWork, work);
@@ -127,6 +130,27 @@ public class SparkPlanGenerator {
return sparkPlan;
}
+ /**
+ * Run a {@link SparkDynamicPartitionPruner} on the given {@link BaseWork}. This method only
+ * runs the pruner if the work object is a {@link MapWork}. We do this here because we need to
+ * do it after all previous Spark jobs for the given query have completed; otherwise, the input
+ * file for the pruner won't exist. We need to make sure this runs before we serialize the
+ * given work object to a file (so it can be read by individual tasks), because the pruner
+ * mutates the work object by removing certain input paths.
+ *
+ * @param work the {@link BaseWork} to run the pruner on
+ */
+ private void runDynamicPartitionPruner(BaseWork work) {
+ if (work instanceof MapWork && HiveConf.isSparkDPPAny(jobConf)) {
+ SparkDynamicPartitionPruner pruner = new SparkDynamicPartitionPruner();
+ try {
+ pruner.prune((MapWork) work, jobConf);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
// Generate (possibly get from a cached result) parent SparkTran
private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork,
BaseWork work) throws Exception {
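The ordering the new javadoc describes (prune on the driver first, then serialize the MapWork for the tasks) is the crux of the change. The toy sketch below illustrates only that ordering constraint; PlanHolder, prunePaths, and serializeForTasks are hypothetical stand-ins, not Hive APIs:

    import java.util.ArrayList;
    import java.util.List;

    public class PruneBeforeSerializeSketch {
      // Hypothetical stand-in for a MapWork-like plan object.
      static class PlanHolder {
        final List<String> inputPaths = new ArrayList<>();
      }

      // Hypothetical pruner: mutates the plan by removing pruned paths.
      static void prunePaths(PlanHolder plan) {
        plan.inputPaths.removeIf(p -> p.contains("ds=2018-01-01"));
      }

      // Hypothetical serialization step: whatever is written here is all
      // that the individual Spark tasks will later see.
      static String serializeForTasks(PlanHolder plan) {
        return String.join(",", plan.inputPaths);
      }

      public static void main(String[] args) {
        PlanHolder plan = new PlanHolder();
        plan.inputPaths.add("/warehouse/t/ds=2018-01-01");
        plan.inputPaths.add("/warehouse/t/ds=2018-07-23");
        // Prune first, so the serialized plan already excludes dead paths...
        prunePaths(plan);
        // ...then serialize; tasks read a pre-pruned plan and never need to
        // trigger the pruner themselves (the point of HIVE-20056).
        System.out.println(serializeForTasks(plan));
      }
    }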
http://git-wip-us.apache.org/repos/asf/hive/blob/bed17e54/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 3cb7ab5..ea37daf 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -432,18 +432,6 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
} else {
mrwork = Utilities.getMapWork(job);
}
-
- // Prune partitions
- if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")
- && HiveConf.isSparkDPPAny(job)) {
- SparkDynamicPartitionPruner pruner = new SparkDynamicPartitionPruner();
- try {
- pruner.prune(mrwork, job);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
pathToPartitionInfo = mrwork.getPathToPartitionInfo();
}
}