Posted to commits@hive.apache.org by pr...@apache.org on 2015/05/07 03:38:54 UTC

[2/3] hive git commit: Revert "HIVE-10397: LLAP: Implement Tez SplitSizeEstimator for Orc (Prasanth Jayachandran)"

Revert "HIVE-10397: LLAP: Implement Tez SplitSizeEstimator for Orc (Prasanth Jayachandran)"

This reverts commit bfc3afb8a6031d21ae12a91faf93664235e8a141.


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f16993d2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f16993d2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f16993d2

Branch: refs/heads/llap
Commit: f16993d2d8233b01b605dbfbe51119f2fde0c887
Parents: d87a22a
Author: Prasanth Jayachandran <j....@gmail.com>
Authored: Wed May 6 18:27:48 2015 -0700
Committer: Prasanth Jayachandran <j....@gmail.com>
Committed: Wed May 6 18:27:48 2015 -0700

----------------------------------------------------------------------
 .../hive/ql/exec/tez/CustomPartitionVertex.java | 10 ++--
 .../hive/ql/exec/tez/HiveSplitGenerator.java    |  7 ++-
 .../hadoop/hive/ql/exec/tez/SplitGrouper.java   | 55 ++++++--------------
 .../apache/hadoop/hive/ql/io/ColumnarSplit.java | 33 ------------
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 33 ++----------
 .../apache/hadoop/hive/ql/io/orc/OrcSplit.java  | 11 ++--
 6 files changed, 33 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f16993d2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index 57d9449..5f7b20b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import com.google.common.collect.LinkedListMultimap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -47,9 +48,9 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.split.TezGroupedSplit;
 import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -68,7 +69,6 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
-import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
@@ -275,7 +275,7 @@ public class CustomPartitionVertex extends VertexManagerPlugin {
           InputSplit[] inputSplitArray =
               (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
           Multimap<Integer, InputSplit> groupedSplit =
-              grouper.generateGroupedSplits(jobConf, inputSplitArray, waves,
+              grouper.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
                   availableSlots, inputName, mainWorkName.isEmpty());
           if (mainWorkName.isEmpty() == false) {
             Multimap<Integer, InputSplit> singleBucketToGroupedSplit =
@@ -283,7 +283,7 @@ public class CustomPartitionVertex extends VertexManagerPlugin {
             singleBucketToGroupedSplit.putAll(key, groupedSplit.values());
             groupedSplit =
                 grouper.group(jobConf, singleBucketToGroupedSplit, availableSlots,
-                    HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES), null);
+                    HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES));
             secondLevelGroupingDone = true;
           }
           bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
@@ -297,7 +297,7 @@ public class CustomPartitionVertex extends VertexManagerPlugin {
           InputSplit[] inputSplitArray =
               (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
           Multimap<Integer, InputSplit> groupedSplit =
-              grouper.generateGroupedSplits(jobConf, inputSplitArray, waves,
+              grouper.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
                     availableSlots, inputName, false);
             bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/f16993d2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index d3007ad..5fb6052 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -55,7 +56,6 @@ import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 
@@ -139,9 +139,8 @@ public class HiveSplitGenerator extends InputInitializer {
         LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
             + " available slots, " + waves + " waves. Input format is: " + realInputFormatName);
 
-        Multimap<Integer, InputSplit> groupedSplits = splitGrouper.generateGroupedSplits(jobConf,
-            splits, waves, availableSlots);
-
+        Multimap<Integer, InputSplit> groupedSplits =
+            splitGrouper.generateGroupedSplits(jobConf, conf, splits, waves, availableSlots);
         // And finally return them in a flat array
         InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
         LOG.info("Number of grouped splits: " + flatSplits.length);

http://git-wip-us.apache.org/repos/asf/hive/blob/f16993d2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
index e819b27..c12da37 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -39,10 +40,8 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.split.SplitSizeEstimator;
 import org.apache.hadoop.mapred.split.TezGroupedSplit;
 import org.apache.hadoop.mapred.split.TezMapredSplitsGrouper;
 import org.apache.tez.dag.api.TaskLocationHint;
@@ -73,8 +72,7 @@ public class SplitGrouper {
    * available slots with tasks
    */
   public Multimap<Integer, InputSplit> group(Configuration conf,
-      Multimap<Integer, InputSplit> bucketSplitMultimap, int availableSlots, float waves,
-      Map<Integer, SplitSizeEstimator> splitSizeEstimatorMap)
+      Multimap<Integer, InputSplit> bucketSplitMultimap, int availableSlots, float waves)
       throws IOException {
 
     // figure out how many tasks we want for each bucket
@@ -88,16 +86,14 @@ public class SplitGrouper {
     // use the tez grouper to combine splits once per bucket
     for (int bucketId : bucketSplitMultimap.keySet()) {
       Collection<InputSplit> inputSplitCollection = bucketSplitMultimap.get(bucketId);
-      SplitSizeEstimator splitSizeEstimator =
-          splitSizeEstimatorMap == null ? null : splitSizeEstimatorMap.get(bucketId);
+
       InputSplit[] rawSplits = inputSplitCollection.toArray(new InputSplit[0]);
       InputSplit[] groupedSplits =
           tezGrouper.getGroupedSplits(conf, rawSplits, bucketTaskMap.get(bucketId),
-              HiveInputFormat.class.getName(), splitSizeEstimator);
+              HiveInputFormat.class.getName());
 
       LOG.info("Original split size is " + rawSplits.length + " grouped split size is "
-          + groupedSplits.length + ", for bucket: " + bucketId + " SplitSizeEstimator: " +
-          splitSizeEstimator.getClass().getSimpleName());
+          + groupedSplits.length + ", for bucket: " + bucketId);
 
       for (InputSplit inSplit : groupedSplits) {
         bucketGroupedSplitMultimap.put(bucketId, inSplit);
@@ -157,63 +153,46 @@ public class SplitGrouper {
 
   /** Generate groups of splits, separated by schema evolution boundaries */
   public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
-      InputSplit[] splits,
-      float waves, int availableSlots)
+                                                                    Configuration conf,
+                                                                    InputSplit[] splits,
+                                                                    float waves, int availableSlots)
       throws Exception {
-    return generateGroupedSplits(jobConf, splits, waves, availableSlots, null, true);
+    return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true);
   }
 
   /** Generate groups of splits, separated by schema evolution boundaries */
   public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
-      InputSplit[] splits,
-      float waves, int availableSlots,
-      String inputName,
-      boolean groupAcrossFiles) throws Exception {
+                                                                    Configuration conf,
+                                                                    InputSplit[] splits,
+                                                                    float waves, int availableSlots,
+                                                                    String inputName,
+                                                                    boolean groupAcrossFiles) throws
+      Exception {
 
     MapWork work = populateMapWork(jobConf, inputName);
     Multimap<Integer, InputSplit> bucketSplitMultiMap =
         ArrayListMultimap.<Integer, InputSplit> create();
-    Map<Integer, SplitSizeEstimator> splitSizeEstimatorMap = new HashMap<>();
+
     int i = 0;
     InputSplit prevSplit = null;
-    Class<? extends InputFormat> inputFormatClass = null;
     for (InputSplit s : splits) {
       // this is the bit where we make sure we don't group across partition
       // schema boundaries
       if (schemaEvolved(s, prevSplit, groupAcrossFiles, work)) {
         ++i;
         prevSplit = s;
-        inputFormatClass = getInputFormatClassFromSplit(s, work);
-        InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(inputFormatClass, jobConf);
-        if (inputFormat instanceof SplitSizeEstimator) {
-          LOG.info(inputFormat.getClass().getSimpleName() + " implements SplitSizeEstimator");
-          splitSizeEstimatorMap.put(i, (SplitSizeEstimator) inputFormat);
-        } else {
-          LOG.info(inputFormat.getClass().getSimpleName() + " does not implement SplitSizeEstimator");
-          splitSizeEstimatorMap.put(i, null);
-        }
       }
-
       bucketSplitMultiMap.put(i, s);
     }
     LOG.info("# Src groups for split generation: " + (i + 1));
 
     // group them into the chunks we want
     Multimap<Integer, InputSplit> groupedSplits =
-        this.group(jobConf, bucketSplitMultiMap, availableSlots, waves, splitSizeEstimatorMap);
+        this.group(jobConf, bucketSplitMultiMap, availableSlots, waves);
 
     return groupedSplits;
   }
 
-  private Class<? extends InputFormat> getInputFormatClassFromSplit(InputSplit s, MapWork work)
-      throws IOException {
-    Path path = ((FileSplit) s).getPath();
-    PartitionDesc pd =
-        HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(),
-            path, cache);
-    return pd.getInputFileFormatClass();
-  }
-
 
   /**
    * get the size estimates for each bucket in tasks. This is used to make sure

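For context, the grouping-key logic framing these changes (the i/prevSplit loop
in generateGroupedSplits above) is easy to sketch in isolation: consecutive
splits share a group until a schema boundary is hit, at which point the group
index advances. Below is a minimal, self-contained sketch; the generic type and
the BiPredicate standing in for schemaEvolved are illustrative, not Hive's API.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.BiPredicate;

    public class SchemaBoundaryGrouper {
      // Consecutive splits share a group until the predicate reports a schema
      // boundary between the split that opened the group and the current one;
      // then the group index advances. Simplified from the loop in
      // SplitGrouper.generateGroupedSplits.
      public static <S> Map<Integer, List<S>> group(List<S> splits,
          BiPredicate<S, S> schemaEvolved) {
        Map<Integer, List<S>> groups = new LinkedHashMap<>();
        int group = 0;
        S prev = null;
        for (S s : splits) {
          if (prev == null || schemaEvolved.test(prev, s)) {
            ++group;   // new schema group starts here
            prev = s;  // remember the split that opened this group
          }
          groups.computeIfAbsent(group, k -> new ArrayList<>()).add(s);
        }
        return groups;
      }
    }

Grouping "p1/f1", "p1/f2", "p2/f1" with a predicate that compares the partition
prefix yields {1=[p1/f1, p1/f2], 2=[p2/f1]}: the tez grouper then combines
splits within each group, never across a partition-schema boundary.
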
http://git-wip-us.apache.org/repos/asf/hive/blob/f16993d2/ql/src/java/org/apache/hadoop/hive/ql/io/ColumnarSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/ColumnarSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/ColumnarSplit.java
deleted file mode 100644
index 8fd10e5..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/ColumnarSplit.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io;
-
-/**
- * The ColumnarSplit interface, when implemented, should return the estimated size of the columnar
- * projections that will be read from the split. The split grouper uses this information for better
- * grouping based on the data actually read, instead of the complete split length.
- */
-public interface ColumnarSplit {
-
-  /**
-   * Return the estimated size of the column projections that will be read from this split.
-   *
-   * @return the estimated size, in bytes, of the column projections that will be read
-   */
-  long getColumnarProjectionSize();
-}
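
For reference, the pattern this revert removes is small enough to sketch: a Tez
SplitSizeEstimator prefers a split's projected-column size over its raw length
when grouping. The interfaces and method names below are the ones shown in this
diff; ColumnarSizeEstimator itself is an invented name for illustration only.

    import java.io.IOException;

    import org.apache.hadoop.hive.ql.io.ColumnarSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.split.SplitSizeEstimator;

    public class ColumnarSizeEstimator implements SplitSizeEstimator {
      @Override
      public long getEstimatedSize(InputSplit split) throws IOException {
        if (split instanceof ColumnarSplit) {
          // Size of just the projected columns, reported by the split itself.
          return ((ColumnarSplit) split).getColumnarProjectionSize();
        }
        // No columnar estimate available: fall back to the full split length.
        return split.getLength();
      }
    }

This is what OrcInputFormat.getEstimatedSize (removed further down in this
diff) did, with an extra branch to unwrap HiveInputFormat.HiveInputSplit.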

http://git-wip-us.apache.org/repos/asf/hive/blob/f16993d2/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 7d136e9..2548106 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -50,9 +50,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.ColumnarSplit;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
@@ -76,7 +74,6 @@ import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.split.SplitSizeEstimator;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.cache.Cache;
@@ -108,10 +105,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  */
 public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
   InputFormatChecker, VectorizedInputFormatInterface, LlapWrappableInputFormatInterface,
-    AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination,
-    SplitSizeEstimator {
+    AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination {
 
-  enum SplitStrategyKind{
+  static enum SplitStrategyKind{
     HYBRID,
     BI,
     ETL
@@ -1057,7 +1053,7 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     if (isDebugEnabled) {
       for (OrcSplit split : splits) {
         LOG.debug(split + " projected_columns_uncompressed_size: "
-            + split.getColumnarProjectionSize());
+            + split.getProjectedColumnsUncompressedSize());
       }
     }
     return splits;
@@ -1070,29 +1066,6 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
   }
 
   @Override
-  public long getEstimatedSize(InputSplit inputSplit) throws IOException {
-    long colProjSize = inputSplit.getLength();
-
-    if (inputSplit instanceof ColumnarSplit) {
-      colProjSize = ((ColumnarSplit) inputSplit).getColumnarProjectionSize();
-      if (isDebugEnabled) {
-        LOG.debug("Estimated column projection size: " + colProjSize);
-      }
-      return colProjSize;
-    } else if (inputSplit instanceof HiveInputFormat.HiveInputSplit) {
-      InputSplit innerInputSplit = ((HiveInputFormat.HiveInputSplit) inputSplit).getInputSplit();
-      if (innerInputSplit instanceof ColumnarSplit) {
-        colProjSize = ((ColumnarSplit) innerInputSplit).getColumnarProjectionSize();
-      }
-      if (isDebugEnabled) {
-        LOG.debug("Estimated column projection size: " + colProjSize);
-      }
-    }
-
-    return colProjSize;
-  }
-
-  @Override
   public InputSplit[] getSplits(JobConf job,
                                 int numSplits) throws IOException {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);

http://git-wip-us.apache.org/repos/asf/hive/blob/f16993d2/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 836c113..1263346 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -28,7 +28,6 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.ColumnarSplit;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.FileSplit;
@@ -39,7 +38,7 @@ import org.apache.hadoop.mapred.FileSplit;
  * OrcFileSplit. Holds file meta info
  *
  */
-public class OrcSplit extends FileSplit implements ColumnarSplit {
+public class OrcSplit extends FileSplit {
   private static final Log LOG = LogFactory.getLog(OrcSplit.class);
 
   private FileMetaInfo fileMetaInfo;
@@ -56,7 +55,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit {
   static final int ORIGINAL_FLAG = 2;
   static final int FOOTER_FLAG = 1;
 
-  protected OrcSplit() {
+  protected OrcSplit(){
     //The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it.
     //This constructor is used to create the object and then call readFields()
     // so just pass nulls to this super constructor.
@@ -75,7 +74,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit {
     this.isOriginal = isOriginal;
     this.hasBase = hasBase;
     this.deltas.addAll(deltas);
-    this.projColsUncompressedSize = projectedDataSize <= 0 ? length : projectedDataSize;
+    this.projColsUncompressedSize = projectedDataSize;
   }
 
   @Override
@@ -174,8 +173,8 @@ public class OrcSplit extends FileSplit implements ColumnarSplit {
     return fileId;
   }
 
-  @Override
-  public long getColumnarProjectionSize() {
+  public long getProjectedColumnsUncompressedSize() {
     return projColsUncompressedSize;
   }
+
 }