Posted to commits@hive.apache.org by xu...@apache.org on 2015/08/02 05:59:10 UTC
[25/50] hive git commit: HIVE-9152 - Dynamic Partition Pruning [Spark Branch] (Chao Sun, reviewed by Xuefu Zhang and Chengxiang Li)
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/gen/thrift/gen-rb/queryplan_constants.rb
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-rb/queryplan_constants.rb b/ql/src/gen/thrift/gen-rb/queryplan_constants.rb
index 8080b3f..428185e 100644
--- a/ql/src/gen/thrift/gen-rb/queryplan_constants.rb
+++ b/ql/src/gen/thrift/gen-rb/queryplan_constants.rb
@@ -1,5 +1,5 @@
#
-# Autogenerated by Thrift Compiler (0.9.0)
+# Autogenerated by Thrift Compiler (0.9.2)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
index c2c4220..1a22f07 100644
--- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb
+++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
@@ -1,5 +1,5 @@
#
-# Autogenerated by Thrift Compiler (0.9.0)
+# Autogenerated by Thrift Compiler (0.9.2)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
@@ -46,8 +46,9 @@ module OperatorType
ORCFILEMERGE = 22
RCFILEMERGE = 23
MERGEJOIN = 24
- VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX", 21 => "EVENT", 22 => "ORCFILEMERGE", 23 => "RCFILEMERGE", 24 => "MERGEJOIN"}
- VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX, EVENT, ORCFILEMERGE, RCFILEMERGE, MERGEJOIN]).freeze
+ SPARKPRUNINGSINK = 25
+ VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX", 21 => "EVENT", 22 => "ORCFILEMERGE", 23 => "RCFILEMERGE", 24 => "MERGEJOIN", 25 => "SPARKPRUNINGSINK"}
+ VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX, EVENT, ORCFILEMERGE, RCFILEMERGE, MERGEJOIN, SPARKPRUNINGSINK]).freeze
end
module TaskType
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index f58a10b..ff58741 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -34,8 +34,11 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSparkPartitionPruningSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
@@ -117,6 +120,8 @@ public final class OperatorFactory {
AppMasterEventOperator.class));
opvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
AppMasterEventOperator.class));
+ opvec.add(new OpTuple<SparkPartitionPruningSinkDesc>(SparkPartitionPruningSinkDesc.class,
+ SparkPartitionPruningSinkOperator.class));
opvec.add(new OpTuple<RCFileMergeDesc>(RCFileMergeDesc.class,
RCFileMergeOperator.class));
opvec.add(new OpTuple<OrcFileMergeDesc>(OrcFileMergeDesc.class,
@@ -133,6 +138,9 @@ public final class OperatorFactory {
VectorAppMasterEventOperator.class));
vectorOpvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
VectorAppMasterEventOperator.class));
+ vectorOpvec.add(new OpTuple<SparkPartitionPruningSinkDesc>(
+ SparkPartitionPruningSinkDesc.class,
+ VectorSparkPartitionPruningSinkOperator.class));
vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class));
vectorOpvec.add(new OpTuple<MapJoinDesc>(MapJoinDesc.class, VectorMapJoinOperator.class));
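The OperatorFactory change above wires the new descriptor class to its operator implementations: SparkPartitionPruningSinkDesc maps to SparkPartitionPruningSinkOperator, and to VectorSparkPartitionPruningSinkOperator in the vectorized table. As a rough, self-contained illustration of that descriptor-to-operator registry pattern (plain Java with made-up names, not Hive's actual OpTuple/OperatorFactory API):

    import java.util.LinkedHashMap;
    import java.util.Map;

    class RegistrySketch {
      // Simplified registry: descriptor class -> operator class.
      static final Map<Class<?>, Class<?>> OP_REGISTRY = new LinkedHashMap<>();

      static void register(Class<?> descClass, Class<?> opClass) {
        OP_REGISTRY.put(descClass, opClass);
      }

      // Look up the operator class registered for a descriptor and instantiate it.
      static Object makeOperator(Object desc) throws ReflectiveOperationException {
        Class<?> opClass = OP_REGISTRY.get(desc.getClass());
        if (opClass == null) {
          throw new IllegalArgumentException("no operator registered for " + desc.getClass());
        }
        return opClass.getDeclaredConstructor().newInstance();
      }
    }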
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 21398d8..007db75 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.hive.spark.client.rpc.RpcConfiguration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
@@ -148,11 +149,11 @@ public class HiveSparkClientFactory {
Set<String> classes = Sets.newHashSet(
Splitter.on(",").trimResults().omitEmptyStrings().split(
Strings.nullToEmpty(sparkConf.get("spark.kryo.classesToRegister"))));
+ classes.add(Writable.class.getName());
classes.add(VectorizedRowBatch.class.getName());
classes.add(BytesWritable.class.getName());
classes.add(HiveKey.class.getName());
- sparkConf.put(
- "spark.kryo.classesToRegister", Joiner.on(",").join(classes));
+ sparkConf.put("spark.kryo.classesToRegister", Joiner.on(",").join(classes));
return sparkConf;
}
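The HiveSparkClientFactory hunk makes sure Writable (in addition to VectorizedRowBatch, BytesWritable and HiveKey) ends up in spark.kryo.classesToRegister while preserving whatever the user already set. A JDK-only sketch of that merge, assuming the same comma-separated format (the patch itself uses Guava's Splitter/Joiner):

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.Set;

    class KryoRegisterSketch {
      // Merge required class names into an existing comma-separated register list,
      // keeping user-supplied entries and dropping empties and duplicates.
      static String mergeClassesToRegister(String existing, String... required) {
        Set<String> classes = new LinkedHashSet<>();
        if (existing != null) {
          for (String s : existing.split(",")) {
            String t = s.trim();
            if (!t.isEmpty()) {
              classes.add(t);
            }
          }
        }
        classes.addAll(Arrays.asList(required));
        return String.join(",", classes);
      }
    }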
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
new file mode 100644
index 0000000..52913e0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.clearspring.analytics.util.Preconditions;
+import javolution.testing.AssertionException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * The spark version of DynamicPartitionPruner.
+ */
+public class SparkDynamicPartitionPruner {
+ private static final Log LOG = LogFactory.getLog(SparkDynamicPartitionPruner.class);
+ private final Map<String, List<SourceInfo>> sourceInfoMap = new LinkedHashMap<String, List<SourceInfo>>();
+ private final BytesWritable writable = new BytesWritable();
+
+ public void prune(MapWork work, JobConf jobConf) throws HiveException, SerDeException {
+ sourceInfoMap.clear();
+ initialize(work, jobConf);
+ if (sourceInfoMap.size() == 0) {
+ // Nothing to prune for this MapWork
+ return;
+ }
+ processFiles(work, jobConf);
+ prunePartitions(work);
+ }
+
+ public void initialize(MapWork work, JobConf jobConf) throws SerDeException {
+ Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>();
+ Set<String> sourceWorkIds = work.getEventSourceTableDescMap().keySet();
+
+ for (String id : sourceWorkIds) {
+ List<TableDesc> tables = work.getEventSourceTableDescMap().get(id);
+ List<String> columnNames = work.getEventSourceColumnNameMap().get(id);
+ List<ExprNodeDesc> partKeyExprs = work.getEventSourcePartKeyExprMap().get(id);
+
+ Iterator<String> cit = columnNames.iterator();
+ Iterator<ExprNodeDesc> pit = partKeyExprs.iterator();
+ for (TableDesc t : tables) {
+ String columnName = cit.next();
+ ExprNodeDesc partKeyExpr = pit.next();
+ SourceInfo si = new SourceInfo(t, partKeyExpr, columnName, jobConf);
+ if (!sourceInfoMap.containsKey(id)) {
+ sourceInfoMap.put(id, new ArrayList<SourceInfo>());
+ }
+ sourceInfoMap.get(id).add(si);
+
+ // We could have multiple sources restrict the same column, need to take
+ // the union of the values in that case.
+ if (columnMap.containsKey(columnName)) {
+ si.values = columnMap.get(columnName).values;
+ }
+ columnMap.put(columnName, si);
+ }
+ }
+ }
+
+ private void processFiles(MapWork work, JobConf jobConf) throws HiveException {
+ ObjectInputStream in = null;
+ try {
+ Path baseDir = work.getTmpPathForPartitionPruning();
+ FileSystem fs = FileSystem.get(baseDir.toUri(), jobConf);
+
+ // Find the SourceInfo to put values in.
+ for (String name : sourceInfoMap.keySet()) {
+ Path sourceDir = new Path(baseDir, name);
+ for (FileStatus fstatus : fs.listStatus(sourceDir)) {
+ LOG.info("Start processing pruning file: " + fstatus.getPath());
+ in = new ObjectInputStream(fs.open(fstatus.getPath()));
+ String columnName = in.readUTF();
+ SourceInfo info = null;
+
+ for (SourceInfo si : sourceInfoMap.get(name)) {
+ if (columnName.equals(si.columnName)) {
+ info = si;
+ break;
+ }
+ }
+
+ Preconditions.checkArgument(info != null,
+ "AssertionError: no source info for the column: " + columnName);
+
+ // Read fields
+ while (in.available() > 0) {
+ writable.readFields(in);
+
+ Object row = info.deserializer.deserialize(writable);
+ Object value = info.soi.getStructFieldData(row, info.field);
+ value = ObjectInspectorUtils.copyToStandardObject(value, info.fieldInspector);
+ info.values.add(value);
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new HiveException(e);
+ } finally {
+ try {
+ if (in != null) {
+ in.close();
+ }
+ } catch (IOException e) {
+ throw new HiveException("error while trying to close input stream", e);
+ }
+ }
+ }
+
+ private void prunePartitions(MapWork work) throws HiveException {
+ for (String source : sourceInfoMap.keySet()) {
+ for (SourceInfo info : sourceInfoMap.get(source)) {
+ prunePartitionSingleSource(info, work);
+ }
+ }
+ }
+
+ private void prunePartitionSingleSource(SourceInfo info, MapWork work)
+ throws HiveException {
+ Set<Object> values = info.values;
+ String columnName = info.columnName;
+
+ ObjectInspector oi =
+ PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory
+ .getPrimitiveTypeInfo(info.fieldInspector.getTypeName()));
+
+ ObjectInspectorConverters.Converter converter =
+ ObjectInspectorConverters.getConverter(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, oi);
+
+ StructObjectInspector soi =
+ ObjectInspectorFactory.getStandardStructObjectInspector(
+ Collections.singletonList(columnName), Collections.singletonList(oi));
+
+ @SuppressWarnings("rawtypes")
+ ExprNodeEvaluator eval = ExprNodeEvaluatorFactory.get(info.partKey);
+ eval.initialize(soi);
+
+ applyFilterToPartitions(work, converter, eval, columnName, values);
+ }
+
+ private void applyFilterToPartitions(
+ MapWork work,
+ ObjectInspectorConverters.Converter converter,
+ ExprNodeEvaluator eval,
+ String columnName,
+ Set<Object> values) throws HiveException {
+
+ Object[] row = new Object[1];
+
+ Iterator<String> it = work.getPathToPartitionInfo().keySet().iterator();
+ while (it.hasNext()) {
+ String p = it.next();
+ PartitionDesc desc = work.getPathToPartitionInfo().get(p);
+ Map<String, String> spec = desc.getPartSpec();
+ if (spec == null) {
+ throw new AssertionException("No partition spec found in dynamic pruning");
+ }
+
+ String partValueString = spec.get(columnName);
+ if (partValueString == null) {
+ throw new AssertionException("Could not find partition value for column: " + columnName);
+ }
+
+ Object partValue = converter.convert(partValueString);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Converted partition value: " + partValue + " original (" + partValueString + ")");
+ }
+
+ row[0] = partValue;
+ partValue = eval.evaluate(row);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("part key expr applied: " + partValue);
+ }
+
+ if (!values.contains(partValue)) {
+ LOG.info("Pruning path: " + p);
+ it.remove();
+ work.getPathToAliases().remove(p);
+ work.getPaths().remove(p);
+ work.getPartitionDescs().remove(desc);
+ }
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ private static class SourceInfo {
+ final ExprNodeDesc partKey;
+ final Deserializer deserializer;
+ final StructObjectInspector soi;
+ final StructField field;
+ final ObjectInspector fieldInspector;
+ Set<Object> values = new HashSet<Object>();
+ final String columnName;
+
+ SourceInfo(TableDesc table, ExprNodeDesc partKey, String columnName, JobConf jobConf)
+ throws SerDeException {
+ this.partKey = partKey;
+ this.columnName = columnName;
+
+ deserializer = ReflectionUtils.newInstance(table.getDeserializerClass(), null);
+ deserializer.initialize(jobConf, table.getProperties());
+
+ ObjectInspector inspector = deserializer.getObjectInspector();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Type of obj insp: " + inspector.getTypeName());
+ }
+
+ soi = (StructObjectInspector) inspector;
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ assert(fields.size() == 1) : "expecting single field in input";
+
+ field = fields.get(0);
+ fieldInspector =
+ ObjectInspectorUtils.getStandardObjectInspector(field.getFieldObjectInspector());
+ }
+ }
+
+}
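SparkDynamicPartitionPruner.processFiles() expects, under <tmpPathForPartitionPruning>/<source work name>, files laid out as an ObjectInputStream carrying a UTF column name followed by rows encoded through a BytesWritable. The writer side is not part of this hunk, so the following is only an assumed counterpart inferred from the reader (class and method names are illustrative):

    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.io.OutputStream;
    import org.apache.hadoop.io.BytesWritable;

    class PruningFileWriterSketch {
      // Assumed layout matching processFiles(): writeUTF(columnName), then
      // zero or more BytesWritable-encoded rows read back via readFields().
      static void writePruningFile(OutputStream rawOut, String columnName,
          Iterable<byte[]> serializedRows) throws IOException {
        ObjectOutputStream out = new ObjectOutputStream(rawOut);
        out.writeUTF(columnName);
        BytesWritable writable = new BytesWritable();
        for (byte[] row : serializedRows) {
          writable.set(row, 0, row.length);
          writable.write(out);
        }
        out.flush();
      }
    }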
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index ca0ffb6..cf2c3bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -23,16 +23,22 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.UUID;
+import com.google.common.base.Preconditions;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.Dependency;
@@ -158,4 +164,54 @@ public class SparkUtilities {
}
}
}
+
+ /**
+ * Generate a temporary path for dynamic partition pruning in the Spark branch.
+ * TODO: no longer need this if we use accumulator!
+ * @param basePath the base temporary path for the query
+ * @param id the ID of the target work (see getWorkId)
+ * @return the temporary path for the given work
+ */
+ public static Path generateTmpPathForPartitionPruning(Path basePath, String id) {
+ return new Path(basePath, id);
+ }
+
+ /**
+ * Return the ID for this BaseWork, in String form.
+ * @param work the input BaseWork
+ * @return the unique ID for this BaseWork
+ */
+ public static String getWorkId(BaseWork work) {
+ String workName = work.getName();
+ return workName.substring(workName.indexOf(" ") + 1);
+ }
+
+ public static SparkTask createSparkTask(HiveConf conf) {
+ return (SparkTask) TaskFactory.get(
+ new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
+ }
+
+ public static SparkTask createSparkTask(SparkWork work, HiveConf conf) {
+ return (SparkTask) TaskFactory.get(work, conf);
+ }
+
+ /**
+ * Recursively find all operators under root, that are of class clazz, and
+ * put them in result.
+ * @param result all operators under root that are of class clazz
+ * @param root the root operator under which all operators will be examined
+ * @param clazz the operator class to collect. Must NOT be null.
+ */
+ public static void collectOp(Collection<Operator<?>> result, Operator<?> root, Class<?> clazz) {
+ Preconditions.checkArgument(clazz != null, "AssertionError: clazz should not be null");
+ if (root == null) {
+ return;
+ }
+ if (clazz.equals(root.getClass())) {
+ result.add(root);
+ }
+ for (Operator<?> child : root.getChildOperators()) {
+ collectOp(result, child, clazz);
+ }
+ }
}
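collectOp() above is a plain depth-first walk that keeps operators whose class matches exactly, and getWorkId() strips the numeric suffix from a work name such as "Map 1". A self-contained toy version of the walk (Node is a stand-in, not Hive's Operator):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    class CollectOpSketch {
      static class Node {
        final List<Node> children = new ArrayList<>();
        Node add(Node child) { children.add(child); return child; }
      }
      static class PruningSinkNode extends Node {}

      // Same shape as SparkUtilities.collectOp(): exact class match, depth-first.
      static void collect(Collection<Node> result, Node root, Class<?> clazz) {
        if (root == null) {
          return;
        }
        if (clazz.equals(root.getClass())) {
          result.add(root);
        }
        for (Node child : root.children) {
          collect(result, child, clazz);
        }
      }

      public static void main(String[] args) {
        Node root = new Node();
        root.add(new PruningSinkNode());
        root.add(new Node()).add(new PruningSinkNode());
        List<Node> sinks = new ArrayList<>();
        collect(sinks, root, PruningSinkNode.class);
        System.out.println(sinks.size());   // prints 2
      }
    }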
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
new file mode 100644
index 0000000..3bce49d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.util.Collection;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Vectorized version for SparkPartitionPruningSinkOperator.
+ * Forked from VectorAppMasterEventOperator.
+ **/
+public class VectorSparkPartitionPruningSinkOperator extends SparkPartitionPruningSinkOperator {
+
+ private static final long serialVersionUID = 1L;
+
+ private VectorizationContext vContext;
+
+ protected transient boolean firstBatch;
+
+ protected transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+
+ protected transient Object[] singleRow;
+
+ public VectorSparkPartitionPruningSinkOperator(VectorizationContext context,
+ OperatorDesc conf) {
+ super();
+ this.conf = (SparkPartitionPruningSinkDesc) conf;
+ this.vContext = context;
+ }
+
+ public VectorSparkPartitionPruningSinkOperator() {
+ }
+
+ @Override
+ public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+ inputObjInspectors[0] =
+ VectorizedBatchUtil.convertToStandardStructObjectInspector(
+ (StructObjectInspector) inputObjInspectors[0]);
+ Collection<Future<?>> result = super.initializeOp(hconf);
+ assert result.isEmpty();
+
+ firstBatch = true;
+
+ return result;
+ }
+
+ @Override
+ public void process(Object data, int tag) throws HiveException {
+ VectorizedRowBatch batch = (VectorizedRowBatch) data;
+ if (firstBatch) {
+ vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
+ vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0],
+ vContext.getProjectedColumns());
+ singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+ firstBatch = false;
+ }
+
+ vectorExtractRowDynBatch.setBatchOnEntry(batch);
+ ObjectInspector rowInspector = inputObjInspectors[0];
+ try {
+ Writable writableRow;
+ for (int logical = 0; logical < batch.size; logical++) {
+ int batchIndex = batch.selectedInUse ? batch.selected[logical] : logical;
+ vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+ writableRow = serializer.serialize(singleRow, rowInspector);
+ writableRow.write(buffer);
+ }
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+
+ vectorExtractRowDynBatch.forgetBatchOnExit();
+ }
+}
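The process() loop above iterates a VectorizedRowBatch through the usual selectedInUse/selected indirection before serializing each row. A minimal stand-alone illustration of that indexing pattern over plain arrays (not the real VectorizedRowBatch):

    class SelectedBatchSketch {
      // When selectedInUse is true, only the first `size` entries of `selected`
      // name live rows; otherwise rows 0..size-1 are live.
      static long sumLiveRows(long[] column, boolean selectedInUse, int[] selected, int size) {
        long total = 0;
        for (int logical = 0; logical < size; logical++) {
          int batchIndex = selectedInUse ? selected[logical] : logical;
          total += column[batchIndex];
        }
        return total;
      }
    }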
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 1de7e40..e13c4dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -514,34 +515,40 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM,
(int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD));
int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads);
- LOG.info("Total number of paths: " + paths.length +
- ", launching " + numThreads + " threads to check non-combinable ones.");
- ExecutorService executor = Executors.newFixedThreadPool(numThreads);
- List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads);
- try {
- for (int i = 0; i < numThreads; i++) {
- int start = i * numPathPerThread;
- int length = i != numThreads - 1 ? numPathPerThread : paths.length - start;
- futureList.add(executor.submit(
- new CheckNonCombinablePathCallable(paths, start, length, job)));
- }
- Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
- for (Future<Set<Integer>> future : futureList) {
- nonCombinablePathIndices.addAll(future.get());
- }
- for (int i = 0; i < paths.length; i++) {
- if (nonCombinablePathIndices.contains(i)) {
- nonCombinablePaths.add(paths[i]);
- } else {
- combinablePaths.add(paths[i]);
+
+ // This check is necessary because for Spark branch, the result array from
+ // getInputPaths() above could be empty, and therefore numThreads could be 0.
+ // In that case, Executors.newFixedThreadPool will fail.
+ if (numThreads > 0) {
+ LOG.info("Total number of paths: " + paths.length +
+ ", launching " + numThreads + " threads to check non-combinable ones.");
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads);
+ try {
+ for (int i = 0; i < numThreads; i++) {
+ int start = i * numPathPerThread;
+ int length = i != numThreads - 1 ? numPathPerThread : paths.length - start;
+ futureList.add(executor.submit(
+ new CheckNonCombinablePathCallable(paths, start, length, job)));
+ }
+ Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
+ for (Future<Set<Integer>> future : futureList) {
+ nonCombinablePathIndices.addAll(future.get());
+ }
+ for (int i = 0; i < paths.length; i++) {
+ if (nonCombinablePathIndices.contains(i)) {
+ nonCombinablePaths.add(paths[i]);
+ } else {
+ combinablePaths.add(paths[i]);
+ }
}
+ } catch (Exception e) {
+ LOG.error("Error checking non-combinable path", e);
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
+ throw new IOException(e);
+ } finally {
+ executor.shutdownNow();
}
- } catch (Exception e) {
- LOG.error("Error checking non-combinable path", e);
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
- throw new IOException(e);
- } finally {
- executor.shutdownNow();
}
// Store the previous value for the path specification
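The guard added here matters because Executors.newFixedThreadPool(0) throws IllegalArgumentException, and on the Spark branch getInputPaths() can legitimately return an empty array. A small sketch of the sizing arithmetic with illustrative constants (the real MAX_CHECK_NONCOMBINABLE_THREAD_NUM and DEFAULT_NUM_PATH_PER_THREAD values are not shown in this hunk):

    class ThreadSizingSketch {
      static final int MAX_THREADS = 20;        // illustrative, not Hive's constant
      static final int PATHS_PER_THREAD = 100;  // illustrative, not Hive's constant

      // Same arithmetic as the patch: 0 paths -> 0 threads -> skip the executor.
      static int numThreads(int numPaths) {
        return Math.min(MAX_THREADS, (int) Math.ceil((double) numPaths / PATHS_PER_THREAD));
      }

      public static void main(String[] args) {
        System.out.println(numThreads(0));    // 0: must not create a pool
        System.out.println(numThreads(250));  // 3
      }
    }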
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 54e1d4e..27499ad 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map.Entry;
@@ -37,6 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -266,6 +268,18 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
} else {
mrwork = Utilities.getMapWork(job);
}
+
+ // Prune partitions
+ if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")
+ && HiveConf.getBoolVar(job, HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
+ SparkDynamicPartitionPruner pruner = new SparkDynamicPartitionPruner();
+ try {
+ pruner.prune(mrwork, job);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
pathToPartitionInfo = mrwork.getPathToPartitionInfo();
}
}
@@ -309,18 +323,28 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
}
Path[] getInputPaths(JobConf job) throws IOException {
- Path[] dirs = FileInputFormat.getInputPaths(job);
- if (dirs.length == 0) {
- // on tez we're avoiding to duplicate the file info in FileInputFormat.
- if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
- try {
- List<Path> paths = Utilities.getInputPathsTez(job, mrwork);
- dirs = paths.toArray(new Path[paths.size()]);
- } catch (Exception e) {
- throw new IOException("Could not create input files", e);
+ Path[] dirs;
+ if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ Set<String> pathStrings = mrwork.getPathToPartitionInfo().keySet();
+ dirs = new Path[pathStrings.size()];
+ Iterator<String> it = pathStrings.iterator();
+ for (int i = 0; i < dirs.length; i++) {
+ dirs[i] = new Path(it.next());
+ }
+ } else {
+ dirs = FileInputFormat.getInputPaths(job);
+ if (dirs.length == 0) {
+ // on tez we're avoiding to duplicate the file info in FileInputFormat.
+ if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ try {
+ List<Path> paths = Utilities.getInputPathsTez(job, mrwork);
+ dirs = paths.toArray(new Path[paths.size()]);
+ } catch (Exception e) {
+ throw new IOException("Could not create input files", e);
+ }
+ } else {
+ throw new IOException("No input paths specified in job");
}
- } else {
- throw new IOException("No input paths specified in job");
}
}
return dirs;
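With the Spark branch taking its input directories from pathToPartitionInfo rather than FileInputFormat, the pruner's removal of map entries directly shrinks what gets scanned. The conversion itself is just the key set turned into a Path array, roughly:

    import java.util.Set;
    import org.apache.hadoop.fs.Path;

    class SparkInputPathsSketch {
      // Mirrors the spark branch of getInputPaths(): each key of
      // pathToPartitionInfo is a partition directory to read.
      static Path[] toPaths(Set<String> pathStrings) {
        Path[] dirs = new Path[pathStrings.size()];
        int i = 0;
        for (String s : pathStrings) {
          dirs[i++] = new Path(s);
        }
        return dirs;
      }
    }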
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index 8546d21..f475926 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -48,10 +48,12 @@ import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -152,15 +154,24 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
throws SemanticException {
- OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
- ParseContext parseContext = context.parseContext;
+ ParseContext parseContext;
+ if (procCtx instanceof OptimizeTezProcContext) {
+ parseContext = ((OptimizeTezProcContext) procCtx).parseContext;
+ } else if (procCtx instanceof OptimizeSparkProcContext) {
+ parseContext = ((OptimizeSparkProcContext) procCtx).getParseContext();
+ } else {
+ throw new IllegalArgumentException("expected parseContext to be either " +
+ "OptimizeTezProcContext or OptimizeSparkProcContext, but found " +
+ procCtx.getClass().getName());
+ }
FilterOperator filter = (FilterOperator) nd;
FilterDesc desc = filter.getConf();
TableScanOperator ts = null;
- if (!parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+ if (!parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING) &&
+ !parseContext.getConf().getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
// nothing to do when the optimization is off
return null;
}
@@ -311,14 +322,25 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
groupByOp.setColumnExprMap(colMap);
// finally add the event broadcast operator
- DynamicPruningEventDesc eventDesc = new DynamicPruningEventDesc();
- eventDesc.setTableScan(ts);
- eventDesc.setTable(PlanUtils.getReduceValueTableDesc(PlanUtils
- .getFieldSchemasFromColumnList(keyExprs, "key")));
- eventDesc.setTargetColumnName(column);
- eventDesc.setPartKey(partKey);
-
- OperatorFactory.getAndMakeChild(eventDesc, groupByOp);
+ if (HiveConf.getVar(parseContext.getConf(),
+ ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ DynamicPruningEventDesc eventDesc = new DynamicPruningEventDesc();
+ eventDesc.setTableScan(ts);
+ eventDesc.setTable(PlanUtils.getReduceValueTableDesc(PlanUtils
+ .getFieldSchemasFromColumnList(keyExprs, "key")));
+ eventDesc.setTargetColumnName(column);
+ eventDesc.setPartKey(partKey);
+ OperatorFactory.getAndMakeChild(eventDesc, groupByOp);
+ } else {
+ // Must be spark branch
+ SparkPartitionPruningSinkDesc desc = new SparkPartitionPruningSinkDesc();
+ desc.setTableScan(ts);
+ desc.setTable(PlanUtils.getReduceValueTableDesc(PlanUtils
+ .getFieldSchemasFromColumnList(keyExprs, "key")));
+ desc.setTargetColumnName(column);
+ desc.setPartKey(partKey);
+ OperatorFactory.getAndMakeChild(desc, groupByOp);
+ }
}
private Map<Node, Object> walkExprTree(ExprNodeDesc pred, NodeProcessorCtx ctx)
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index a7cf8b7..e6db133 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -167,7 +167,7 @@ public class Optimizer {
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) {
transformations.add(new StatsOptimizer());
}
- if (isSparkExecEngine || (pctx.getContext().getExplain() && !isTezExecEngine)) {
+ if (pctx.getContext().getExplain() && !isTezExecEngine && !isSparkExecEngine) {
transformations.add(new AnnotateWithStatistics());
transformations.add(new AnnotateWithOpTraits());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
new file mode 100644
index 0000000..3742857
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
+
+/**
+ * If we expect the number of keys for dynamic pruning to be too large, we
+ * disable it.
+ *
+ * Cloned from RemoveDynamicPruningBySize
+ */
+public class SparkRemoveDynamicPruningBySize implements NodeProcessor {
+
+ static final private Log LOG = LogFactory.getLog(SparkRemoveDynamicPruningBySize.class.getName());
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
+ Object... nodeOutputs)
+ throws SemanticException {
+
+ OptimizeSparkProcContext context = (OptimizeSparkProcContext) procContext;
+
+ SparkPartitionPruningSinkOperator op = (SparkPartitionPruningSinkOperator) nd;
+ SparkPartitionPruningSinkDesc desc = op.getConf();
+
+ if (desc.getStatistics().getDataSize() > context.getConf()
+ .getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) {
+ Operator<?> child = op;
+ Operator<?> curr = op;
+
+ while (curr.getChildOperators().size() <= 1) {
+ child = curr;
+ curr = curr.getParentOperators().get(0);
+ }
+
+ curr.removeChild(child);
+ // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
+ LOG.info("Disabling dynamic pruning for: "
+ + desc.getTableScan().getName()
+ + ". Expected data size is too big: " + desc.getStatistics().getDataSize());
+ }
+ return false;
+ }
+}
\ No newline at end of file
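SparkRemoveDynamicPruningBySize disconnects an over-sized pruning branch by climbing from the sink through single-child operators until it reaches the fork, then cutting the link there. A toy, self-contained version of that walk (Node is a stand-in for Hive's Operator; like the processor, the sketch assumes a fork exists above the sink):

    import java.util.ArrayList;
    import java.util.List;

    class RemoveBranchSketch {
      static class Node {
        final List<Node> parents = new ArrayList<>();
        final List<Node> children = new ArrayList<>();
        static void link(Node parent, Node child) {
          parent.children.add(child);
          child.parents.add(parent);
        }
      }

      // Climb while the current node has at most one child, then cut at the fork.
      static void removeBranch(Node sink) {
        Node child = sink;
        Node curr = sink;
        while (curr.children.size() <= 1) {
          child = curr;
          curr = curr.parents.get(0);
        }
        curr.children.remove(child);
        child.parents.remove(curr);
      }
    }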
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index ad47547..180513e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -919,6 +919,7 @@ public class Vectorizer implements PhysicalPlanResolver {
case FILESINK:
case LIMIT:
case EVENT:
+ case SPARKPRUNINGSINK:
ret = true;
break;
case HASHTABLESINK:
@@ -965,6 +966,7 @@ public class Vectorizer implements PhysicalPlanResolver {
break;
case LIMIT:
case EVENT:
+ case SPARKPRUNINGSINK:
ret = true;
break;
case HASHTABLESINK:
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
new file mode 100644
index 0000000..cc78227
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.spark;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+@Explain(displayName = "Spark Partition Pruning Sink Operator")
+public class SparkPartitionPruningSinkDesc extends AbstractOperatorDesc {
+
+ // column in the target table that will be pruned against
+ private String targetColumnName;
+
+ private TableDesc table;
+
+ private transient TableScanOperator tableScan;
+
+ // the partition column we're interested in
+ private ExprNodeDesc partKey;
+
+ private Path path;
+
+ private String targetWork;
+
+ @Explain(displayName = "tmp Path", explainLevels = { Explain.Level.EXTENDED })
+ public Path getPath() {
+ return path;
+ }
+
+ public void setPath(Path path) {
+ this.path = path;
+ }
+
+ @Explain(displayName = "target work")
+ public String getTargetWork() {
+ return this.targetWork;
+ }
+
+ public void setTargetWork(String targetWork) {
+ this.targetWork = targetWork;
+ }
+
+ public TableScanOperator getTableScan() {
+ return tableScan;
+ }
+
+ public void setTableScan(TableScanOperator tableScan) {
+ this.tableScan = tableScan;
+ }
+
+ @Explain(displayName = "target column name")
+ public String getTargetColumnName() {
+ return targetColumnName;
+ }
+
+ public void setTargetColumnName(String targetColumnName) {
+ this.targetColumnName = targetColumnName;
+ }
+
+ public ExprNodeDesc getPartKey() {
+ return partKey;
+ }
+
+ public void setPartKey(ExprNodeDesc partKey) {
+ this.partKey = partKey;
+ }
+
+ public TableDesc getTable() {
+ return table;
+ }
+
+ public void setTable(TableDesc table) {
+ this.table = table;
+ }
+
+ @Explain(displayName = "partition key expr")
+ public String getPartKeyString() {
+ return partKey.getExprString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
index 447f104..0a0c791 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -40,7 +41,6 @@ import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
-import org.apache.hadoop.hive.ql.plan.SparkWork;
import java.io.Serializable;
import java.util.HashMap;
@@ -138,6 +138,13 @@ public class GenSparkProcContext implements NodeProcessorCtx {
// This is necessary as sometimes semantic analyzer's mapping is different than operator's own alias.
public final Map<String, Operator<? extends OperatorDesc>> topOps;
+ // The set of pruning sinks
+ public final Set<Operator<?>> pruningSinkSet;
+
+ // The set of TableScanOperators for pruning OP trees
+ public final Set<Operator<?>> clonedPruningTableScanSet;
+
+
@SuppressWarnings("unchecked")
public GenSparkProcContext(HiveConf conf,
ParseContext parseContext,
@@ -153,8 +160,7 @@ public class GenSparkProcContext implements NodeProcessorCtx {
this.inputs = inputs;
this.outputs = outputs;
this.topOps = topOps;
- this.currentTask = (SparkTask) TaskFactory.get(
- new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
+ this.currentTask = SparkUtilities.createSparkTask(conf);
this.rootTasks.add(currentTask);
this.leafOpToFollowingWorkInfo =
new LinkedHashMap<ReduceSinkOperator, ObjectPair<SparkEdgeProperty, ReduceWork>>();
@@ -177,5 +183,7 @@ public class GenSparkProcContext implements NodeProcessorCtx {
this.clonedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
this.fileSinkMap = new LinkedHashMap<FileSinkOperator, List<FileSinkOperator>>();
+ this.pruningSinkSet = new LinkedHashSet<Operator<?>>();
+ this.clonedPruningTableScanSet = new LinkedHashSet<Operator<?>>();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 7992c88..1c0b79d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -46,7 +46,9 @@ import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hive.ql.plan.SparkWork;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
/**
* GenSparkUtils is a collection of shared helper methods to produce SparkWork
@@ -132,7 +135,7 @@ public class GenSparkUtils {
// remember which parent belongs to which tag
reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
- context.preceedingWork.getName());
+ context.preceedingWork.getName());
// remember the output name of the reduce sink
reduceSink.getConf().setOutputName(reduceWork.getName());
@@ -218,6 +221,16 @@ public class GenSparkUtils {
Iterator<Operator<?>> newOpQueueIt = newOpQueue.iterator();
for (Operator<?> op : opQueue) {
Operator<?> newOp = newOpQueueIt.next();
+
+ // We need to update rootToWorkMap in case the op is a key, since even
+ // though we clone the op tree, we're still using the same MapWork/ReduceWork.
+ if (context.rootToWorkMap.containsKey(op)) {
+ context.rootToWorkMap.put(newOp, context.rootToWorkMap.get(op));
+ }
+ // Don't remove the old entry - in SparkPartitionPruningSink it still
+ // refers to the old TS, and we need to look it up later in
+ // processPartitionPruningSink.
+
if (op instanceof FileSinkOperator) {
List<FileSinkOperator> fileSinkList = context.fileSinkMap.get(op);
if (fileSinkList == null) {
@@ -225,6 +238,12 @@ public class GenSparkUtils {
}
fileSinkList.add((FileSinkOperator) newOp);
context.fileSinkMap.put((FileSinkOperator) op, fileSinkList);
+ } else if (op instanceof SparkPartitionPruningSinkOperator) {
+ SparkPartitionPruningSinkOperator oldPruningSink = (SparkPartitionPruningSinkOperator) op;
+ SparkPartitionPruningSinkOperator newPruningSink = (SparkPartitionPruningSinkOperator) newOp;
+ newPruningSink.getConf().setTableScan(oldPruningSink.getConf().getTableScan());
+ context.pruningSinkSet.add(newPruningSink);
+ context.pruningSinkSet.remove(oldPruningSink);
}
}
}
@@ -337,6 +356,67 @@ public class GenSparkUtils {
}
}
+ /**
+ * Populate partition pruning information from the pruning sink operator to the
+ * target MapWork (the MapWork for the big table side). The information includes the source table
+ * name, column name, and partition key expression. It also sets up the temporary path used to
+ * communicate between the target MapWork and source BaseWork.
+ *
+ * Here "source" refers to the small table side, while "target" refers to the big
+ * table side.
+ *
+ * @param context the spark context.
+ * @param pruningSink the pruner sink operator being processed.
+ */
+ public void processPartitionPruningSink(GenSparkProcContext context,
+ SparkPartitionPruningSinkOperator pruningSink) {
+ SparkPartitionPruningSinkDesc desc = pruningSink.getConf();
+ TableScanOperator ts = desc.getTableScan();
+ MapWork targetWork = (MapWork) context.rootToWorkMap.get(ts);
+
+ Preconditions.checkArgument(
+ targetWork != null,
+ "No targetWork found for tablescan " + ts);
+
+ String targetId = SparkUtilities.getWorkId(targetWork);
+
+ BaseWork sourceWork = getEnclosingWork(pruningSink, context);
+ String sourceId = SparkUtilities.getWorkId(sourceWork);
+
+ // set up temporary path to communicate between the small/big table
+ Path tmpPath = targetWork.getTmpPathForPartitionPruning();
+ if (tmpPath == null) {
+ Path baseTmpPath = context.parseContext.getContext().getMRTmpPath();
+ tmpPath = SparkUtilities.generateTmpPathForPartitionPruning(baseTmpPath, targetId);
+ targetWork.setTmpPathForPartitionPruning(tmpPath);
+ LOG.info("Setting tmp path between source work and target work:\n" + tmpPath);
+ }
+
+ desc.setPath(new Path(tmpPath, sourceId));
+ desc.setTargetWork(targetWork.getName());
+
+ // store table descriptor in map-targetWork
+ if (!targetWork.getEventSourceTableDescMap().containsKey(sourceId)) {
+ targetWork.getEventSourceTableDescMap().put(sourceId, new LinkedList<TableDesc>());
+ }
+ List<TableDesc> tables = targetWork.getEventSourceTableDescMap().get(sourceId);
+ tables.add(pruningSink.getConf().getTable());
+
+ // store column name in map-targetWork
+ if (!targetWork.getEventSourceColumnNameMap().containsKey(sourceId)) {
+ targetWork.getEventSourceColumnNameMap().put(sourceId, new LinkedList<String>());
+ }
+ List<String> columns = targetWork.getEventSourceColumnNameMap().get(sourceId);
+ columns.add(desc.getTargetColumnName());
+
+ // store partition key expr in map-targetWork
+ if (!targetWork.getEventSourcePartKeyExprMap().containsKey(sourceId)) {
+ targetWork.getEventSourcePartKeyExprMap().put(sourceId, new LinkedList<ExprNodeDesc>());
+ }
+ List<ExprNodeDesc> keys = targetWork.getEventSourcePartKeyExprMap().get(sourceId);
+ keys.add(desc.getPartKey());
+ }
+
public static SparkEdgeProperty getEdgeProperty(ReduceSinkOperator reduceSink,
ReduceWork reduceWork) throws SemanticException {
SparkEdgeProperty edgeProperty = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE);
@@ -490,4 +570,33 @@ public class GenSparkUtils {
}
return false;
}
+
+ /**
+ * getEnclosingWork finds the BaseWork a given operator belongs to.
+ */
+ public BaseWork getEnclosingWork(Operator<?> op, GenSparkProcContext procCtx) {
+ List<Operator<?>> ops = new ArrayList<Operator<?>>();
+ findRoots(op, ops);
+ for (Operator<?> r : ops) {
+ BaseWork work = procCtx.rootToWorkMap.get(r);
+ if (work != null) {
+ return work;
+ }
+ }
+ return null;
+ }
+
+ /*
+ * findRoots returns all root operators (in ops) that result in operator op
+ */
+ private void findRoots(Operator<?> op, List<Operator<?>> ops) {
+ List<Operator<?>> parents = op.getParentOperators();
+ if (parents == null || parents.isEmpty()) {
+ ops.add(op);
+ return;
+ }
+ for (Operator<?> p : parents) {
+ findRoots(p, ops);
+ }
+ }
}
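processPartitionPruningSink() above records, per source work id, the table descriptor, target column name and partition key expression into the target MapWork's three event-source maps, creating each list on first use. The population pattern in JDK-only form:

    import java.util.LinkedHashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;

    class EventSourceMapSketch {
      // One list of entries per source work id, created lazily, then appended to.
      static <T> void append(Map<String, List<T>> map, String sourceId, T entry) {
        List<T> list = map.get(sourceId);
        if (list == null) {
          list = new LinkedList<>();
          map.put(sourceId, list);
        }
        list.add(entry);
      }

      public static void main(String[] args) {
        Map<String, List<String>> columnsBySource = new LinkedHashMap<>();
        append(columnsBySource, "2", "part_col");
        append(columnsBySource, "2", "other_col");
        System.out.println(columnsBySource);   // {2=[part_col, other_col]}
      }
    }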
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
index f7586a4..3b71af1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
@@ -20,15 +20,12 @@ package org.apache.hadoop.hive.ql.parse.spark;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -49,19 +46,12 @@ public class OptimizeSparkProcContext implements NodeProcessorCtx {
private final Set<ReduceSinkOperator> visitedReduceSinks = new HashSet<ReduceSinkOperator>();
private final Map<MapJoinOperator, Long> mjOpSizes = new HashMap<MapJoinOperator, Long>();
- // rootOperators are all the table scan operators in sequence
- // of traversal
- private final Deque<Operator<? extends OperatorDesc>> rootOperators;
-
public OptimizeSparkProcContext(HiveConf conf, ParseContext parseContext,
- Set<ReadEntity> inputs, Set<WriteEntity> outputs,
- Deque<Operator<? extends OperatorDesc>> rootOperators) {
-
+ Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
this.conf = conf;
this.parseContext = parseContext;
this.inputs = inputs;
this.outputs = outputs;
- this.rootOperators = rootOperators;
}
public ParseContext getParseContext() {
@@ -84,10 +74,6 @@ public class OptimizeSparkProcContext implements NodeProcessorCtx {
return visitedReduceSinks;
}
- public Deque<Operator<? extends OperatorDesc>> getRootOperators() {
- return rootOperators;
- }
-
public Map<MapJoinOperator, Long> getMjOpSizes() {
return mjOpSizes;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 7f2c079..27a1d99 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -19,10 +19,8 @@ package org.apache.hadoop.hive.ql.parse.spark;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,6 +33,7 @@ import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -44,12 +43,14 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.ForwardWalker;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -58,7 +59,10 @@ import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.lib.TypeRule;
import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.optimizer.ConstantPropagate;
+import org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization;
+import org.apache.hadoop.hive.ql.optimizer.SparkRemoveDynamicPruningBySize;
+import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
@@ -72,8 +76,8 @@ import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinHintOptimizer;
import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinOptimizer;
import org.apache.hadoop.hive.ql.optimizer.spark.SparkReduceSinkMapJoinProc;
import org.apache.hadoop.hive.ql.optimizer.spark.SparkSkewJoinResolver;
-import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;
import org.apache.hadoop.hive.ql.optimizer.spark.SplitSparkWorkResolver;
+import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -83,7 +87,6 @@ import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
/**
* SparkCompiler translates the operator plan into SparkTasks.
@@ -102,22 +105,70 @@ public class SparkCompiler extends TaskCompiler {
protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
Set<WriteEntity> outputs) throws SemanticException {
PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
- // Sequence of TableScan operators to be walked
- Deque<Operator<? extends OperatorDesc>> deque = new LinkedList<Operator<? extends OperatorDesc>>();
- deque.addAll(pCtx.getTopOps().values());
- OptimizeSparkProcContext procCtx = new OptimizeSparkProcContext(conf, pCtx, inputs, outputs, deque);
- // create a walker which walks the tree in a DFS manner while maintaining
- // the operator stack.
+ OptimizeSparkProcContext procCtx = new OptimizeSparkProcContext(conf, pCtx, inputs, outputs);
+
+ // Run Spark Dynamic Partition Pruning
+ runDynamicPartitionPruning(procCtx);
+
+ // Annotate OP tree with statistics
+ runStatsAnnotation(procCtx);
+
+ // Run join-related optimizations
+ runJoinOptimizations(procCtx);
+
+ PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
+ }
+
+ private void runStatsAnnotation(OptimizeSparkProcContext procCtx) throws SemanticException {
+ new AnnotateWithStatistics().transform(procCtx.getParseContext());
+ new AnnotateWithOpTraits().transform(procCtx.getParseContext());
+ }
+
+ private void runDynamicPartitionPruning(OptimizeSparkProcContext procCtx)
+ throws SemanticException {
+ if (!conf.getBoolVar(HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
+ return;
+ }
+
+ ParseContext parseContext = procCtx.getParseContext();
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(
+ new RuleRegExp("Dynamic Partition Pruning",
+ FilterOperator.getOperatorName() + "%"),
+ new DynamicPartitionPruningOptimization());
+
+ // The dispatcher fires the processor corresponding to the closest matching
+ // rule and passes the context along
+ Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+ GraphWalker ogw = new ForwardWalker(disp);
+
+ List<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(parseContext.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+
+ // need a new run of the constant folding because we might have created lots
+ // of "and true and true" conditions.
+ if (procCtx.getConf().getBoolVar(HiveConf.ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
+ new ConstantPropagate().transform(parseContext);
+ }
+ }
+
+ private void runJoinOptimizations(OptimizeSparkProcContext procCtx) throws SemanticException {
+ ParseContext pCtx = procCtx.getParseContext();
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
- ReduceSinkOperator.getOperatorName() + "%"),
- new SetSparkReducerParallelism());
+ ReduceSinkOperator.getOperatorName() + "%"),
+ new SetSparkReducerParallelism());
opRules.put(new TypeRule(JoinOperator.class), new SparkJoinOptimizer(pCtx));
opRules.put(new TypeRule(MapJoinOperator.class), new SparkJoinHintOptimizer(pCtx));
+ opRules.put(new RuleRegExp("Disabling Dynamic Partition Pruning By Size",
+ SparkPartitionPruningSinkOperator.getOperatorName() + "%"),
+ new SparkRemoveDynamicPruningBySize());
+
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
@@ -127,7 +178,6 @@ public class SparkCompiler extends TaskCompiler {
ArrayList<Node> topNodes = new ArrayList<Node>();
topNodes.addAll(pCtx.getTopOps().values());
ogw.startWalking(topNodes, null);
- PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
}
/**
@@ -138,20 +188,90 @@ public class SparkCompiler extends TaskCompiler {
List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
throws SemanticException {
PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
- GenSparkUtils.getUtils().resetSequenceNumber();
- ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
- GenSparkWork genSparkWork = new GenSparkWork(GenSparkUtils.getUtils());
+ GenSparkUtils utils = GenSparkUtils.getUtils();
+ utils.resetSequenceNumber();
+ ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
GenSparkProcContext procCtx = new GenSparkProcContext(
conf, tempParseContext, mvTask, rootTasks, inputs, outputs, pCtx.getTopOps());
+ // -------------------------------- First Pass ---------------------------------- //
+ // Identify SparkPartitionPruningSinkOperators, and break OP tree if necessary
+
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(new RuleRegExp("Clone OP tree for PartitionPruningSink",
+ SparkPartitionPruningSinkOperator.getOperatorName() + "%"),
+ new SplitOpTreeForDPP());
+
+ Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+ GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
+
+ List<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pCtx.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+
+ // -------------------------------- Second Pass ---------------------------------- //
+ // Process the operator trees in two steps: first the main op tree, then the extra op
+ // trees generated in the first pass. The main task is then made dependent on the task
+ // generated for those extra trees, so the pruning work runs first.
+ topNodes.clear();
+ topNodes.addAll(procCtx.topOps.values());
+ generateTaskTreeHelper(procCtx, topNodes);
+
+ // If this set is not empty, it means we need to generate a separate task for collecting
+ // the partitions used.
+ if (!procCtx.clonedPruningTableScanSet.isEmpty()) {
+ SparkTask pruningTask = SparkUtilities.createSparkTask(conf);
+ SparkTask mainTask = procCtx.currentTask;
+ pruningTask.addDependentTask(procCtx.currentTask);
+ procCtx.rootTasks.remove(procCtx.currentTask);
+ procCtx.rootTasks.add(pruningTask);
+ procCtx.currentTask = pruningTask;
+
+ topNodes.clear();
+ topNodes.addAll(procCtx.clonedPruningTableScanSet);
+ generateTaskTreeHelper(procCtx, topNodes);
+
+ procCtx.currentTask = mainTask;
+ }
+
+ // -------------------------------- Post Pass ---------------------------------- //
+
+ // we still need to clone some operator plans and remove union operators
+ for (BaseWork w : procCtx.workWithUnionOperators) {
+ GenSparkUtils.getUtils().removeUnionOperators(conf, procCtx, w);
+ }
+
+ // we need to fill MapWork with 'local' work and bucket information for SMB Join.
+ GenSparkUtils.getUtils().annotateMapWork(procCtx);
+
+ // finally make sure the file sink operators are set up right
+ for (FileSinkOperator fileSink : procCtx.fileSinkSet) {
+ GenSparkUtils.getUtils().processFileSink(procCtx, fileSink);
+ }
+
+ // Process partition pruning sinks
+ for (Operator<?> prunerSink : procCtx.pruningSinkSet) {
+ utils.processPartitionPruningSink(procCtx, (SparkPartitionPruningSinkOperator) prunerSink);
+ }
+
+ PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
+ }
+
+ private void generateTaskTreeHelper(GenSparkProcContext procCtx, List<Node> topNodes)
+ throws SemanticException {
// create a walker which walks the tree in a DFS manner while maintaining
// the operator stack. The dispatcher generates the plan from the operator tree
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ GenSparkWork genSparkWork = new GenSparkWork(GenSparkUtils.getUtils());
+
opRules.put(new RuleRegExp("Split Work - ReduceSink",
ReduceSinkOperator.getOperatorName() + "%"), genSparkWork);
+ opRules.put(new RuleRegExp("Split Work - SparkPartitionPruningSink",
+ SparkPartitionPruningSinkOperator.getOperatorName() + "%"), genSparkWork);
+
opRules.put(new TypeRule(MapJoinOperator.class), new SparkReduceSinkMapJoinProc());
opRules.put(new RuleRegExp("Split Work + Move/Merge - FileSink",
@@ -186,8 +306,10 @@ public class SparkCompiler extends TaskCompiler {
* SMBJoinOP
*
* Some of the other processors are expecting only one traversal beyond SMBJoinOp.
- * We need to traverse from the big-table path only, and stop traversing on the small-table path once we reach SMBJoinOp.
- * Also add some SMB join information to the context, so we can properly annotate the MapWork later on.
+ * We need to traverse from the big-table path only, and stop traversing on the
+ * small-table path once we reach SMBJoinOp.
+ * Also add some SMB join information to the context, so we can properly annotate
+ * the MapWork later on.
*/
opRules.put(new TypeRule(SMBMapJoinOperator.class),
new NodeProcessor() {
@@ -219,25 +341,8 @@ public class SparkCompiler extends TaskCompiler {
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
- List<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(pCtx.getTopOps().values());
GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
ogw.startWalking(topNodes, null);
-
- // we need to clone some operator plans and remove union operators still
- for (BaseWork w: procCtx.workWithUnionOperators) {
- GenSparkUtils.getUtils().removeUnionOperators(conf, procCtx, w);
- }
-
- // we need to fill MapWork with 'local' work and bucket information for SMB Join.
- GenSparkUtils.getUtils().annotateMapWork(procCtx);
-
- // finally make sure the file sink operators are set up right
- for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
- GenSparkUtils.getUtils().processFileSink(procCtx, fileSink);
- }
-
- PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
}
@Override
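The new compiler path above is gated on HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING: runDynamicPartitionPruning() returns early when the flag is off. A minimal, illustrative sketch of toggling that flag programmatically follows; it assumes only the ConfVars name shown in this diff, and the class below is hypothetical, not part of the patch.

    import org.apache.hadoop.hive.conf.HiveConf;

    public class SparkDppFlagSketch {
      public static void main(String[] args) {
        // Enable the flag checked by SparkCompiler.runDynamicPartitionPruning(); when it is
        // false, the method returns without registering the DynamicPartitionPruningOptimization rule.
        HiveConf conf = new HiveConf();
        conf.setBoolVar(HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING, true);
        System.out.println("Spark DPP enabled: "
            + conf.getBoolVar(HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING));
      }
    }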
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
new file mode 100644
index 0000000..20432c7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+
+/**
+ * This operator gets partition info from the upstream operators and writes it to HDFS.
+ * The info will later be read at the driver and used to prune the partitions
+ * for the big-table side.
+ */
+public class SparkPartitionPruningSinkOperator extends Operator<SparkPartitionPruningSinkDesc> {
+
+ @SuppressWarnings("deprecation")
+ protected transient Serializer serializer;
+ protected transient DataOutputBuffer buffer;
+ protected static final Log LOG = LogFactory.getLog(SparkPartitionPruningSinkOperator.class);
+
+ @SuppressWarnings("deprecation")
+ public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+ Collection<Future<?>> result = super.initializeOp(hconf);
+ serializer = (Serializer) ReflectionUtils.newInstance(
+ conf.getTable().getDeserializerClass(), null);
+ buffer = new DataOutputBuffer();
+ return result;
+ }
+
+ @Override
+ public void process(Object row, int tag) throws HiveException {
+ ObjectInspector rowInspector = inputObjInspectors[0];
+ try {
+ Writable writableRow = serializer.serialize(row, rowInspector);
+ writableRow.write(buffer);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+
+ @Override
+ public void closeOp(boolean abort) throws HiveException {
+ if (!abort) {
+ try {
+ flushToFile();
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+ }
+
+ private void flushToFile() throws IOException {
+ // write an intermediate file to the specified path
+ // the format of the path is: tmpPath/targetWorkId/sourceWorkId/randInt
+ Path path = conf.getPath();
+ FileSystem fs = path.getFileSystem(this.getConfiguration());
+ fs.mkdirs(path);
+
+ while (true) {
+ path = new Path(path, String.valueOf(Utilities.randGen.nextInt()));
+ if (!fs.exists(path)) {
+ break;
+ }
+ }
+
+ short numOfRepl = fs.getDefaultReplication(path);
+
+ ObjectOutputStream out = null;
+ FSDataOutputStream fsout = null;
+
+ try {
+ fsout = fs.create(path, numOfRepl);
+ out = new ObjectOutputStream(new BufferedOutputStream(fsout, 4096));
+ out.writeUTF(conf.getTargetColumnName());
+ buffer.writeTo(out);
+ } catch (Exception e) {
+ try {
+ fs.delete(path, false);
+ } catch (Exception ex) {
+ LOG.warn("Exception happened while trying to clean partial file.");
+ }
+ throw e;
+ } finally {
+ if (out != null) {
+ LOG.info("Flushed to file: " + path);
+ out.close();
+ } else if (fsout != null) {
+ fsout.close();
+ }
+ }
+ }
+
+ @Override
+ public OperatorType getType() {
+ return OperatorType.SPARKPRUNINGSINK;
+ }
+
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ public static String getOperatorName() {
+ return "SPARKPRUNINGSINK";
+ }
+
+}
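The file written by flushToFile() above has a simple layout: a UTF-encoded target column name followed by the raw bytes of the serialized rows accumulated in the DataOutputBuffer. A hedged, illustrative reader for that layout is sketched below; the class and method names are hypothetical, and the real consumer of these files is not part of this hunk.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PruningFileReaderSketch {
      /** Reads one pruning file: the target column name, then the serialized row bytes. */
      public static byte[] readPruningFile(Configuration conf, Path file) throws IOException {
        FileSystem fs = file.getFileSystem(conf);
        ObjectInputStream in = new ObjectInputStream(fs.open(file));
        try {
          // Mirrors out.writeUTF(conf.getTargetColumnName()) on the writer side.
          String targetColumn = in.readUTF();
          System.out.println("pruning on column: " + targetColumn);
          // The rest of the stream is the DataOutputBuffer contents: rows serialized with
          // the table's SerDe. They would still need to be deserialized before use.
          ByteArrayOutputStream rows = new ByteArrayOutputStream();
          byte[] chunk = new byte[4096];
          int n;
          while ((n = in.read(chunk)) != -1) {
            rows.write(chunk, 0, n);
          }
          return rows.toByteArray();
        } finally {
          in.close();
        }
      }
    }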
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
new file mode 100644
index 0000000..c140f67
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * This processor triggers on SparkPartitionPruningSinkOperator. For an operator tree like
+ * this:
+ *
+ * Original Tree:
+ * TS TS
+ * | |
+ * FIL FIL
+ * | | \
+ * RS RS SEL
+ * \ / |
+ * JOIN GBY
+ * |
+ * SPARKPRUNINGSINK
+ *
+ * It removes the branch containing SPARKPRUNINGSINK from the original operator tree and
+ * splits the tree into two separate trees:
+ *
+ * Tree #1: Tree #2:
+ * TS TS TS
+ * | | |
+ * FIL FIL FIL
+ * | | |
+ * RS RS SEL
+ * \ / |
+ * JOIN GBY
+ * |
+ * SPARKPRUNINGSINK
+ *
+ * For MapJoinOperator, this processor does nothing - the pruning branch should be executed
+ * within the same SparkTask.
+ */
+public class SplitOpTreeForDPP implements NodeProcessor {
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ SparkPartitionPruningSinkOperator pruningSinkOp = (SparkPartitionPruningSinkOperator) nd;
+ GenSparkProcContext context = (GenSparkProcContext) procCtx;
+
+ // Locate the op where the branch starts
+ // This is guaranteed to succeed since the branch always follows the pattern
+ // as shown in the first picture above.
+ Operator<?> filterOp = pruningSinkOp;
+ Operator<?> selOp = null;
+ while (filterOp != null) {
+ if (filterOp.getNumChild() > 1) {
+ break;
+ } else {
+ selOp = filterOp;
+ filterOp = filterOp.getParentOperators().get(0);
+ }
+ }
+
+ // Check if this is a MapJoin. If so, do not split.
+ for (Operator<?> childOp : filterOp.getChildOperators()) {
+ if (childOp instanceof ReduceSinkOperator &&
+ childOp.getChildOperators().get(0) instanceof MapJoinOperator) {
+ context.pruningSinkSet.add(pruningSinkOp);
+ return null;
+ }
+ }
+
+ List<Operator<?>> roots = new LinkedList<Operator<?>>();
+ collectRoots(roots, pruningSinkOp);
+
+ List<Operator<?>> savedChildOps = filterOp.getChildOperators();
+ filterOp.setChildOperators(Utilities.makeList(selOp));
+
+ // Now clone the tree above selOp
+ List<Operator<?>> newRoots = Utilities.cloneOperatorTree(context.parseContext.getConf(), roots);
+ for (int i = 0; i < roots.size(); i++) {
+ TableScanOperator newTs = (TableScanOperator) newRoots.get(i);
+ TableScanOperator oldTs = (TableScanOperator) roots.get(i);
+ newTs.getConf().setTableMetadata(oldTs.getConf().getTableMetadata());
+ }
+ context.clonedPruningTableScanSet.addAll(newRoots);
+
+ // Restore broken links between operators, and remove the branch from the original tree
+ filterOp.setChildOperators(savedChildOps);
+ filterOp.removeChild(selOp);
+
+ // Find the cloned PruningSink and add it to pruningSinkSet
+ Set<Operator<?>> sinkSet = new HashSet<Operator<?>>();
+ for (Operator<?> root : newRoots) {
+ SparkUtilities.collectOp(sinkSet, root, SparkPartitionPruningSinkOperator.class);
+ }
+ Preconditions.checkArgument(sinkSet.size() == 1,
+ "AssertionError: expected to only contain one SparkPartitionPruningSinkOperator," +
+ " but found " + sinkSet.size());
+ SparkPartitionPruningSinkOperator clonedPruningSinkOp =
+ (SparkPartitionPruningSinkOperator) sinkSet.iterator().next();
+ clonedPruningSinkOp.getConf().setTableScan(pruningSinkOp.getConf().getTableScan());
+ context.pruningSinkSet.add(clonedPruningSinkOp);
+
+ return null;
+ }
+
+ /**
+ * Recursively collect all roots (e.g., table scans) that can be reached via this op.
+ * @param result contains all roots that can be reached via op
+ * @param op the op to examine.
+ */
+ private void collectRoots(List<Operator<?>> result, Operator<?> op) {
+ if (op.getNumParent() == 0) {
+ result.add(op);
+ } else {
+ for (Operator<?> parentOp : op.getParentOperators()) {
+ collectRoots(result, parentOp);
+ }
+ }
+ }
+
+}
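SparkUtilities.collectOp(...), used in process() above to locate the cloned pruning sink, is not shown in this hunk. Judging from the call site it collects every operator of a given class reachable through child links; a rough sketch of that kind of traversal, under that assumption only, could look like this (class name hypothetical):

    import java.util.Set;

    import org.apache.hadoop.hive.ql.exec.Operator;

    public final class OperatorCollectorSketch {
      private OperatorCollectorSketch() {
      }

      /**
       * Depth-first walk over child links, adding to result every operator that is an
       * instance of the requested class. Usage mirrors the call in process() above:
       * collect(sinkSet, root, SparkPartitionPruningSinkOperator.class).
       */
      public static void collect(Set<Operator<?>> result, Operator<?> op, Class<?> clazz) {
        if (clazz.isInstance(op)) {
          result.add(op);
        }
        if (op.getChildOperators() == null) {
          return;
        }
        for (Operator<?> child : op.getChildOperators()) {
          collect(result, child, clazz);
        }
      }
    }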
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 3217df2..2cb9257 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -87,6 +87,8 @@ public class MapWork extends BaseWork {
private Path tmpHDFSPath;
+ private Path tmpPathForPartitionPruning;
+
private String inputformat;
private String indexIntermediateFile;
@@ -455,6 +457,14 @@ public class MapWork extends BaseWork {
this.tmpHDFSPath = tmpHDFSPath;
}
+ public Path getTmpPathForPartitionPruning() {
+ return this.tmpPathForPartitionPruning;
+ }
+
+ public void setTmpPathForPartitionPruning(Path tmpPathForPartitionPruning) {
+ this.tmpPathForPartitionPruning = tmpPathForPartitionPruning;
+ }
+
public void mergingInto(MapWork mapWork) {
// currently, this is sole field affecting mergee task
mapWork.useBucketizedHiveInputFormat |= useBucketizedHiveInputFormat;
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 6282380..dbb5209 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -99,7 +99,7 @@ public class TableScanDesc extends AbstractOperatorDesc {
private transient TableSample tableSample;
- private transient final Table tableMetadata;
+ private transient Table tableMetadata;
public TableScanDesc() {
@@ -274,6 +274,10 @@ public class TableScanDesc extends AbstractOperatorDesc {
return tableMetadata;
}
+ public void setTableMetadata(Table tableMetadata) {
+ this.tableMetadata = tableMetadata;
+ }
+
public TableSample getTableSample() {
return tableSample;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d7413e4b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
index 363e49e..32af813 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
@@ -67,8 +67,18 @@ public class SyntheticJoinPredicate implements Transform {
@Override
public ParseContext transform(ParseContext pctx) throws SemanticException {
- if (!pctx.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
- || !pctx.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+ boolean enabled = false;
+ String queryEngine = pctx.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE);
+
+ if (queryEngine.equals("tez")
+ && pctx.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+ enabled = true;
+ } else if (queryEngine.equals("spark")
+ && pctx.getConf().getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
+ enabled = true;
+ }
+
+ if (!enabled) {
return pctx;
}