You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2015/05/07 03:38:54 UTC
[2/3] hive git commit: Revert "HIVE-10397: LLAP: Implement Tez
SplitSizeEstimator for Orc (Prasanth Jayachandran)"
Revert "HIVE-10397: LLAP: Implement Tez SplitSizeEstimator for Orc (Prasanth Jayachandran)"
This reverts commit bfc3afb8a6031d21ae12a91faf93664235e8a141.
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f16993d2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f16993d2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f16993d2
Branch: refs/heads/llap
Commit: f16993d2d8233b01b605dbfbe51119f2fde0c887
Parents: d87a22a
Author: Prasanth Jayachandran <j....@gmail.com>
Authored: Wed May 6 18:27:48 2015 -0700
Committer: Prasanth Jayachandran <j....@gmail.com>
Committed: Wed May 6 18:27:48 2015 -0700
----------------------------------------------------------------------
.../hive/ql/exec/tez/CustomPartitionVertex.java | 10 ++--
.../hive/ql/exec/tez/HiveSplitGenerator.java | 7 ++-
.../hadoop/hive/ql/exec/tez/SplitGrouper.java | 55 ++++++--------------
.../apache/hadoop/hive/ql/io/ColumnarSplit.java | 33 ------------
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 33 ++----------
.../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 11 ++--
6 files changed, 33 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f16993d2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index 57d9449..5f7b20b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -32,6 +32,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import com.google.common.collect.LinkedListMultimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -47,9 +48,9 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.split.TezGroupedSplit;
import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
@@ -68,7 +69,6 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
-import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@@ -275,7 +275,7 @@ public class CustomPartitionVertex extends VertexManagerPlugin {
InputSplit[] inputSplitArray =
(bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
Multimap<Integer, InputSplit> groupedSplit =
- grouper.generateGroupedSplits(jobConf, inputSplitArray, waves,
+ grouper.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
availableSlots, inputName, mainWorkName.isEmpty());
if (mainWorkName.isEmpty() == false) {
Multimap<Integer, InputSplit> singleBucketToGroupedSplit =
@@ -283,7 +283,7 @@ public class CustomPartitionVertex extends VertexManagerPlugin {
singleBucketToGroupedSplit.putAll(key, groupedSplit.values());
groupedSplit =
grouper.group(jobConf, singleBucketToGroupedSplit, availableSlots,
- HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES), null);
+ HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES));
secondLevelGroupingDone = true;
}
bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
@@ -297,7 +297,7 @@ public class CustomPartitionVertex extends VertexManagerPlugin {
InputSplit[] inputSplitArray =
(bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
Multimap<Integer, InputSplit> groupedSplit =
- grouper.generateGroupedSplits(jobConf, inputSplitArray, waves,
+ grouper.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
availableSlots, inputName, false);
bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f16993d2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index d3007ad..5fb6052 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
+import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -55,7 +56,6 @@ import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
@@ -139,9 +139,8 @@ public class HiveSplitGenerator extends InputInitializer {
LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
+ " available slots, " + waves + " waves. Input format is: " + realInputFormatName);
- Multimap<Integer, InputSplit> groupedSplits = splitGrouper.generateGroupedSplits(jobConf,
- splits, waves, availableSlots);
-
+ Multimap<Integer, InputSplit> groupedSplits =
+ splitGrouper.generateGroupedSplits(jobConf, conf, splits, waves, availableSlots);
// And finally return them in a flat array
InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
LOG.info("Number of grouped splits: " + flatSplits.length);
http://git-wip-us.apache.org/repos/asf/hive/blob/f16993d2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
index e819b27..c12da37 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -39,10 +40,8 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.split.SplitSizeEstimator;
import org.apache.hadoop.mapred.split.TezGroupedSplit;
import org.apache.hadoop.mapred.split.TezMapredSplitsGrouper;
import org.apache.tez.dag.api.TaskLocationHint;
@@ -73,8 +72,7 @@ public class SplitGrouper {
* available slots with tasks
*/
public Multimap<Integer, InputSplit> group(Configuration conf,
- Multimap<Integer, InputSplit> bucketSplitMultimap, int availableSlots, float waves,
- Map<Integer, SplitSizeEstimator> splitSizeEstimatorMap)
+ Multimap<Integer, InputSplit> bucketSplitMultimap, int availableSlots, float waves)
throws IOException {
// figure out how many tasks we want for each bucket
@@ -88,16 +86,14 @@ public class SplitGrouper {
// use the tez grouper to combine splits once per bucket
for (int bucketId : bucketSplitMultimap.keySet()) {
Collection<InputSplit> inputSplitCollection = bucketSplitMultimap.get(bucketId);
- SplitSizeEstimator splitSizeEstimator =
- splitSizeEstimatorMap == null ? null : splitSizeEstimatorMap.get(bucketId);
+
InputSplit[] rawSplits = inputSplitCollection.toArray(new InputSplit[0]);
InputSplit[] groupedSplits =
tezGrouper.getGroupedSplits(conf, rawSplits, bucketTaskMap.get(bucketId),
- HiveInputFormat.class.getName(), splitSizeEstimator);
+ HiveInputFormat.class.getName());
LOG.info("Original split size is " + rawSplits.length + " grouped split size is "
- + groupedSplits.length + ", for bucket: " + bucketId + " SplitSizeEstimator: " +
- splitSizeEstimator.getClass().getSimpleName());
+ + groupedSplits.length + ", for bucket: " + bucketId);
for (InputSplit inSplit : groupedSplits) {
bucketGroupedSplitMultimap.put(bucketId, inSplit);
@@ -157,63 +153,46 @@ public class SplitGrouper {
/** Generate groups of splits, separated by schema evolution boundaries */
public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
- InputSplit[] splits,
- float waves, int availableSlots)
+ Configuration conf,
+ InputSplit[] splits,
+ float waves, int availableSlots)
throws Exception {
- return generateGroupedSplits(jobConf, splits, waves, availableSlots, null, true);
+ return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true);
}
/** Generate groups of splits, separated by schema evolution boundaries */
public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
- InputSplit[] splits,
- float waves, int availableSlots,
- String inputName,
- boolean groupAcrossFiles) throws Exception {
+ Configuration conf,
+ InputSplit[] splits,
+ float waves, int availableSlots,
+ String inputName,
+ boolean groupAcrossFiles) throws
+ Exception {
MapWork work = populateMapWork(jobConf, inputName);
Multimap<Integer, InputSplit> bucketSplitMultiMap =
ArrayListMultimap.<Integer, InputSplit> create();
- Map<Integer, SplitSizeEstimator> splitSizeEstimatorMap = new HashMap<>();
+
int i = 0;
InputSplit prevSplit = null;
- Class<? extends InputFormat> inputFormatClass = null;
for (InputSplit s : splits) {
// this is the bit where we make sure we don't group across partition
// schema boundaries
if (schemaEvolved(s, prevSplit, groupAcrossFiles, work)) {
++i;
prevSplit = s;
- inputFormatClass = getInputFormatClassFromSplit(s, work);
- InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(inputFormatClass, jobConf);
- if (inputFormat instanceof SplitSizeEstimator) {
- LOG.info(inputFormat.getClass().getSimpleName() + " implements SplitSizeEstimator");
- splitSizeEstimatorMap.put(i, (SplitSizeEstimator) inputFormat);
- } else {
- LOG.info(inputFormat.getClass().getSimpleName() + " does not implement SplitSizeEstimator");
- splitSizeEstimatorMap.put(i, null);
- }
}
-
bucketSplitMultiMap.put(i, s);
}
LOG.info("# Src groups for split generation: " + (i + 1));
// group them into the chunks we want
Multimap<Integer, InputSplit> groupedSplits =
- this.group(jobConf, bucketSplitMultiMap, availableSlots, waves, splitSizeEstimatorMap);
+ this.group(jobConf, bucketSplitMultiMap, availableSlots, waves);
return groupedSplits;
}
- private Class<? extends InputFormat> getInputFormatClassFromSplit(InputSplit s, MapWork work)
- throws IOException {
- Path path = ((FileSplit) s).getPath();
- PartitionDesc pd =
- HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(),
- path, cache);
- return pd.getInputFileFormatClass();
- }
-
/**
* get the size estimates for each bucket in tasks. This is used to make sure
http://git-wip-us.apache.org/repos/asf/hive/blob/f16993d2/ql/src/java/org/apache/hadoop/hive/ql/io/ColumnarSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/ColumnarSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/ColumnarSplit.java
deleted file mode 100644
index 8fd10e5..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/ColumnarSplit.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io;
-
-/**
- * ColumnarSplit interface when implemented should return the estimated size of columnar projections
- * that will be read from the split. This information will be used by split grouper for better
- * grouping based on the actual data read instead of the complete split length.
- */
-public interface ColumnarSplit {
-
- /**
- * Return the estimation size of the column projections that will be read from this split.
- *
- * @return - estimated column projection size that will be read in bytes
- */
- long getColumnarProjectionSize();
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/f16993d2/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 7d136e9..2548106 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -50,9 +50,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.ColumnarSplit;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
@@ -76,7 +74,6 @@ import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.split.SplitSizeEstimator;
import org.apache.hadoop.util.StringUtils;
import com.google.common.cache.Cache;
@@ -108,10 +105,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
*/
public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
InputFormatChecker, VectorizedInputFormatInterface, LlapWrappableInputFormatInterface,
- AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination,
- SplitSizeEstimator {
+ AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination {
- enum SplitStrategyKind{
+ static enum SplitStrategyKind{
HYBRID,
BI,
ETL
@@ -1057,7 +1053,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
if (isDebugEnabled) {
for (OrcSplit split : splits) {
LOG.debug(split + " projected_columns_uncompressed_size: "
- + split.getColumnarProjectionSize());
+ + split.getProjectedColumnsUncompressedSize());
}
}
return splits;
@@ -1070,29 +1066,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
@Override
- public long getEstimatedSize(InputSplit inputSplit) throws IOException {
- long colProjSize = inputSplit.getLength();
-
- if (inputSplit instanceof ColumnarSplit) {
- colProjSize = ((ColumnarSplit) inputSplit).getColumnarProjectionSize();
- if (isDebugEnabled) {
- LOG.debug("Estimated column projection size: " + colProjSize);
- }
- return colProjSize;
- } else if (inputSplit instanceof HiveInputFormat.HiveInputSplit) {
- InputSplit innerInputSplit = ((HiveInputFormat.HiveInputSplit) inputSplit).getInputSplit();
- if (innerInputSplit instanceof ColumnarSplit) {
- colProjSize = ((ColumnarSplit) innerInputSplit).getColumnarProjectionSize();
- }
- if (isDebugEnabled) {
- LOG.debug("Estimated column projection size: " + colProjSize);
- }
- }
-
- return colProjSize;
- }
-
- @Override
public InputSplit[] getSplits(JobConf job,
int numSplits) throws IOException {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
http://git-wip-us.apache.org/repos/asf/hive/blob/f16993d2/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 836c113..1263346 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -28,7 +28,6 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.ColumnarSplit;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.FileSplit;
@@ -39,7 +38,7 @@ import org.apache.hadoop.mapred.FileSplit;
* OrcFileSplit. Holds file meta info
*
*/
-public class OrcSplit extends FileSplit implements ColumnarSplit {
+public class OrcSplit extends FileSplit {
private static final Log LOG = LogFactory.getLog(OrcSplit.class);
private FileMetaInfo fileMetaInfo;
@@ -56,7 +55,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit {
static final int ORIGINAL_FLAG = 2;
static final int FOOTER_FLAG = 1;
- protected OrcSplit() {
+ protected OrcSplit(){
//The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it.
//This constructor is used to create the object and then call readFields()
// so just pass nulls to this super constructor.
@@ -75,7 +74,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit {
this.isOriginal = isOriginal;
this.hasBase = hasBase;
this.deltas.addAll(deltas);
- this.projColsUncompressedSize = projectedDataSize <= 0 ? length : projectedDataSize;
+ this.projColsUncompressedSize = projectedDataSize;
}
@Override
@@ -174,8 +173,8 @@ public class OrcSplit extends FileSplit implements ColumnarSplit {
return fileId;
}
- @Override
- public long getColumnarProjectionSize() {
+ public long getProjectedColumnsUncompressedSize() {
return projColsUncompressedSize;
}
+
}