Posted to commits@hive.apache.org by se...@apache.org on 2018/07/23 18:51:49 UTC

[09/13] hive git commit: HIVE-20056: SparkPartitionPruner shouldn't be triggered by Spark tasks (Sahil Takiar, reviewed by Rui Li)

HIVE-20056: SparkPartitionPruner shouldn't be triggered by Spark tasks (Sahil Takiar, reviewed by Rui Li)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bed17e54
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bed17e54
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bed17e54

Branch: refs/heads/master-txnstats
Commit: bed17e54d8ab465256e41f2b796d69d21a075ea0
Parents: 6b15816
Author: Sahil Takiar <ta...@gmail.com>
Authored: Sat Jul 14 12:51:01 2018 -0500
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Mon Jul 23 09:12:55 2018 -0500

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/log/PerfLogger.java   |  2 ++
 .../exec/spark/SparkDynamicPartitionPruner.java | 25 ++++++++++++--------
 .../hive/ql/exec/spark/SparkPlanGenerator.java  | 24 +++++++++++++++++++
 .../hadoop/hive/ql/io/HiveInputFormat.java      | 12 ----------
 4 files changed, 41 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bed17e54/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index 111e614..0ee41c0 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -85,6 +85,8 @@ public class PerfLogger {
   public static final String SPARK_OPTIMIZE_OPERATOR_TREE = "SparkOptimizeOperatorTree";
   public static final String SPARK_OPTIMIZE_TASK_TREE = "SparkOptimizeTaskTree";
   public static final String SPARK_FLUSH_HASHTABLE = "SparkFlushHashTable.";
+  public static final String SPARK_DYNAMICALLY_PRUNE_PARTITIONS =
+          "SparkDynamicallyPrunePartitions.";
 
   public static final String FILE_MOVES = "FileMoves";
   public static final String LOAD_TABLE = "LoadTable";

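For reference, a minimal sketch of how the new key is meant to be used: bracket the pruning phase with PerfLogBegin/PerfLogEnd, suffixing the key with the work name so each MapWork gets its own timing entry. The PerfLogger and SessionState calls mirror the ones added to SparkDynamicPartitionPruner below; the wrapper class and the try/finally are illustrative only, not part of the commit.

  import org.apache.hadoop.hive.ql.log.PerfLogger;
  import org.apache.hadoop.hive.ql.session.SessionState;

  // Illustrative sketch (assumes an active Hive SessionState); the diff below
  // calls PerfLogBegin/PerfLogEnd directly around processFiles() and prunePartitions().
  class PerfLoggingSketch {
    private static final String CLASS_NAME = PerfLoggingSketch.class.getName();

    void timePruning(String workName, Runnable pruning) {
      PerfLogger perfLogger = SessionState.getPerfLogger();
      String key = PerfLogger.SPARK_DYNAMICALLY_PRUNE_PARTITIONS + workName;
      perfLogger.PerfLogBegin(CLASS_NAME, key);
      try {
        pruning.run();  // the actual pruning work being timed
      } finally {
        perfLogger.PerfLogEnd(CLASS_NAME, key);
      }
    }
  }
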
http://git-wip-us.apache.org/repos/asf/hive/blob/bed17e54/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
index 240fa09..b9285ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
@@ -30,9 +30,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.clearspring.analytics.util.Preconditions;
-import javolution.testing.AssertionException;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -63,7 +64,11 @@ import org.apache.hadoop.util.ReflectionUtils;
  * The spark version of DynamicPartitionPruner.
  */
 public class SparkDynamicPartitionPruner {
+
   private static final Logger LOG = LoggerFactory.getLogger(SparkDynamicPartitionPruner.class);
+  private static final String CLASS_NAME = SparkDynamicPartitionPruner.class.getName();
+
+  private final PerfLogger perfLogger = SessionState.getPerfLogger();
   private final Map<String, List<SourceInfo>> sourceInfoMap = new LinkedHashMap<String, List<SourceInfo>>();
   private final BytesWritable writable = new BytesWritable();
 
@@ -74,8 +79,12 @@ public class SparkDynamicPartitionPruner {
       // Nothing to prune for this MapWork
       return;
     }
+    perfLogger.PerfLogBegin(CLASS_NAME,
+            PerfLogger.SPARK_DYNAMICALLY_PRUNE_PARTITIONS + work.getName());
     processFiles(work, jobConf);
     prunePartitions(work);
+    perfLogger.PerfLogEnd(CLASS_NAME,
+            PerfLogger.SPARK_DYNAMICALLY_PRUNE_PARTITIONS + work.getName());
   }
 
   public void initialize(MapWork work, JobConf jobConf) throws SerDeException {
@@ -210,14 +219,11 @@ public class SparkDynamicPartitionPruner {
       Path p = it.next();
       PartitionDesc desc = work.getPathToPartitionInfo().get(p);
       Map<String, String> spec = desc.getPartSpec();
-      if (spec == null) {
-        throw new AssertionException("No partition spec found in dynamic pruning");
-      }
+      Preconditions.checkNotNull(spec, "No partition spec found in dynamic pruning");
 
       String partValueString = spec.get(columnName);
-      if (partValueString == null) {
-        throw new AssertionException("Could not find partition value for column: " + columnName);
-      }
+      Preconditions.checkNotNull(partValueString,
+              "Could not find partition value for column: " + columnName);
 
       Object partValue = converter.convert(partValueString);
       if (LOG.isDebugEnabled()) {
@@ -234,8 +240,7 @@ public class SparkDynamicPartitionPruner {
         LOG.info("Pruning path: " + p);
         it.remove();
         work.removePathToAlias(p);
-        // HIVE-12244 call currently ineffective
-        work.getPartitionDescs().remove(desc);
+        work.removePathToPartitionInfo(p);
       }
     }
   }

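The assertion changes above swap javolution's AssertionException for Guava's Preconditions. As a standalone illustration of the behavior (not Hive code; the map contents here are made up), checkNotNull returns its argument when it is non-null and throws NullPointerException with the supplied message otherwise:

  import com.google.common.base.Preconditions;

  import java.util.HashMap;
  import java.util.Map;

  public class PreconditionsSketch {
    public static void main(String[] args) {
      // A made-up partition spec standing in for desc.getPartSpec().
      Map<String, String> spec = new HashMap<>();
      spec.put("ds", "2018-07-14");

      // Returns 'spec' when non-null; throws NullPointerException with the
      // given message when null -- the replacement for AssertionException.
      Preconditions.checkNotNull(spec, "No partition spec found in dynamic pruning");

      String partValueString = spec.get("ds");
      Preconditions.checkNotNull(partValueString,
          "Could not find partition value for column: ds");
      System.out.println("partition value: " + partValueString);
    }
  }
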
http://git-wip-us.apache.org/repos/asf/hive/blob/bed17e54/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index d71d705..001d0b0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -109,6 +109,9 @@ public class SparkPlanGenerator {
 
     try {
       for (BaseWork work : sparkWork.getAllWork()) {
+        // Run the SparkDynamicPartitionPruner; we run it here instead of inside the
+        // InputFormat so that we don't have to run pruning when creating a RecordReader
+        runDynamicPartitionPruner(work);
         perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
         SparkTran tran = generate(work, sparkWork);
         SparkTran parentTran = generateParentTran(sparkPlan, sparkWork, work);
@@ -127,6 +130,27 @@ public class SparkPlanGenerator {
     return sparkPlan;
   }
 
+  /**
+   * Run a {@link SparkDynamicPartitionPruner} on the given {@link BaseWork}. This method only
+   * runs the pruner if the work object is a {@link MapWork}. We do this here because we need to
+   * do it after all previous Spark jobs for the given query have completed; otherwise the input
+   * file for the pruner won't exist. We need to make sure this runs before we serialize the
+   * given work object to a file (so it can be read by individual tasks), because the pruner will
+   * mutate the work object by removing certain input paths.
+   *
+   * @param work the {@link BaseWork} to run the pruner on
+   */
+  private void runDynamicPartitionPruner(BaseWork work) {
+    if (work instanceof MapWork && HiveConf.isSparkDPPAny(jobConf)) {
+      SparkDynamicPartitionPruner pruner = new SparkDynamicPartitionPruner();
+      try {
+        pruner.prune((MapWork) work, jobConf);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
   // Generate (possibly get from a cached result) parent SparkTran
   private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork,
                                        BaseWork work) throws Exception {

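The relocated pruning step boils down to the guard below, run once per work unit on the driver during plan generation: only MapWork instances are pruned, only when some form of Spark dynamic partition pruning is enabled, and any failure is surfaced at compile time rather than inside a task. The hunk above is the authoritative version; the wrapper class and method name here are illustrative only.

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner;
  import org.apache.hadoop.hive.ql.plan.BaseWork;
  import org.apache.hadoop.hive.ql.plan.MapWork;
  import org.apache.hadoop.mapred.JobConf;

  class DriverSidePruningSketch {
    // Runs on the driver during plan generation: after upstream jobs have
    // produced the pruning files, and before the MapWork is serialized for tasks.
    void pruneIfNeeded(BaseWork work, JobConf jobConf) {
      if (work instanceof MapWork && HiveConf.isSparkDPPAny(jobConf)) {
        SparkDynamicPartitionPruner pruner = new SparkDynamicPartitionPruner();
        try {
          pruner.prune((MapWork) work, jobConf);  // mutates the MapWork by removing pruned paths
        } catch (Exception e) {
          throw new RuntimeException(e);          // fail query compilation, not individual tasks
        }
      }
    }
  }
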
http://git-wip-us.apache.org/repos/asf/hive/blob/bed17e54/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 3cb7ab5..ea37daf 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -432,18 +432,6 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       } else {
         mrwork = Utilities.getMapWork(job);
       }
-
-      // Prune partitions
-      if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")
-          && HiveConf.isSparkDPPAny(job)) {
-        SparkDynamicPartitionPruner pruner = new SparkDynamicPartitionPruner();
-        try {
-          pruner.prune(mrwork, job);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-
       pathToPartitionInfo = mrwork.getPathToPartitionInfo();
     }
   }