You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by su...@apache.org on 2015/07/17 21:53:16 UTC

[24/48] hive git commit: HIVE-9152 - Dynamic Partition Pruning [Spark Branch] (Chao Sun, reviewed by Xuefu Zhang and Chengxiang Li)

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/gen/thrift/gen-rb/queryplan_constants.rb
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-rb/queryplan_constants.rb b/ql/src/gen/thrift/gen-rb/queryplan_constants.rb
index 8080b3f..428185e 100644
--- a/ql/src/gen/thrift/gen-rb/queryplan_constants.rb
+++ b/ql/src/gen/thrift/gen-rb/queryplan_constants.rb
@@ -1,5 +1,5 @@
 #
-# Autogenerated by Thrift Compiler (0.9.0)
+# Autogenerated by Thrift Compiler (0.9.2)
 #
 # DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
 #

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/gen/thrift/gen-rb/queryplan_types.rb
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
index c2c4220..1a22f07 100644
--- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb
+++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
@@ -1,5 +1,5 @@
 #
-# Autogenerated by Thrift Compiler (0.9.0)
+# Autogenerated by Thrift Compiler (0.9.2)
 #
 # DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
 #
@@ -46,8 +46,9 @@ module OperatorType
   ORCFILEMERGE = 22
   RCFILEMERGE = 23
   MERGEJOIN = 24
-  VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX", 21 => "EVENT", 22 => "ORCFILEMERGE", 23 => "RCFILEMERGE", 24 => "MERGEJOIN"}
-  VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX, EVENT, ORCFILEMERGE, RCFILEMERGE, MERGEJOIN]).freeze
+  SPARKPRUNINGSINK = 25
+  VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX", 21 => "EVENT", 22 => "ORCFILEMERGE", 23 => "RCFILEMERGE", 24 => "MERGEJOIN", 25 => "SPARKPRUNINGSINK"}
+  VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX, EVENT, ORCFILEMERGE, RCFILEMERGE, MERGEJOIN, SPARKPRUNINGSINK]).freeze
 end
 
 module TaskType

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index f58a10b..ff58741 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -34,8 +34,11 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
@@ -117,6 +120,8 @@ public final class OperatorFactory {
         AppMasterEventOperator.class));
     opvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
         AppMasterEventOperator.class));
+    opvec.add(new OpTuple<SparkPartitionPruningSinkDesc>(SparkPartitionPruningSinkDesc.class,
+        SparkPartitionPruningSinkOperator.class));
     opvec.add(new OpTuple<RCFileMergeDesc>(RCFileMergeDesc.class,
         RCFileMergeOperator.class));
     opvec.add(new OpTuple<OrcFileMergeDesc>(OrcFileMergeDesc.class,
@@ -133,6 +138,9 @@ public final class OperatorFactory {
         VectorAppMasterEventOperator.class));
     vectorOpvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
         VectorAppMasterEventOperator.class));
+    vectorOpvec.add(new OpTuple<SparkPartitionPruningSinkDesc>(
+        SparkPartitionPruningSinkDesc.class,
+        VectorSparkPartitionPruningSinkOperator.class));
     vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
     vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class));
     vectorOpvec.add(new OpTuple<MapJoinDesc>(MapJoinDesc.class, VectorMapJoinOperator.class));

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 21398d8..007db75 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
@@ -148,11 +149,11 @@ public class HiveSparkClientFactory {
     Set<String> classes = Sets.newHashSet(
       Splitter.on(",").trimResults().omitEmptyStrings().split(
         Strings.nullToEmpty(sparkConf.get("spark.kryo.classesToRegister"))));
+    classes.add(Writable.class.getName());
     classes.add(VectorizedRowBatch.class.getName());
     classes.add(BytesWritable.class.getName());
     classes.add(HiveKey.class.getName());
-    sparkConf.put(
-      "spark.kryo.classesToRegister", Joiner.on(",").join(classes));
+    sparkConf.put("spark.kryo.classesToRegister", Joiner.on(",").join(classes));
 
     return sparkConf;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
new file mode 100644
index 0000000..52913e0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * The spark version of DynamicPartitionPruner.
+ *
+ * <p>Reads the partition column values found in the files under a MapWork's
+ * temporary pruning directory and removes from that MapWork every partition
+ * whose evaluated partition key is not among those values.
+ */
+public class SparkDynamicPartitionPruner {
+  private static final Log LOG = LogFactory.getLog(SparkDynamicPartitionPruner.class);
+
+  /** Source infos grouped by event source id, kept in insertion order. */
+  private final Map<String, List<SourceInfo>> sourceInfoMap = new LinkedHashMap<String, List<SourceInfo>>();
+
+  /** Scratch buffer reused for every serialized row read from a pruning file. */
+  private final BytesWritable writable = new BytesWritable();
+
+  /**
+   * Prunes the partitions of the given MapWork in place. Returns without touching
+   * the partitions when no source info could be built for the work.
+   *
+   * @param work the MapWork whose partitions should be pruned
+   * @param jobConf job configuration, used for deserializers and file system access
+   * @throws HiveException if the pruning files cannot be read or the partition
+   *           key expression cannot be evaluated
+   * @throws SerDeException if a source deserializer cannot be initialized
+   */
+  public void prune(MapWork work, JobConf jobConf) throws HiveException, SerDeException {
+    sourceInfoMap.clear();
+    initialize(work, jobConf);
+    if (sourceInfoMap.size() == 0) {
+      // Nothing to prune for this MapWork
+      return;
+    }
+    processFiles(work, jobConf);
+    prunePartitions(work);
+  }
+
+  /**
+   * Builds one SourceInfo per (event source, table) pair registered on the work
+   * and records it in {@link #sourceInfoMap}.
+   *
+   * @param work the MapWork providing the event source table, column name and
+   *          partition key expression maps
+   * @param jobConf job configuration passed to each source deserializer
+   * @throws SerDeException if a source deserializer cannot be initialized
+   */
+  public void initialize(MapWork work, JobConf jobConf) throws SerDeException {
+    Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>();
+    Set<String> sourceWorkIds = work.getEventSourceTableDescMap().keySet();
+
+    for (String id : sourceWorkIds) {
+      List<TableDesc> tables = work.getEventSourceTableDescMap().get(id);
+      List<String> columnNames = work.getEventSourceColumnNameMap().get(id);
+      List<ExprNodeDesc> partKeyExprs = work.getEventSourcePartKeyExprMap().get(id);
+
+      // The three lists are walked in lock step: the i-th table goes with the
+      // i-th column name and the i-th partition key expression.
+      Iterator<String> cit = columnNames.iterator();
+      Iterator<ExprNodeDesc> pit = partKeyExprs.iterator();
+      for (TableDesc t : tables) {
+        String columnName = cit.next();
+        ExprNodeDesc partKeyExpr = pit.next();
+        SourceInfo si = new SourceInfo(t, partKeyExpr, columnName, jobConf);
+        if (!sourceInfoMap.containsKey(id)) {
+          sourceInfoMap.put(id, new ArrayList<SourceInfo>());
+        }
+        sourceInfoMap.get(id).add(si);
+
+        // We could have multiple sources restrict the same column, need to take
+        // the union of the values in that case.
+        if (columnMap.containsKey(columnName)) {
+          si.values = columnMap.get(columnName).values;
+        }
+        columnMap.put(columnName, si);
+      }
+    }
+  }
+
+  /**
+   * Reads every pruning file under the work's temporary pruning directory, one
+   * sub-directory per event source id, and collects the values they contain.
+   *
+   * @param work the MapWork providing the temporary pruning directory
+   * @param jobConf job configuration used to obtain the file system
+   * @throws HiveException if listing or reading the pruning files fails
+   */
+  private void processFiles(MapWork work, JobConf jobConf) throws HiveException {
+    try {
+      Path baseDir = work.getTmpPathForPartitionPruning();
+      FileSystem fs = FileSystem.get(baseDir.toUri(), jobConf);
+
+      for (Map.Entry<String, List<SourceInfo>> source : sourceInfoMap.entrySet()) {
+        Path sourceDir = new Path(baseDir, source.getKey());
+        for (FileStatus fstatus : fs.listStatus(sourceDir)) {
+          processFile(fs, fstatus.getPath(), source.getValue());
+        }
+      }
+    } catch (HiveException e) {
+      // Already wrapped by processFile; do not wrap a second time.
+      throw e;
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  /**
+   * Reads a single pruning file: a UTF column name followed by serialized rows.
+   * The stream is closed before returning, so no more than one pruning file is
+   * open at any time.
+   *
+   * @param fs the file system holding the pruning file
+   * @param file the pruning file to read
+   * @param candidates the source infos of the event source this file belongs to
+   * @throws HiveException if the file cannot be read or closed, or if it names a
+   *           column none of the candidates is responsible for
+   */
+  private void processFile(FileSystem fs, Path file, List<SourceInfo> candidates)
+      throws HiveException {
+    LOG.info("Start processing pruning file: " + file);
+    ObjectInputStream in = null;
+    try {
+      in = new ObjectInputStream(fs.open(file));
+      String columnName = in.readUTF();
+
+      // Find the SourceInfo to put values in.
+      SourceInfo info = null;
+      for (SourceInfo si : candidates) {
+        if (columnName.equals(si.columnName)) {
+          info = si;
+          break;
+        }
+      }
+
+      Preconditions.checkArgument(info != null,
+          "AssertionError: no source info for the column: " + columnName);
+
+      // Read fields
+      // NOTE(review): available() is used as the end-of-data test - confirm it
+      // cannot report 0 before the file is exhausted.
+      while (in.available() > 0) {
+        writable.readFields(in);
+
+        Object row = info.deserializer.deserialize(writable);
+        Object value = info.soi.getStructFieldData(row, info.field);
+        value = ObjectInspectorUtils.copyToStandardObject(value, info.fieldInspector);
+        info.values.add(value);
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
+    } finally {
+      try {
+        if (in != null) {
+          in.close();
+        }
+      } catch (IOException e) {
+        throw new HiveException("error while trying to close input stream", e);
+      }
+    }
+  }
+
+  /**
+   * Applies the values collected for every source to the partitions of the work.
+   *
+   * @param work the MapWork to prune
+   * @throws HiveException if a partition key expression cannot be evaluated
+   */
+  private void prunePartitions(MapWork work) throws HiveException {
+    for (List<SourceInfo> infos : sourceInfoMap.values()) {
+      for (SourceInfo info : infos) {
+        prunePartitionSingleSource(info, work);
+      }
+    }
+  }
+
+  /**
+   * Prunes the work using a single source: sets up a converter from the
+   * partition value string to the source column's type and an evaluator for the
+   * partition key expression, then filters the partitions with them.
+   *
+   * @param info the source whose values decide which partitions are kept
+   * @param work the MapWork to prune
+   * @throws HiveException if the evaluator cannot be initialized or applied
+   */
+  private void prunePartitionSingleSource(SourceInfo info, MapWork work)
+      throws HiveException {
+    Set<Object> values = info.values;
+    String columnName = info.columnName;
+
+    ObjectInspector oi =
+        PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory
+            .getPrimitiveTypeInfo(info.fieldInspector.getTypeName()));
+
+    ObjectInspectorConverters.Converter converter =
+        ObjectInspectorConverters.getConverter(
+            PrimitiveObjectInspectorFactory.javaStringObjectInspector, oi);
+
+    StructObjectInspector soi =
+        ObjectInspectorFactory.getStandardStructObjectInspector(
+            Collections.singletonList(columnName), Collections.singletonList(oi));
+
+    @SuppressWarnings("rawtypes")
+    ExprNodeEvaluator eval = ExprNodeEvaluatorFactory.get(info.partKey);
+    eval.initialize(soi);
+
+    applyFilterToPartitions(work, converter, eval, columnName, values);
+  }
+
+  /**
+   * Removes from the work every partition whose evaluated partition key is not
+   * contained in {@code values}.
+   *
+   * @param work the MapWork to prune
+   * @param converter converts a partition value string into the evaluator's input type
+   * @param eval evaluator for the partition key expression
+   * @param columnName name of the partition column to look up in each partition spec
+   * @param values the partition key values to keep
+   * @throws HiveException if the partition key expression cannot be evaluated
+   * @throws IllegalStateException if a partition has no spec, or no value for
+   *           {@code columnName}
+   */
+  private void applyFilterToPartitions(
+      MapWork work,
+      ObjectInspectorConverters.Converter converter,
+      ExprNodeEvaluator eval,
+      String columnName,
+      Set<Object> values) throws HiveException {
+
+    Object[] row = new Object[1];
+
+    Iterator<String> it = work.getPathToPartitionInfo().keySet().iterator();
+    while (it.hasNext()) {
+      String p = it.next();
+      PartitionDesc desc = work.getPathToPartitionInfo().get(p);
+      Map<String, String> spec = desc.getPartSpec();
+      if (spec == null) {
+        throw new IllegalStateException("No partition spec found in dynamic pruning");
+      }
+
+      String partValueString = spec.get(columnName);
+      if (partValueString == null) {
+        throw new IllegalStateException(
+            "Could not find partition value for column: " + columnName);
+      }
+
+      Object partValue = converter.convert(partValueString);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Converted partition value: " + partValue + " original (" + partValueString + ")");
+      }
+
+      row[0] = partValue;
+      partValue = eval.evaluate(row);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("part key expr applied: " + partValue);
+      }
+
+      if (!values.contains(partValue)) {
+        LOG.info("Pruning path: " + p);
+        it.remove();
+        work.getPathToAliases().remove(p);
+        work.getPaths().remove(p);
+        work.getPartitionDescs().remove(desc);
+      }
+    }
+  }
+
+  /**
+   * Everything needed to read the values of one pruning source and to apply
+   * them to one partition column.
+   */
+  @SuppressWarnings("deprecation")
+  private static class SourceInfo {
+    final ExprNodeDesc partKey;
+    final Deserializer deserializer;
+    final StructObjectInspector soi;
+    final StructField field;
+    final ObjectInspector fieldInspector;
+    // Not final: sources restricting the same column share one set (see initialize).
+    Set<Object> values = new HashSet<Object>();
+    final String columnName;
+
+    SourceInfo(TableDesc table, ExprNodeDesc partKey, String columnName, JobConf jobConf)
+        throws SerDeException {
+      this.partKey = partKey;
+      this.columnName = columnName;
+
+      deserializer = ReflectionUtils.newInstance(table.getDeserializerClass(), null);
+      deserializer.initialize(jobConf, table.getProperties());
+
+      ObjectInspector inspector = deserializer.getObjectInspector();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Type of obj insp: " + inspector.getTypeName());
+      }
+
+      soi = (StructObjectInspector) inspector;
+      List<? extends StructField> fields = soi.getAllStructFieldRefs();
+      // NOTE(review): the condition requires more than one field while the
+      // message expects exactly one; only the first field is used - confirm intent.
+      assert(fields.size() > 1) : "expecting single field in input";
+
+      field = fields.get(0);
+      fieldInspector =
+          ObjectInspectorUtils.getStandardObjectInspector(field.getFieldObjectInspector());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index ca0ffb6..cf2c3bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -23,16 +23,22 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.UUID;
+import java.util.Collection;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.Dependency;
@@ -158,4 +164,72 @@ public class SparkUtilities {
       }
     }
   }
+
+  /**
+   * Generate a temporary path for dynamic partition pruning in Spark branch
+   * TODO: no longer need this if we use accumulator!
+   * @param basePath the parent directory of the generated path
+   * @param id the child name to resolve against basePath
+   * @return the path for id under basePath
+   */
+  public static Path generateTmpPathForPartitionPruning(Path basePath, String id) {
+    return new Path(basePath, id);
+  }
+
+  /**
+   * Return the ID for this BaseWork, in String form.
+   * The ID is the part of the work name that follows the first space; a name
+   * without any space is returned unchanged.
+   * @param work the input BaseWork
+   * @return the unique ID for this BaseWork
+   */
+  public static String getWorkId(BaseWork work) {
+    String workName = work.getName();
+    return workName.substring(workName.indexOf(" ") + 1);
+  }
+
+  /**
+   * Create a SparkTask around a new SparkWork constructed from the current query ID.
+   * @param conf the configuration to read the query ID from and to create the task with
+   * @return the new SparkTask
+   */
+  public static SparkTask createSparkTask(HiveConf conf) {
+    return (SparkTask) TaskFactory.get(
+        new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
+  }
+
+  /**
+   * Create a SparkTask for the given SparkWork.
+   * @param work the SparkWork to create the task for
+   * @param conf the configuration to create the task with
+   * @return the new SparkTask
+   */
+  public static SparkTask createSparkTask(SparkWork work, HiveConf conf) {
+    return (SparkTask) TaskFactory.get(work, conf);
+  }
+
+  /**
+   * Recursively find all operators under root, that are of class clazz, and
+   * put them in result. Only exact class matches are collected; subclasses of
+   * clazz are not.
+   * @param result all operators under root that are of class clazz
+   * @param root the root operator under which all operators will be examined
+   * @param clazz class to collect. Must NOT be null.
+   */
+  public static void collectOp(Collection<Operator<?>> result, Operator<?> root, Class<?> clazz) {
+    Preconditions.checkArgument(clazz != null, "AssertionError: clazz should not be null");
+    if (root == null) {
+      return;
+    }
+    if (clazz.equals(root.getClass())) {
+      result.add(root);
+    }
+    if (root.getChildOperators() == null) {
+      // Guard against an operator whose child list is null; nothing further to visit.
+      return;
+    }
+    for (Operator<?> child : root.getChildOperators()) {
+      collectOp(result, child, clazz);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
new file mode 100644
index 0000000..3bce49d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.util.Collection;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Vectorized version for SparkPartitionPruningSinkOperator.
+ * Forked from VectorAppMasterEventOperator.
+ **/
+public class VectorSparkPartitionPruningSinkOperator extends SparkPartitionPruningSinkOperator {
+
+  private static final long serialVersionUID = 1L;
+
+  private VectorizationContext vContext;
+
+  /** True until the first batch has been processed; triggers the lazy setup in process(). */
+  protected transient boolean firstBatch;
+
+  /** Extracts individual rows out of a batch; created on the first batch. */
+  protected transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+
+  /** Reusable holder for the columns of the row currently being extracted. */
+  protected transient Object[] singleRow;
+
+  /**
+   * @param context vectorization context supplying the projected columns
+   * @param conf operator descriptor; must be a SparkPartitionPruningSinkDesc
+   */
+  public VectorSparkPartitionPruningSinkOperator(VectorizationContext context,
+      OperatorDesc conf) {
+    super();
+    this.conf = (SparkPartitionPruningSinkDesc) conf;
+    this.vContext = context;
+  }
+
+  public VectorSparkPartitionPruningSinkOperator() {
+  }
+
+  /**
+   * Replaces the input object inspector with its standard struct equivalent
+   * before delegating to the parent's initializeOp.
+   */
+  @Override
+  public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    inputObjInspectors[0] =
+        VectorizedBatchUtil.convertToStandardStructObjectInspector(
+            (StructObjectInspector) inputObjInspectors[0]);
+    Collection<Future<?>> result = super.initializeOp(hconf);
+    assert result.isEmpty();
+
+    firstBatch = true;
+
+    return result;
+  }
+
+  /**
+   * Serializes every selected row of the given batch and writes it to the
+   * operator's buffer.
+   *
+   * @param data the VectorizedRowBatch to process
+   * @param tag not used by this operator
+   * @throws HiveException if a row cannot be extracted, serialized or written
+   */
+  @Override
+  public void process(Object data, int tag) throws HiveException {
+    VectorizedRowBatch batch = (VectorizedRowBatch) data;
+    if (firstBatch) {
+      vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
+      vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0],
+          vContext.getProjectedColumns());
+      singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+      firstBatch = false;
+    }
+
+    vectorExtractRowDynBatch.setBatchOnEntry(batch);
+    ObjectInspector rowInspector = inputObjInspectors[0];
+    try {
+      Writable writableRow;
+      for (int logical = 0; logical < batch.size; logical++) {
+        int batchIndex = batch.selectedInUse ? batch.selected[logical] : logical;
+        vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+        writableRow = serializer.serialize(singleRow, rowInspector);
+        writableRow.write(buffer);
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
+    } finally {
+      // Release the batch even when a row fails, so it is not retained.
+      vectorExtractRowDynBatch.forgetBatchOnExit();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 1de7e40..e13c4dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -514,34 +515,40 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
     int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM,
         (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD));
     int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads);
-    LOG.info("Total number of paths: " + paths.length +
-        ", launching " + numThreads + " threads to check non-combinable ones.");
-    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
-    List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads);
-    try {
-      for (int i = 0; i < numThreads; i++) {
-        int start = i * numPathPerThread;
-        int length = i != numThreads - 1 ? numPathPerThread : paths.length - start;
-        futureList.add(executor.submit(
-            new CheckNonCombinablePathCallable(paths, start, length, job)));
-      }
-      Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
-      for (Future<Set<Integer>> future : futureList) {
-        nonCombinablePathIndices.addAll(future.get());
-      }
-      for (int i = 0; i < paths.length; i++) {
-        if (nonCombinablePathIndices.contains(i)) {
-          nonCombinablePaths.add(paths[i]);
-        } else {
-          combinablePaths.add(paths[i]);
+
+    // This check is necessary because for Spark branch, the result array from
+    // getInputPaths() above could be empty, and therefore numThreads could be 0.
+    // In that case, Executors.newFixedThreadPool will fail.
+    if (numThreads > 0) {
+      LOG.info("Total number of paths: " + paths.length +
+          ", launching " + numThreads + " threads to check non-combinable ones.");
+      ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+      List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads);
+      try {
+        for (int i = 0; i < numThreads; i++) {
+          int start = i * numPathPerThread;
+          int length = i != numThreads - 1 ? numPathPerThread : paths.length - start;
+          futureList.add(executor.submit(
+              new CheckNonCombinablePathCallable(paths, start, length, job)));
+        }
+        Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
+        for (Future<Set<Integer>> future : futureList) {
+          nonCombinablePathIndices.addAll(future.get());
+        }
+        for (int i = 0; i < paths.length; i++) {
+          if (nonCombinablePathIndices.contains(i)) {
+            nonCombinablePaths.add(paths[i]);
+          } else {
+            combinablePaths.add(paths[i]);
+          }
         }
+      } catch (Exception e) {
+        LOG.error("Error checking non-combinable path", e);
+        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
+        throw new IOException(e);
+      } finally {
+        executor.shutdownNow();
       }
-    } catch (Exception e) {
-      LOG.error("Error checking non-combinable path", e);
-      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
-      throw new IOException(e);
-    } finally {
-      executor.shutdownNow();
     }
 
     // Store the previous value for the path specification

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 2ff3951..fd16b35 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.Map.Entry;
 
@@ -37,6 +38,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -266,6 +268,18 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       } else {
         mrwork = Utilities.getMapWork(job);
       }
+
+      // Prune partitions
+      if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")
+          && HiveConf.getBoolVar(job, HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
+        SparkDynamicPartitionPruner pruner = new SparkDynamicPartitionPruner();
+        try {
+          pruner.prune(mrwork, job);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+
       pathToPartitionInfo = mrwork.getPathToPartitionInfo();
     }
   }
@@ -309,18 +323,28 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
   }
 
   Path[] getInputPaths(JobConf job) throws IOException {
-    Path[] dirs = FileInputFormat.getInputPaths(job);
-    if (dirs.length == 0) {
-      // on tez we're avoiding to duplicate the file info in FileInputFormat.
-      if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
-        try {
-          List<Path> paths = Utilities.getInputPathsTez(job, mrwork);
-          dirs = paths.toArray(new Path[paths.size()]);
-        } catch (Exception e) {
-          throw new IOException("Could not create input files", e);
+    Path[] dirs;
+    if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      Set<String> pathStrings = mrwork.getPathToPartitionInfo().keySet();
+      dirs = new Path[pathStrings.size()];
+      Iterator<String> it = pathStrings.iterator();
+      for (int i = 0; i < dirs.length; i++) {
+        dirs[i] = new Path(it.next());
+      }
+    } else {
+      dirs = FileInputFormat.getInputPaths(job);
+      if (dirs.length == 0) {
+        // On Tez we avoid duplicating the file info in FileInputFormat.
+        if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+          try {
+            List<Path> paths = Utilities.getInputPathsTez(job, mrwork);
+            dirs = paths.toArray(new Path[paths.size()]);
+          } catch (Exception e) {
+            throw new IOException("Could not create input files", e);
+          }
+        } else {
+          throw new IOException("No input paths specified in job");
         }
-      } else {
-        throw new IOException("No input paths specified in job");
       }
     }
     return dirs;

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index 8546d21..f475926 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -48,10 +48,12 @@ import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -152,15 +154,24 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
   @Override
   public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
       throws SemanticException {
-    OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
-    ParseContext parseContext = context.parseContext;
+    ParseContext parseContext;
+    if (procCtx instanceof OptimizeTezProcContext) {
+      parseContext = ((OptimizeTezProcContext) procCtx).parseContext;
+    } else if (procCtx instanceof OptimizeSparkProcContext) {
+      parseContext = ((OptimizeSparkProcContext) procCtx).getParseContext();
+    } else {
+      throw new IllegalArgumentException("expected parseContext to be either " +
+          "OptimizeTezProcContext or OptimizeSparkProcContext, but found " +
+          procCtx.getClass().getName());
+    }
 
     FilterOperator filter = (FilterOperator) nd;
     FilterDesc desc = filter.getConf();
 
     TableScanOperator ts = null;
 
-    if (!parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+    if (!parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING) &&
+        !parseContext.getConf().getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
       // nothing to do when the optimization is off
       return null;
     }
@@ -311,14 +322,25 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
     groupByOp.setColumnExprMap(colMap);
 
     // finally add the event broadcast operator
-    DynamicPruningEventDesc eventDesc = new DynamicPruningEventDesc();
-    eventDesc.setTableScan(ts);
-    eventDesc.setTable(PlanUtils.getReduceValueTableDesc(PlanUtils
-        .getFieldSchemasFromColumnList(keyExprs, "key")));
-    eventDesc.setTargetColumnName(column);
-    eventDesc.setPartKey(partKey);
-
-    OperatorFactory.getAndMakeChild(eventDesc, groupByOp);
+    if (HiveConf.getVar(parseContext.getConf(),
+        ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+      DynamicPruningEventDesc eventDesc = new DynamicPruningEventDesc();
+      eventDesc.setTableScan(ts);
+      eventDesc.setTable(PlanUtils.getReduceValueTableDesc(PlanUtils
+          .getFieldSchemasFromColumnList(keyExprs, "key")));
+      eventDesc.setTargetColumnName(column);
+      eventDesc.setPartKey(partKey);
+      OperatorFactory.getAndMakeChild(eventDesc, groupByOp);
+    } else {
+      // Must be spark branch
+      SparkPartitionPruningSinkDesc desc = new SparkPartitionPruningSinkDesc();
+      desc.setTableScan(ts);
+      desc.setTable(PlanUtils.getReduceValueTableDesc(PlanUtils
+          .getFieldSchemasFromColumnList(keyExprs, "key")));
+      desc.setTargetColumnName(column);
+      desc.setPartKey(partKey);
+      OperatorFactory.getAndMakeChild(desc, groupByOp);
+    }
   }
 
   private Map<Node, Object> walkExprTree(ExprNodeDesc pred, NodeProcessorCtx ctx)

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index a7cf8b7..e6db133 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -167,7 +167,7 @@ public class Optimizer {
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) {
       transformations.add(new StatsOptimizer());
     }
-    if (isSparkExecEngine || (pctx.getContext().getExplain() && !isTezExecEngine)) {
+    if (pctx.getContext().getExplain() && !isTezExecEngine && !isSparkExecEngine) {
       transformations.add(new AnnotateWithStatistics());
       transformations.add(new AnnotateWithOpTraits());
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
new file mode 100644
index 0000000..3742857
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
+
+/**
+ * If we expect the number of keys for dynamic pruning to be too large we
+ * disable it.
+ *
+ * Cloned from RemoveDynamicPruningBySize
+ */
+public class SparkRemoveDynamicPruningBySize implements NodeProcessor {
+
+  static final private Log LOG = LogFactory.getLog(SparkRemoveDynamicPruningBySize.class.getName());
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
+      Object... nodeOutputs)
+      throws SemanticException {
+
+    OptimizeSparkProcContext context = (OptimizeSparkProcContext) procContext;
+
+    SparkPartitionPruningSinkOperator op = (SparkPartitionPruningSinkOperator) nd;
+    SparkPartitionPruningSinkDesc desc = op.getConf();
+
+    if (desc.getStatistics().getDataSize() > context.getConf()
+        .getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) {
+      Operator<?> child = op;
+      Operator<?> curr = op;
+
+      while (curr.getChildOperators().size() <= 1) {
+        child = curr;
+        curr = curr.getParentOperators().get(0);
+      }
+
+      curr.removeChild(child);
+      // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
+      LOG.info("Disabling dynamic pruning for: "
+          + desc.getTableScan().getName()
+          + ". Expected data size is too big: " + desc.getStatistics().getDataSize());
+    }
+    return false;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index ad47547..180513e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -919,6 +919,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       case FILESINK:
       case LIMIT:
       case EVENT:
+      case SPARKPRUNINGSINK:
         ret = true;
         break;
       case HASHTABLESINK:
@@ -965,6 +966,7 @@ public class Vectorizer implements PhysicalPlanResolver {
         break;
       case LIMIT:
       case EVENT:
+      case SPARKPRUNINGSINK:
         ret = true;
         break;
       case HASHTABLESINK:

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
new file mode 100644
index 0000000..cc78227
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.spark;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+@Explain(displayName = "Spark Partition Pruning Sink Operator")
+public class SparkPartitionPruningSinkDesc extends AbstractOperatorDesc {
+
+  // column in the target table that will be pruned against
+  private String targetColumnName;
+
+  private TableDesc table;
+
+  private transient TableScanOperator tableScan;
+
+  // the partition column we're interested in
+  private ExprNodeDesc partKey;
+
+  private Path path;
+
+  private String targetWork;
+
+  @Explain(displayName = "tmp Path", explainLevels = { Explain.Level.EXTENDED })
+  public Path getPath() {
+    return path;
+  }
+
+  public void setPath(Path path) {
+    this.path = path;
+  }
+
+  @Explain(displayName = "target work")
+  public String getTargetWork() {
+    return this.targetWork;
+  }
+
+  public void setTargetWork(String targetWork) {
+    this.targetWork = targetWork;
+  }
+
+  public TableScanOperator getTableScan() {
+    return tableScan;
+  }
+
+  public void setTableScan(TableScanOperator tableScan) {
+    this.tableScan = tableScan;
+  }
+
+  @Explain(displayName = "target column name")
+  public String getTargetColumnName() {
+    return targetColumnName;
+  }
+
+  public void setTargetColumnName(String targetColumnName) {
+    this.targetColumnName = targetColumnName;
+  }
+
+  public ExprNodeDesc getPartKey() {
+    return partKey;
+  }
+
+  public void setPartKey(ExprNodeDesc partKey) {
+    this.partKey = partKey;
+  }
+
+  public TableDesc getTable() {
+    return table;
+  }
+
+  public void setTable(TableDesc table) {
+    this.table = table;
+  }
+
+  @Explain(displayName = "partition key expr")
+  public String getPartKeyString() {
+    return partKey.getExprString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
index 447f104..0a0c791 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -40,7 +41,6 @@ import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
-import org.apache.hadoop.hive.ql.plan.SparkWork;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -138,6 +138,13 @@ public class GenSparkProcContext implements NodeProcessorCtx {
   // This is necessary as sometimes semantic analyzer's mapping is different than operator's own alias.
   public final Map<String, Operator<? extends OperatorDesc>> topOps;
 
+  // The set of pruning sinks
+  public final Set<Operator<?>> pruningSinkSet;
+
+  // The set of TableScanOperators for pruning OP trees
+  public final Set<Operator<?>> clonedPruningTableScanSet;
+
+
   @SuppressWarnings("unchecked")
   public GenSparkProcContext(HiveConf conf,
       ParseContext parseContext,
@@ -153,8 +160,7 @@ public class GenSparkProcContext implements NodeProcessorCtx {
     this.inputs = inputs;
     this.outputs = outputs;
     this.topOps = topOps;
-    this.currentTask = (SparkTask) TaskFactory.get(
-        new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
+    this.currentTask = SparkUtilities.createSparkTask(conf);
     this.rootTasks.add(currentTask);
     this.leafOpToFollowingWorkInfo =
         new LinkedHashMap<ReduceSinkOperator, ObjectPair<SparkEdgeProperty, ReduceWork>>();
@@ -177,5 +183,7 @@ public class GenSparkProcContext implements NodeProcessorCtx {
     this.clonedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
     this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
     this.fileSinkMap = new LinkedHashMap<FileSinkOperator, List<FileSinkOperator>>();
+    this.pruningSinkSet = new LinkedHashSet<Operator<?>>();
+    this.clonedPruningTableScanSet = new LinkedHashSet<Operator<?>>();
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 7992c88..1c0b79d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -46,7 +46,9 @@ import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hive.ql.plan.SparkWork;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 
 /**
  * GenSparkUtils is a collection of shared helper methods to produce SparkWork
@@ -132,7 +135,7 @@ public class GenSparkUtils {
 
     // remember which parent belongs to which tag
     reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
-         context.preceedingWork.getName());
+        context.preceedingWork.getName());
 
     // remember the output name of the reduce sink
     reduceSink.getConf().setOutputName(reduceWork.getName());
@@ -218,6 +221,16 @@ public class GenSparkUtils {
       Iterator<Operator<?>> newOpQueueIt = newOpQueue.iterator();
       for (Operator<?> op : opQueue) {
         Operator<?> newOp = newOpQueueIt.next();
+
+        // We need to update rootToWorkMap in case the op is a key, since even
+        // though we clone the op tree, we're still using the same MapWork/ReduceWork.
+        if (context.rootToWorkMap.containsKey(op)) {
+          context.rootToWorkMap.put(newOp, context.rootToWorkMap.get(op));
+        }
+        // Don't remove the old entry - the SparkPartitionPruningSink still
+        // refers to the old TS, and we need to look it up later in
+        // processPartitionPruningSink.
+
         if (op instanceof FileSinkOperator) {
           List<FileSinkOperator> fileSinkList = context.fileSinkMap.get(op);
           if (fileSinkList == null) {
@@ -225,6 +238,12 @@ public class GenSparkUtils {
           }
           fileSinkList.add((FileSinkOperator) newOp);
           context.fileSinkMap.put((FileSinkOperator) op, fileSinkList);
+        } else if (op instanceof SparkPartitionPruningSinkOperator) {
+          SparkPartitionPruningSinkOperator oldPruningSink = (SparkPartitionPruningSinkOperator) op;
+          SparkPartitionPruningSinkOperator newPruningSink = (SparkPartitionPruningSinkOperator) newOp;
+          newPruningSink.getConf().setTableScan(oldPruningSink.getConf().getTableScan());
+          context.pruningSinkSet.add(newPruningSink);
+          context.pruningSinkSet.remove(oldPruningSink);
         }
       }
     }
@@ -337,6 +356,67 @@ public class GenSparkUtils {
     }
   }
 
+  /**
+   * Populate partition pruning information from the pruning sink operator to the
+   * target MapWork (the MapWork for the big table side). The information includes the source table
+   * name, column name, and partition key expression. It also sets up the temporary path used to
+   * communicate between the target MapWork and source BaseWork.
+   *
+   * Here "source" refers to the small table side, while "target" refers to the big
+   * table side.
+   *
+   * @param context the spark context.
+   * @param pruningSink the pruner sink operator being processed.
+   */
+  public void processPartitionPruningSink(GenSparkProcContext context,
+      SparkPartitionPruningSinkOperator pruningSink) {
+    SparkPartitionPruningSinkDesc desc = pruningSink.getConf();
+    TableScanOperator ts = desc.getTableScan();
+    MapWork targetWork = (MapWork) context.rootToWorkMap.get(ts);
+
+    Preconditions.checkArgument(
+        targetWork != null,
+        "No targetWork found for tablescan " + ts);
+
+    String targetId = SparkUtilities.getWorkId(targetWork);
+
+    BaseWork sourceWork = getEnclosingWork(pruningSink, context);
+    String sourceId = SparkUtilities.getWorkId(sourceWork);
+
+    // set up temporary path to communicate between the small/big table
+    Path tmpPath = targetWork.getTmpPathForPartitionPruning();
+    if (tmpPath == null) {
+      Path baseTmpPath = context.parseContext.getContext().getMRTmpPath();
+      tmpPath = SparkUtilities.generateTmpPathForPartitionPruning(baseTmpPath, targetId);
+      targetWork.setTmpPathForPartitionPruning(tmpPath);
+      LOG.info("Setting tmp path between source work and target work:\n" + tmpPath);
+    }
+
+    desc.setPath(new Path(tmpPath, sourceId));
+    desc.setTargetWork(targetWork.getName());
+
+    // store table descriptor in map-targetWork
+    if (!targetWork.getEventSourceTableDescMap().containsKey(sourceId)) {
+      targetWork.getEventSourceTableDescMap().put(sourceId, new LinkedList<TableDesc>());
+    }
+    List<TableDesc> tables = targetWork.getEventSourceTableDescMap().get(sourceId);
+    tables.add(pruningSink.getConf().getTable());
+
+    // store column name in map-targetWork
+    if (!targetWork.getEventSourceColumnNameMap().containsKey(sourceId)) {
+      targetWork.getEventSourceColumnNameMap().put(sourceId, new LinkedList<String>());
+    }
+    List<String> columns = targetWork.getEventSourceColumnNameMap().get(sourceId);
+    columns.add(desc.getTargetColumnName());
+
+    // store partition key expr in map-targetWork
+    if (!targetWork.getEventSourcePartKeyExprMap().containsKey(sourceId)) {
+      targetWork.getEventSourcePartKeyExprMap().put(sourceId, new LinkedList<ExprNodeDesc>());
+    }
+    List<ExprNodeDesc> keys = targetWork.getEventSourcePartKeyExprMap().get(sourceId);
+    keys.add(desc.getPartKey());
+  }
+
   public static SparkEdgeProperty getEdgeProperty(ReduceSinkOperator reduceSink,
       ReduceWork reduceWork) throws SemanticException {
     SparkEdgeProperty edgeProperty = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE);
@@ -490,4 +570,33 @@ public class GenSparkUtils {
     }
     return false;
   }
+
+  /**
+   * getEnclosingWork finds the BaseWork that a given operator belongs to.
+   */
+  public BaseWork getEnclosingWork(Operator<?> op, GenSparkProcContext procCtx) {
+    List<Operator<?>> ops = new ArrayList<Operator<?>>();
+    findRoots(op, ops);
+    for (Operator<?> r : ops) {
+      BaseWork work = procCtx.rootToWorkMap.get(r);
+      if (work != null) {
+        return work;
+      }
+    }
+    return null;
+  }
+
+  /*
+   * findRoots returns all root operators (in ops) that result in operator op
+   */
+  private void findRoots(Operator<?> op, List<Operator<?>> ops) {
+    List<Operator<?>> parents = op.getParentOperators();
+    if (parents == null || parents.isEmpty()) {
+      ops.add(op);
+      return;
+    }
+    for (Operator<?> p : parents) {
+      findRoots(p, ops);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
index f7586a4..3b71af1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
@@ -20,15 +20,12 @@ package org.apache.hadoop.hive.ql.parse.spark;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
-import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -49,19 +46,12 @@ public class OptimizeSparkProcContext implements NodeProcessorCtx {
   private final Set<ReduceSinkOperator> visitedReduceSinks = new HashSet<ReduceSinkOperator>();
   private final Map<MapJoinOperator, Long> mjOpSizes = new HashMap<MapJoinOperator, Long>();
 
-  // rootOperators are all the table scan operators in sequence
-  // of traversal
-  private final Deque<Operator<? extends OperatorDesc>> rootOperators;
-
   public OptimizeSparkProcContext(HiveConf conf, ParseContext parseContext,
-    Set<ReadEntity> inputs, Set<WriteEntity> outputs,
-    Deque<Operator<? extends OperatorDesc>> rootOperators) {
-
+    Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
     this.conf = conf;
     this.parseContext = parseContext;
     this.inputs = inputs;
     this.outputs = outputs;
-    this.rootOperators = rootOperators;
   }
 
   public ParseContext getParseContext() {
@@ -84,10 +74,6 @@ public class OptimizeSparkProcContext implements NodeProcessorCtx {
     return visitedReduceSinks;
   }
 
-  public Deque<Operator<? extends OperatorDesc>> getRootOperators() {
-    return rootOperators;
-  }
-
   public Map<MapJoinOperator, Long> getMjOpSizes() {
     return mjOpSizes;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 7f2c079..27a1d99 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -19,10 +19,8 @@ package org.apache.hadoop.hive.ql.parse.spark;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,6 +33,7 @@ import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -44,12 +43,14 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.ForwardWalker;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -58,7 +59,10 @@ import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.lib.TypeRule;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.optimizer.ConstantPropagate;
+import org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization;
+import org.apache.hadoop.hive.ql.optimizer.SparkRemoveDynamicPruningBySize;
+import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
 import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
@@ -72,8 +76,8 @@ import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinHintOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkReduceSinkMapJoinProc;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkSkewJoinResolver;
-import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;
 import org.apache.hadoop.hive.ql.optimizer.spark.SplitSparkWorkResolver;
+import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
 import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -83,7 +87,6 @@ import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 
 /**
  * SparkCompiler translates the operator plan into SparkTasks.
@@ -102,22 +105,70 @@ public class SparkCompiler extends TaskCompiler {
   protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
       Set<WriteEntity> outputs) throws SemanticException {
     PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
-    // Sequence of TableScan operators to be walked
-    Deque<Operator<? extends OperatorDesc>> deque = new LinkedList<Operator<? extends OperatorDesc>>();
-    deque.addAll(pCtx.getTopOps().values());
 
-    OptimizeSparkProcContext procCtx = new OptimizeSparkProcContext(conf, pCtx, inputs, outputs, deque);
-    // create a walker which walks the tree in a DFS manner while maintaining
-    // the operator stack.
+    OptimizeSparkProcContext procCtx = new OptimizeSparkProcContext(conf, pCtx, inputs, outputs);
+
+    // Run Spark Dynamic Partition Pruning
+    runDynamicPartitionPruning(procCtx);
+
+    // Annotate OP tree with statistics
+    runStatsAnnotation(procCtx);
+
+    // Run Join related optimizations
+    runJoinOptimizations(procCtx);
+
+    PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
+  }
+
+  private void runStatsAnnotation(OptimizeSparkProcContext procCtx) throws SemanticException {
+    new AnnotateWithStatistics().transform(procCtx.getParseContext());
+    new AnnotateWithOpTraits().transform(procCtx.getParseContext());
+  }
+
+  private void runDynamicPartitionPruning(OptimizeSparkProcContext procCtx)
+      throws SemanticException {
+    if (!conf.getBoolVar(HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
+      return;
+    }
+
+    ParseContext parseContext = procCtx.getParseContext();
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(
+        new RuleRegExp(new String("Dynamic Partition Pruning"),
+            FilterOperator.getOperatorName() + "%"),
+        new DynamicPartitionPruningOptimization());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    GraphWalker ogw = new ForwardWalker(disp);
+
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(parseContext.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+
+    // need a new run of the constant folding because we might have created lots
+    // of "and true and true" conditions.
+    if(procCtx.getConf().getBoolVar(HiveConf.ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
+      new ConstantPropagate().transform(parseContext);
+    }
+  }
+
+  private void runJoinOptimizations(OptimizeSparkProcContext procCtx) throws SemanticException {
+    ParseContext pCtx = procCtx.getParseContext();
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
-      ReduceSinkOperator.getOperatorName() + "%"),
-      new SetSparkReducerParallelism());
+            ReduceSinkOperator.getOperatorName() + "%"),
+        new SetSparkReducerParallelism());
 
     opRules.put(new TypeRule(JoinOperator.class), new SparkJoinOptimizer(pCtx));
 
     opRules.put(new TypeRule(MapJoinOperator.class), new SparkJoinHintOptimizer(pCtx));
 
+    opRules.put(new RuleRegExp("Disabling Dynamic Partition Pruning By Size",
+        SparkPartitionPruningSinkOperator.getOperatorName() + "%"),
+        new SparkRemoveDynamicPruningBySize());
+
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
@@ -127,7 +178,6 @@ public class SparkCompiler extends TaskCompiler {
     ArrayList<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pCtx.getTopOps().values());
     ogw.startWalking(topNodes, null);
-    PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
   }
 
   /**
@@ -138,20 +188,90 @@ public class SparkCompiler extends TaskCompiler {
       List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
       throws SemanticException {
     PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
-    GenSparkUtils.getUtils().resetSequenceNumber();
 
-    ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
-    GenSparkWork genSparkWork = new GenSparkWork(GenSparkUtils.getUtils());
+    GenSparkUtils utils = GenSparkUtils.getUtils();
+    utils.resetSequenceNumber();
 
+    ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
     GenSparkProcContext procCtx = new GenSparkProcContext(
         conf, tempParseContext, mvTask, rootTasks, inputs, outputs, pCtx.getTopOps());
 
+    // -------------------------------- First Pass ---------------------------------- //
+    // Identify SparkPartitionPruningSinkOperators, and break OP tree if necessary
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("Clone OP tree for PartitionPruningSink",
+            SparkPartitionPruningSinkOperator.getOperatorName() + "%"),
+        new SplitOpTreeForDPP());
+
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
+
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pCtx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+
+    // -------------------------------- Second Pass ---------------------------------- //
+    // Process operator tree in two steps: first we process the extra op trees generated
+    // in the first pass. Then we process the main op tree, and the result task will depend
+    // on the task generated in the first pass.
+    topNodes.clear();
+    topNodes.addAll(procCtx.topOps.values());
+    generateTaskTreeHelper(procCtx, topNodes);
+
+    // If this set is not empty, it means we need to generate a separate task for collecting
+    // the partitions used.
+    if (!procCtx.clonedPruningTableScanSet.isEmpty()) {
+      SparkTask pruningTask = SparkUtilities.createSparkTask(conf);
+      SparkTask mainTask = procCtx.currentTask;
+      pruningTask.addDependentTask(procCtx.currentTask);
+      procCtx.rootTasks.remove(procCtx.currentTask);
+      procCtx.rootTasks.add(pruningTask);
+      procCtx.currentTask = pruningTask;
+
+      topNodes.clear();
+      topNodes.addAll(procCtx.clonedPruningTableScanSet);
+      generateTaskTreeHelper(procCtx, topNodes);
+
+      procCtx.currentTask = mainTask;
+    }
+
+    // -------------------------------- Post Pass ---------------------------------- //
+
+    // we need to clone some operator plans and remove union operators still
+    for (BaseWork w : procCtx.workWithUnionOperators) {
+      GenSparkUtils.getUtils().removeUnionOperators(conf, procCtx, w);
+    }
+
+    // we need to fill MapWork with 'local' work and bucket information for SMB Join.
+    GenSparkUtils.getUtils().annotateMapWork(procCtx);
+
+    // finally make sure the file sink operators are set up right
+    for (FileSinkOperator fileSink : procCtx.fileSinkSet) {
+      GenSparkUtils.getUtils().processFileSink(procCtx, fileSink);
+    }
+
+    // Process partition pruning sinks
+    for (Operator<?> prunerSink : procCtx.pruningSinkSet) {
+      utils.processPartitionPruningSink(procCtx, (SparkPartitionPruningSinkOperator) prunerSink);
+    }
+
+    PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
+  }
+
+  private void generateTaskTreeHelper(GenSparkProcContext procCtx, List<Node> topNodes)
+    throws SemanticException {
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack. The dispatcher generates the plan from the operator tree
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    GenSparkWork genSparkWork = new GenSparkWork(GenSparkUtils.getUtils());
+
     opRules.put(new RuleRegExp("Split Work - ReduceSink",
         ReduceSinkOperator.getOperatorName() + "%"), genSparkWork);
 
+    opRules.put(new RuleRegExp("Split Work - SparkPartitionPruningSink",
+        SparkPartitionPruningSinkOperator.getOperatorName() + "%"), genSparkWork);
+
     opRules.put(new TypeRule(MapJoinOperator.class), new SparkReduceSinkMapJoinProc());
 
     opRules.put(new RuleRegExp("Split Work + Move/Merge - FileSink",
@@ -186,8 +306,10 @@ public class SparkCompiler extends TaskCompiler {
      *                         SMBJoinOP
      *
      * Some of the other processors are expecting only one traversal beyond SMBJoinOp.
-     * We need to traverse from the big-table path only, and stop traversing on the small-table path once we reach SMBJoinOp.
-     * Also add some SMB join information to the context, so we can properly annotate the MapWork later on.
+     * We need to traverse from the big-table path only, and stop traversing on the
+     * small-table path once we reach SMBJoinOp.
+     * Also add some SMB join information to the context, so we can properly annotate
+     * the MapWork later on.
      */
     opRules.put(new TypeRule(SMBMapJoinOperator.class),
       new NodeProcessor() {
@@ -219,25 +341,8 @@ public class SparkCompiler extends TaskCompiler {
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
-    List<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pCtx.getTopOps().values());
     GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
     ogw.startWalking(topNodes, null);
-
-    // we need to clone some operator plans and remove union operators still
-    for (BaseWork w: procCtx.workWithUnionOperators) {
-      GenSparkUtils.getUtils().removeUnionOperators(conf, procCtx, w);
-    }
-
-    // we need to fill MapWork with 'local' work and bucket information for SMB Join.
-    GenSparkUtils.getUtils().annotateMapWork(procCtx);
-
-    // finally make sure the file sink operators are set up right
-    for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
-      GenSparkUtils.getUtils().processFileSink(procCtx, fileSink);
-    }
-
-    PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
new file mode 100644
index 0000000..20432c7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+
+/**
+ * This operator gets partition info from the upstream operators, and writes it
+ * to HDFS. This will later be read at the driver, and used for pruning the partitions
+ * for the big table side.
+ */
+public class SparkPartitionPruningSinkOperator extends Operator<SparkPartitionPruningSinkDesc> {
+
+  @SuppressWarnings("deprecation")
+  protected transient Serializer serializer;
+  protected transient DataOutputBuffer buffer;
+  protected static final Log LOG = LogFactory.getLog(SparkPartitionPruningSinkOperator.class);
+
+  @SuppressWarnings("deprecation")
+  public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
+    serializer = (Serializer) ReflectionUtils.newInstance(
+        conf.getTable().getDeserializerClass(), null);
+    buffer = new DataOutputBuffer();
+    return result;
+  }
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+    ObjectInspector rowInspector = inputObjInspectors[0];
+    try {
+      Writable writableRow = serializer.serialize(row, rowInspector);
+      writableRow.write(buffer);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  @Override
+  public void closeOp(boolean abort) throws HiveException {
+    if (!abort) {
+      try {
+        flushToFile();
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
+    }
+  }
+
+  private void flushToFile() throws IOException {
+    // write an intermediate file to the specified path
+    // the format of the path is: tmpPath/targetWorkId/sourceWorkId/randInt
+    Path path = conf.getPath();
+    FileSystem fs = path.getFileSystem(this.getConfiguration());
+    fs.mkdirs(path);
+
+    while (true) {
+      path = new Path(path, String.valueOf(Utilities.randGen.nextInt()));
+      if (!fs.exists(path)) {
+        break;
+      }
+    }
+
+    short numOfRepl = fs.getDefaultReplication(path);
+
+    ObjectOutputStream out = null;
+    FSDataOutputStream fsout = null;
+
+    try {
+      fsout = fs.create(path, numOfRepl);
+      out = new ObjectOutputStream(new BufferedOutputStream(fsout, 4096));
+      out.writeUTF(conf.getTargetColumnName());
+      buffer.writeTo(out);
+    } catch (Exception e) {
+      try {
+        fs.delete(path, false);
+      } catch (Exception ex) {
+        LOG.warn("Exception happened while trying to clean partial file.");
+      }
+      throw e;
+    } finally {
+      if (out != null) {
+        LOG.info("Flushed to file: " + path);
+        out.close();
+      } else if (fsout != null) {
+        fsout.close();
+      }
+    }
+  }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.SPARKPRUNINGSINK;
+  }
+
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  public static String getOperatorName() {
+    return "SPARKPRUNINGSINK";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
new file mode 100644
index 0000000..c140f67
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * This processor triggers on SparkPartitionPruningSinkOperator. For an operator tree like
+ * this:
+ *
+ * Original Tree:
+ *     TS    TS
+ *      |     |
+ *     FIL   FIL
+ *      |     | \
+ *     RS     RS SEL
+ *       \   /    |
+ *        JOIN   GBY
+ *                |
+ *               SPARKPRUNINGSINK
+ *
+ * It removes the branch containing SPARKPRUNINGSINK from the original operator tree, and splits it into
+ * two separate trees:
+ *
+ * Tree #1:                 Tree #2:
+ *     TS    TS               TS
+ *      |     |                |
+ *     FIL   FIL              FIL
+ *      |     |                |
+ *     RS     RS              SEL
+ *       \   /                 |
+ *       JOIN                 GBY
+ *                             |
+ *                            SPARKPRUNINGSINK
+ *
+ * For MapJoinOperator, this optimizer will not do anything - it should be executed within
+ * the same SparkTask.
+ */
+public class SplitOpTreeForDPP implements NodeProcessor {
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+                        Object... nodeOutputs) throws SemanticException {
+    SparkPartitionPruningSinkOperator pruningSinkOp = (SparkPartitionPruningSinkOperator) nd;
+    GenSparkProcContext context = (GenSparkProcContext) procCtx;
+
+    // Locate the op where the branch starts
+    // This is guaranteed to succeed since the branch always follows the pattern
+    // as shown in the first picture above.
+    Operator<?> filterOp = pruningSinkOp;
+    Operator<?> selOp = null;
+    while (filterOp != null) {
+      if (filterOp.getNumChild() > 1) {
+        break;
+      } else {
+        selOp = filterOp;
+        filterOp = filterOp.getParentOperators().get(0);
+      }
+    }
+
+    // Check if this is a MapJoin. If so, do not split.
+    for (Operator<?> childOp : filterOp.getChildOperators()) {
+      if (childOp instanceof ReduceSinkOperator &&
+          childOp.getChildOperators().get(0) instanceof MapJoinOperator) {
+        context.pruningSinkSet.add(pruningSinkOp);
+        return null;
+      }
+    }
+
+    List<Operator<?>> roots = new LinkedList<Operator<?>>();
+    collectRoots(roots, pruningSinkOp);
+
+    List<Operator<?>> savedChildOps = filterOp.getChildOperators();
+    filterOp.setChildOperators(Utilities.makeList(selOp));
+
+    // Now clone the tree above selOp
+    List<Operator<?>> newRoots = Utilities.cloneOperatorTree(context.parseContext.getConf(), roots);
+    for (int i = 0; i < roots.size(); i++) {
+      TableScanOperator newTs = (TableScanOperator) newRoots.get(i);
+      TableScanOperator oldTs = (TableScanOperator) roots.get(i);
+      newTs.getConf().setTableMetadata(oldTs.getConf().getTableMetadata());
+    }
+    context.clonedPruningTableScanSet.addAll(newRoots);
+
+    // Restore broken links between operators, and remove the branch from the original tree
+    filterOp.setChildOperators(savedChildOps);
+    filterOp.removeChild(selOp);
+
+    // Find the cloned PruningSink and add it to pruningSinkSet
+    Set<Operator<?>> sinkSet = new HashSet<Operator<?>>();
+    for (Operator<?> root : newRoots) {
+      SparkUtilities.collectOp(sinkSet, root, SparkPartitionPruningSinkOperator.class);
+    }
+    Preconditions.checkArgument(sinkSet.size() == 1,
+        "AssertionError: expected to only contain one SparkPartitionPruningSinkOperator," +
+            " but found " + sinkSet.size());
+    SparkPartitionPruningSinkOperator clonedPruningSinkOp =
+        (SparkPartitionPruningSinkOperator) sinkSet.iterator().next();
+    clonedPruningSinkOp.getConf().setTableScan(pruningSinkOp.getConf().getTableScan());
+    context.pruningSinkSet.add(clonedPruningSinkOp);
+
+    return null;
+  }
+
+  /**
+   * Recursively collect all roots (e.g., table scans) that can be reached via this op.
+   * @param result contains all roots that can be reached via op
+   * @param op the op to examine.
+   */
+  private void collectRoots(List<Operator<?>> result, Operator<?> op) {
+    if (op.getNumParent() == 0) {
+      result.add(op);
+    } else {
+      for (Operator<?> parentOp : op.getParentOperators()) {
+        collectRoots(result, parentOp);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 3217df2..2cb9257 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -87,6 +87,8 @@ public class MapWork extends BaseWork {
 
   private Path tmpHDFSPath;
 
+  private Path tmpPathForPartitionPruning;
+
   private String inputformat;
 
   private String indexIntermediateFile;
@@ -455,6 +457,14 @@ public class MapWork extends BaseWork {
     this.tmpHDFSPath = tmpHDFSPath;
   }
 
+  public Path getTmpPathForPartitionPruning() {
+    return this.tmpPathForPartitionPruning;
+  }
+
+  public void setTmpPathForPartitionPruning(Path tmpPathForPartitionPruning) {
+    this.tmpPathForPartitionPruning = tmpPathForPartitionPruning;
+  }
+
   public void mergingInto(MapWork mapWork) {
     // currently, this is sole field affecting mergee task
     mapWork.useBucketizedHiveInputFormat |= useBucketizedHiveInputFormat;

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 9e9a2a2..d27f3ff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -103,7 +103,7 @@ public class TableScanDesc extends AbstractOperatorDesc {
 
   private transient TableSample tableSample;
 
-  private transient final Table tableMetadata;
+  private transient Table tableMetadata;
 
   public TableScanDesc() {
     this(null, null);
@@ -284,6 +284,10 @@ public class TableScanDesc extends AbstractOperatorDesc {
     return tableMetadata;
   }
 
+  public void setTableMetadata(Table tableMetadata) {
+    this.tableMetadata = tableMetadata;
+  }
+
   public TableSample getTableSample() {
     return tableSample;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
index 363e49e..32af813 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
@@ -67,8 +67,18 @@ public class SyntheticJoinPredicate implements Transform {
   @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
 
-    if (!pctx.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
-        || !pctx.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+    boolean enabled = false;
+    String queryEngine = pctx.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE);
+
+    if (queryEngine.equals("tez")
+        && pctx.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+      enabled = true;
+    } else if ((queryEngine.equals("spark")
+        && pctx.getConf().getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING))) {
+      enabled = true;
+    }
+
+    if (!enabled) {
       return pctx;
     }