Posted to commits@hive.apache.org by zs...@apache.org on 2010/04/07 02:16:16 UTC
svn commit: r931375 - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/
ql/src/java/org/apache/hadoop/hive/ql/io/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/
ql/src/java/org/apache/hadoop/hive/ql/plan/
Author: zshao
Date: Wed Apr 7 00:16:16 2010
New Revision: 931375
URL: http://svn.apache.org/viewvc?rev=931375&view=rev
Log:
HIVE-1280. Add option to CombineHiveInputFormat for non-splittable inputs. (Namit Jain via zshao)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Apr 7 00:16:16 2010
@@ -345,6 +345,9 @@ Trunk - Unreleased
HIVE-1290. Sort merge join does not work with bucketizedhiveinputformat.
(Namit Jain via Ning Zhang)
+ HIVE-1280. Add option to CombineHiveInputFormat for non-splittable inputs.
+ (Namit Jain via zshao)
+
Release 0.5.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Apr 7 00:16:16 2010
@@ -219,16 +219,18 @@ public class HiveConf extends Configurat
HIVEENFORCEBUCKETING("hive.enforce.bucketing", false),
HIVEENFORCESORTING("hive.enforce.sorting", false),
HIVEPARTITIONER("hive.mapred.partitioner", "org.apache.hadoop.hive.ql.io.DefaultHivePartitioner"),
-
+
HIVESCRIPTOPERATORTRUST("hive.exec.script.trust", false),
+ HIVE_COMBINE_INPUT_FORMAT_SUPPORTS_SPLITTABLE("hive.hadoop.supports.splittable.combineinputformat", false),
+
// Optimizer
HIVEOPTCP("hive.optimize.cp", true), // column pruner
HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown
HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by
HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
- HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
+ HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
;
public final String varname;
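
[Usage sketch, not part of this commit: the new variable defaults to false and is toggled like any
other HiveConf boolean; in a Hive session the equivalent is
set hive.hadoop.supports.splittable.combineinputformat=true;. A minimal Java illustration
(SplittableFlagDemo is a made-up name) of setting and reading the flag:]

import org.apache.hadoop.hive.conf.HiveConf;

public class SplittableFlagDemo {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf(SplittableFlagDemo.class);
    // HiveConf extends Hadoop's Configuration, so setBoolean works directly.
    conf.setBoolean("hive.hadoop.supports.splittable.combineinputformat", true);
    // Read back through the ConfVars enum entry added above.
    System.out.println(conf.getBoolVar(
        HiveConf.ConfVars.HIVE_COMBINE_INPUT_FORMAT_SUPPORTS_SPLITTABLE)); // true
  }
}

[Set it to true once your Hadoop tree contains the MAPREDUCE-1597 fix; leaving it false keeps the
conservative fallback in CombineHiveInputFormat below.]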
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Wed Apr 7 00:16:16 2010
@@ -26,6 +26,8 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Map;
+import java.util.Queue;
+import java.util.LinkedList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,7 +51,6 @@ import org.apache.hadoop.mapred.RecordRe
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
-
/**
* CombineHiveInputFormat is a parameterized InputFormat which looks at the path
* name and determine the correct InputFormat for that path name from
@@ -239,37 +240,50 @@ public class CombineHiveInputFormat<K ex
for (Path path : paths) {
LOG.info("CombineHiveInputSplit creating pool for " + path);
- // The following code should be removed, once
- // https://issues.apache.org/jira/browse/MAPREDUCE-1597 is fixed.
- // Hadoop does not handle non-splitable files correctly for CombineFileInputFormat,
- // so don't use CombineFileInputFormat for non-splittable files
- FileSystem inpFs = path.getFileSystem(job);
-
- FileStatus fStats = inpFs.getFileStatus(path);
- Path tstPath = path;
-
- // If path is a directory
- if (fStats.isDir()) {
- FileStatus[] fStatus = inpFs.listStatus(path);
- if (fStatus.length > 0) {
- tstPath = fStatus[0].getPath();
- }
- }
-
PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, path);
+ TableDesc tableDesc = part.getTableDesc();
+ if ((tableDesc != null) && tableDesc.isNonNative()) {
+ return super.getSplits(job, numSplits);
+ }
// Use HiveInputFormat if any of the paths is not splittable
Class inputFormatClass = part.getInputFileFormatClass();
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
- TableDesc tableDesc = part.getTableDesc();
- if ((tableDesc != null) && tableDesc.isNonNative()) {
- return super.getSplits(job, numSplits);
- }
+ // Since there is no easy way of knowing whether MAPREDUCE-1597 is present in the tree or not,
+ // we use a configuration variable for the same
+ if (this.mrwork != null && !this.mrwork.getHadoopSupportsSplittable()) {
+ // The following code should be removed, once
+ // https://issues.apache.org/jira/browse/MAPREDUCE-1597 is fixed.
+ // Hadoop does not handle non-splittable files correctly for CombineFileInputFormat,
+ // so don't use CombineFileInputFormat for non-splittable files
+ FileSystem inpFs = path.getFileSystem(job);
+
+ if (inputFormat instanceof TextInputFormat) {
+ Queue<Path> dirs = new LinkedList<Path>();
+ FileStatus fStats = inpFs.getFileStatus(path);
+
+ // If path is a directory
+ if (fStats.isDir()) {
+ dirs.offer(path);
+ }
+ else if ((new CompressionCodecFactory(job)).getCodec(path) != null) {
+ return super.getSplits(job, numSplits);
+ }
- if ((inputFormat instanceof TextInputFormat) &&
- ((new CompressionCodecFactory(job)).getCodec(tstPath) != null)) {
- return super.getSplits(job, numSplits);
+ while (dirs.peek() != null) {
+ Path tstPath = dirs.remove();
+ FileStatus[] fStatus = inpFs.listStatus(tstPath);
+ for (int idx = 0; idx < fStatus.length; idx++) {
+ if (fStatus[idx].isDir()) {
+ dirs.offer(fStatus[idx].getPath());
+ }
+ else if ((new CompressionCodecFactory(job)).getCodec(fStatus[idx].getPath()) != null) {
+ return super.getSplits(job, numSplits);
+ }
+ }
+ }
+ }
}
if (inputFormat instanceof SymlinkTextInputFormat) {
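
[In outline, the block added above runs only when hive.hadoop.supports.splittable.combineinputformat
is left false (i.e. the Hadoop tree does not yet contain MAPREDUCE-1597) and the input format is
TextInputFormat: it walks the input directory breadth-first and falls back to plain HiveInputFormat
splits as soon as it finds a compressed file. A distilled, self-contained sketch of that traversal,
illustration only — CompressedFileDetector and containsCompressedFile are made-up names, not part of
this commit:]

import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.JobConf;

public class CompressedFileDetector {
  // Returns true if any file at or below root matches a registered
  // compression codec (matching is by file extension, which is how
  // CompressionCodecFactory.getCodec works).
  public static boolean containsCompressedFile(JobConf job, Path root)
      throws IOException {
    FileSystem fs = root.getFileSystem(job);
    CompressionCodecFactory codecs = new CompressionCodecFactory(job);

    if (!fs.getFileStatus(root).isDir()) {
      return codecs.getCodec(root) != null;
    }
    // Breadth-first walk: directories are queued, files are codec-checked.
    Queue<Path> dirs = new LinkedList<Path>();
    dirs.offer(root);
    while (dirs.peek() != null) {
      FileStatus[] children = fs.listStatus(dirs.remove());
      if (children == null) {
        continue; // some FileSystem implementations return null here
      }
      for (FileStatus child : children) {
        if (child.isDir()) {
          dirs.offer(child.getPath());
        } else if (codecs.getCodec(child.getPath()) != null) {
          return true; // one compressed file is enough to force the fallback
        }
      }
    }
    return false;
  }
}

[In getSplits above, finding such a file means returning super.getSplits(job, numSplits), i.e.
HiveInputFormat behavior with one split per file, instead of combining.]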
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Wed Apr 7 00:16:16 2010
@@ -77,7 +77,7 @@ public class GenMRFileSink1 implements N
/**
* File Sink Operator encountered.
- *
+ *
* @param nd
* the file sink operator encountered
* @param opProcCtx
@@ -158,8 +158,8 @@ public class GenMRFileSink1 implements N
new ArrayList<ExprNodeDesc>(), valueCols, outputColumns, false, -1, -1,
-1);
OperatorFactory.getAndMakeChild(rsDesc, fsRS, ts_op);
- MapredWork cplan = GenMapRedUtils.getMapRedWork();
ParseContext parseCtx = ctx.getParseCtx();
+ MapredWork cplan = GenMapRedUtils.getMapRedWork(parseCtx.getConf());
Task<? extends Serializable> mergeTask = TaskFactory.get(cplan, parseCtx
.getConf());
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java Wed Apr 7 00:16:16 2010
@@ -42,7 +42,7 @@ public class GenMRTableScan1 implements
/**
* Table Sink encountered.
- *
+ *
* @param nd
* the table sink operator encountered
* @param opProcCtx
@@ -57,8 +57,9 @@ public class GenMRTableScan1 implements
.getMapCurrCtx();
// create a dummy task
- Task<? extends Serializable> currTask = TaskFactory.get(GenMapRedUtils
- .getMapRedWork(), parseCtx.getConf());
+ Task<? extends Serializable> currTask =
+ TaskFactory.get(GenMapRedUtils.getMapRedWork(parseCtx.getConf()),
+ parseCtx.getConf());
Operator<? extends Serializable> currTopOp = op;
ctx.setCurrTask(currTask);
ctx.setCurrTopOp(currTopOp);
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Wed Apr 7 00:16:16 2010
@@ -65,9 +65,9 @@ public class GenMRUnion1 implements Node
* all the sub-queries are map-only, dont do anything. However, if there is a
* mapjoin followed by the union, merge at the union Otherwise, insert a
* FileSink on top of all the sub-queries.
- *
+ *
* This can be optimized later on.
- *
+ *
* @param nd
* the file sink operator encountered
* @param opProcCtx
@@ -132,7 +132,7 @@ public class GenMRUnion1 implements Node
// union is encountered for the first time
if (uCtxTask == null) {
uCtxTask = new GenMRUnionCtx();
- uPlan = GenMapRedUtils.getMapRedWork();
+ uPlan = GenMapRedUtils.getMapRedWork(parseCtx.getConf());
uTask = TaskFactory.get(uPlan, parseCtx.getConf());
uCtxTask.setUTask(uTask);
ctx.setUnionTask(union, uCtxTask);
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Wed Apr 7 00:16:16 2010
@@ -471,8 +471,8 @@ public final class GenMapRedUtils {
public static void splitPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx)
throws SemanticException {
// Generate a new task
- MapredWork cplan = getMapRedWork();
ParseContext parseCtx = opProcCtx.getParseCtx();
+ MapredWork cplan = getMapRedWork(parseCtx.getConf());
Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx
.getConf());
Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
@@ -736,14 +736,15 @@ public final class GenMapRedUtils {
*
* @return the new plan
*/
- public static MapredWork getMapRedWork() {
+ public static MapredWork getMapRedWork(HiveConf conf) {
MapredWork work = new MapredWork();
work.setPathToAliases(new LinkedHashMap<String, ArrayList<String>>());
work.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
- work
- .setAliasToWork(new LinkedHashMap<String, Operator<? extends Serializable>>());
+ work.setAliasToWork(new LinkedHashMap<String, Operator<? extends Serializable>>());
work.setTagToValueDesc(new ArrayList<TableDesc>());
work.setReducer(null);
+ work.setHadoopSupportsSplittable(
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_COMBINE_INPUT_FORMAT_SUPPORTS_SPLITTABLE));
return work;
}
@@ -913,7 +914,7 @@ public final class GenMapRedUtils {
// union is encountered for the first time
if (uCtxTask == null) {
uCtxTask = new GenMRUnionCtx();
- uPlan = GenMapRedUtils.getMapRedWork();
+ uPlan = GenMapRedUtils.getMapRedWork(parseCtx.getConf());
uTask = TaskFactory.get(uPlan, parseCtx.getConf());
uCtxTask.setUTask(uTask);
ctx.setUnionTask(union, uCtxTask);
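
[All call sites that built an empty plan via GenMapRedUtils.getMapRedWork() now pass the session
HiveConf, so the flag is snapshotted onto the MapredWork at plan-construction time and travels with
the serialized plan. Illustration only — SplittableFlagExample is a made-up name:]

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.plan.MapredWork;

public class SplittableFlagExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf(GenMapRedUtils.class);
    conf.setBoolean("hive.hadoop.supports.splittable.combineinputformat", true);
    // The factory copies the boolean onto the plan ...
    MapredWork work = GenMapRedUtils.getMapRedWork(conf);
    // ... and CombineHiveInputFormat later reads it back off the plan
    // (this.mrwork), not off the JobConf, at split-generation time.
    System.out.println(work.getHadoopSupportsSplittable()); // prints true
  }
}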
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Wed Apr 7 00:16:16 2010
@@ -126,8 +126,8 @@ public final class MapJoinFactory {
AbstractMapJoinOperator<MapJoinDesc> mapJoin = (AbstractMapJoinOperator<MapJoinDesc>) nd;
GenMRProcContext opProcCtx = (GenMRProcContext) procCtx;
- MapredWork cplan = GenMapRedUtils.getMapRedWork();
ParseContext parseCtx = opProcCtx.getParseCtx();
+ MapredWork cplan = GenMapRedUtils.getMapRedWork(parseCtx.getConf());
Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx
.getConf());
Task<? extends Serializable> currTask = opProcCtx.getCurrTask();
@@ -211,7 +211,7 @@ public final class MapJoinFactory {
ctx.setMapJoinCtx(mapJoin, mjCtx);
}
- MapredWork mjPlan = GenMapRedUtils.getMapRedWork();
+ MapredWork mjPlan = GenMapRedUtils.getMapRedWork(parseCtx.getConf());
Task<? extends Serializable> mjTask = TaskFactory.get(mjPlan, parseCtx
.getConf());
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Wed Apr 7 00:16:16 2010
@@ -61,6 +61,8 @@ public class MapredWork implements Seria
private Integer minSplitSize;
private boolean needsTagging;
+ private boolean hadoopSupportsSplittable;
+
private MapredLocalWork mapLocalWork;
private String inputformat;
@@ -75,7 +77,8 @@ public class MapredWork implements Seria
final LinkedHashMap<String, Operator<? extends Serializable>> aliasToWork,
final TableDesc keyDesc, List<TableDesc> tagToValueDesc,
final Operator<?> reducer, final Integer numReduceTasks,
- final MapredLocalWork mapLocalWork) {
+ final MapredLocalWork mapLocalWork,
+ final boolean hadoopSupportsSplittable) {
this.command = command;
this.pathToAliases = pathToAliases;
this.pathToPartitionInfo = pathToPartitionInfo;
@@ -86,6 +89,7 @@ public class MapredWork implements Seria
this.numReduceTasks = numReduceTasks;
this.mapLocalWork = mapLocalWork;
aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
+ this.hadoopSupportsSplittable = hadoopSupportsSplittable;
}
public String getCommand() {
@@ -194,7 +198,7 @@ public class MapredWork implements Seria
/**
* If the number of reducers is -1, the runtime will automatically figure it
* out by input data size.
- *
+ *
* The number of reducers will be a positive number only in case the target
* table is bucketed into N buckets (through CREATE TABLE). This feature is
* not supported yet, so the number of reducers will always be -1 for now.
@@ -294,6 +298,14 @@ public class MapredWork implements Seria
this.needsTagging = needsTagging;
}
+ public boolean getHadoopSupportsSplittable() {
+ return hadoopSupportsSplittable;
+ }
+
+ public void setHadoopSupportsSplittable(boolean hadoopSupportsSplittable) {
+ this.hadoopSupportsSplittable = hadoopSupportsSplittable;
+ }
+
public Integer getMinSplitSize() {
return minSplitSize;
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Wed Apr 7 00:16:16 2010
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapred.InputFor
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.hive.conf.HiveConf;
/**
* PlanUtils.
@@ -72,11 +73,16 @@ public final class PlanUtils {
@SuppressWarnings("nls")
public static MapredWork getMapRedWork() {
- return new MapredWork("", new LinkedHashMap<String, ArrayList<String>>(),
+ try {
+ return new MapredWork("", new LinkedHashMap<String, ArrayList<String>>(),
new LinkedHashMap<String, PartitionDesc>(),
new LinkedHashMap<String, Operator<? extends Serializable>>(),
new TableDesc(), new ArrayList<TableDesc>(), null, Integer.valueOf(1),
- null);
+ null, Hive.get().getConf().getBoolVar(
+ HiveConf.ConfVars.HIVE_COMBINE_INPUT_FORMAT_SUPPORTS_SPLITTABLE));
+ } catch (HiveException ex) {
+ throw new RuntimeException(ex);
+ }
}
/**
@@ -389,7 +395,7 @@ public final class PlanUtils {
/**
* Create the reduce sink descriptor.
- *
+ *
* @param keyCols
* The columns to be stored in the key
* @param valueCols
@@ -441,7 +447,7 @@ public final class PlanUtils {
/**
* Create the reduce sink descriptor.
- *
+ *
* @param keyCols
* The columns to be stored in the key
* @param valueCols