Posted to commits@hive.apache.org by gu...@apache.org on 2014/01/06 22:32:41 UTC
svn commit: r1556041 [3/42] - in /hive/trunk:
cli/src/java/org/apache/hadoop/hive/cli/
common/src/java/org/apache/hadoop/hive/conf/
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql...
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Jan 6 21:32:38 2014
@@ -33,11 +33,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.DemuxOperator;
+import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MoveTask;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -52,6 +56,8 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
@@ -64,20 +70,31 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
+import org.apache.hadoop.hive.ql.plan.ConditionalWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
+import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.stats.StatsFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.InputFormat;
/**
* General utility common functions for the Processor to convert operator into
@@ -90,7 +107,7 @@ public final class GenMapRedUtils {
LOG = LogFactory.getLog("org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils");
}
- private static boolean needsTagging(ReduceWork rWork) {
+ public static boolean needsTagging(ReduceWork rWork) {
return rWork != null && (rWork.getReducer().getClass() == JoinOperator.class ||
rWork.getReducer().getClass() == DemuxOperator.class);
}
@@ -444,18 +461,38 @@ public final class GenMapRedUtils {
public static void setTaskPlan(String alias_id,
Operator<? extends OperatorDesc> topOp, Task<?> task, boolean local,
GenMRProcContext opProcCtx, PrunedPartitionList pList) throws SemanticException {
- MapWork plan = ((MapredWork) task.getWork()).getMapWork();
- ParseContext parseCtx = opProcCtx.getParseCtx();
- Set<ReadEntity> inputs = opProcCtx.getInputs();
+ setMapWork(((MapredWork) task.getWork()).getMapWork(), opProcCtx.getParseCtx(),
+ opProcCtx.getInputs(), pList, topOp, alias_id, opProcCtx.getConf(), local);
+ opProcCtx.addSeenOp(task, topOp);
+ }
+ /**
+ * initialize MapWork
+ *
+ * @param alias_id
+ * current alias
+ * @param topOp
+ * the top operator of the stack
+ * @param plan
+ * map work to initialize
+ * @param local
+ * whether the work should be added to local work rather than to the map-reduce work
+ * @param pList
+ * pruned partition list. If it is null it will be computed on-the-fly.
+ * @param inputs
+ * read entities for the map work
+ * @param conf
+ * current instance of hive conf
+ */
+ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set<ReadEntity> inputs,
+ PrunedPartitionList partsList, Operator<? extends OperatorDesc> topOp, String alias_id,
+ HiveConf conf, boolean local) throws SemanticException {
ArrayList<Path> partDir = new ArrayList<Path>();
ArrayList<PartitionDesc> partDesc = new ArrayList<PartitionDesc>();
Path tblDir = null;
TableDesc tblDesc = null;
- PrunedPartitionList partsList = pList;
-
plan.setNameToSplitSample(parseCtx.getNameToSplitSample());
if (partsList == null) {
@@ -701,7 +738,6 @@ public final class GenMapRedUtils {
}
plan.setMapLocalWork(localPlan);
}
- opProcCtx.addSeenOp(task, topOp);
}
/**
@@ -751,6 +787,21 @@ public final class GenMapRedUtils {
}
/**
+ * Set key and value descriptor
+ * @param work ReduceWork
+ * @param rs ReduceSinkOperator
+ */
+ public static void setKeyAndValueDesc(ReduceWork work, ReduceSinkOperator rs) {
+ work.setKeyDesc(rs.getConf().getKeySerializeInfo());
+ int tag = Math.max(0, rs.getConf().getTag());
+ List<TableDesc> tagToSchema = work.getTagToValueDesc();
+ while (tag + 1 > tagToSchema.size()) {
+ tagToSchema.add(null);
+ }
+ tagToSchema.set(tag, rs.getConf().getValueSerializeInfo());
+ }
+
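
The helper above uses a pad-then-set idiom: grow the tag-indexed list with null placeholders until the target slot exists, then write it. A minimal standalone sketch of the same idiom, with a plain String standing in for Hive's TableDesc (class and names here are illustrative, not Hive API):

    import java.util.ArrayList;
    import java.util.List;

    public class TagToValueDescDemo {
      // Grow the list with null placeholders until index 'tag' is addressable, then set it.
      static void setAtTag(List<String> tagToSchema, int tag, String valueDesc) {
        while (tag + 1 > tagToSchema.size()) {
          tagToSchema.add(null);
        }
        tagToSchema.set(tag, valueDesc);
      }

      public static void main(String[] args) {
        List<String> tagToSchema = new ArrayList<String>();
        setAtTag(tagToSchema, 2, "valueDescForTag2");
        System.out.println(tagToSchema); // prints [null, null, valueDescForTag2]
      }
    }
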
+ /**
* set key and value descriptor.
*
* @param plan
@@ -766,13 +817,7 @@ public final class GenMapRedUtils {
if (topOp instanceof ReduceSinkOperator) {
ReduceSinkOperator rs = (ReduceSinkOperator) topOp;
- plan.setKeyDesc(rs.getConf().getKeySerializeInfo());
- int tag = Math.max(0, rs.getConf().getTag());
- List<TableDesc> tagToSchema = plan.getTagToValueDesc();
- while (tag + 1 > tagToSchema.size()) {
- tagToSchema.add(null);
- }
- tagToSchema.set(tag, rs.getConf().getValueSerializeInfo());
+ setKeyAndValueDesc(plan, rs);
} else {
List<Operator<? extends OperatorDesc>> children = topOp.getChildOperators();
if (children != null) {
@@ -1096,6 +1141,571 @@ public final class GenMapRedUtils {
}
}
+ /**
+ * @param fsInput The FileSink operator.
+ * @param ctx The MR processing context.
+ * @param finalName the final destination path the merge job should output.
+ * @param dependencyTask
+ * @param mvTasks
+ * @param conf
+ * @param currTask
+ * @throws SemanticException
+
+ * create a Map-only merge job using CombineHiveInputFormat for all partitions with the
+ * following operators:
+ * MR job J0:
+ * ...
+ * |
+ * v
+ * FileSinkOperator_1 (fsInput)
+ * |
+ * v
+ * Merge job J1:
+ * |
+ * v
+ * TableScan (using CombineHiveInputFormat) (tsMerge)
+ * |
+ * v
+ * FileSinkOperator (fsMerge)
+ *
+ * Here the pathToPartitionInfo & pathToAlias will remain the same, which means the paths do
+ * not contain the dynamic partitions (their parent). So after the dynamic partitions are
+ * created (after the first job finishes, before the moveTask or ConditionalTask starts),
+ * we need to change the pathToPartitionInfo & pathToAlias to include the dynamic partition
+ * directories.
+ */
+ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput,
+ Path finalName, DependencyCollectionTask dependencyTask,
+ List<Task<MoveWork>> mvTasks, HiveConf conf,
+ Task<? extends Serializable> currTask) throws SemanticException {
+
+ //
+ // 1. create the operator tree
+ //
+ FileSinkDesc fsInputDesc = fsInput.getConf();
+
+ // Create a TableScan operator
+ RowSchema inputRS = fsInput.getSchema();
+ Operator<? extends OperatorDesc> tsMerge =
+ GenMapRedUtils.createTemporaryTableScanOperator(inputRS);
+
+ // Create a FileSink operator
+ TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
+ FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName.toUri().toString(), ts,
+ conf.getBoolVar(ConfVars.COMPRESSRESULT));
+ FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild(
+ fsOutputDesc, inputRS, tsMerge);
+
+ // If the input FileSinkOperator is dynamic-partition enabled, the tsMerge input schema
+ // needs to include the partition columns, and the fsOutput should have
+ // a DynamicPartitionCtx to indicate that it needs to be dynamically partitioned.
+ DynamicPartitionCtx dpCtx = fsInputDesc.getDynPartCtx();
+ if (dpCtx != null && dpCtx.getNumDPCols() > 0) {
+ // adding DP ColumnInfo to the RowSchema signature
+ ArrayList<ColumnInfo> signature = inputRS.getSignature();
+ String tblAlias = fsInputDesc.getTableInfo().getTableName();
+ LinkedHashMap<String, String> colMap = new LinkedHashMap<String, String>();
+ StringBuilder partCols = new StringBuilder();
+ for (String dpCol : dpCtx.getDPColNames()) {
+ ColumnInfo colInfo = new ColumnInfo(dpCol,
TypeInfoFactory.stringTypeInfo, // all partition column types should be string
+ tblAlias, true); // partition column is virtual column
+ signature.add(colInfo);
+ colMap.put(dpCol, dpCol); // input and output have the same column name
+ partCols.append(dpCol).append('/');
+ }
+ partCols.setLength(partCols.length() - 1); // remove the last '/'
+ inputRS.setSignature(signature);
+
+ // create another DynamicPartitionCtx, which has a different input-to-DP column mapping
+ DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx);
+ dpCtx2.setInputToDPCols(colMap);
+ fsOutputDesc.setDynPartCtx(dpCtx2);
+
+ // update the FileSinkOperator to include partition columns
+ fsInputDesc.getTableInfo().getProperties().setProperty(
+ org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS,
+ partCols.toString()); // list of dynamic partition column names
+ } else {
+ // non-partitioned table
+ fsInputDesc.getTableInfo().getProperties().remove(
+ org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
+ }
+
+ //
+ // 2. Constructing a conditional task consisting of a move task and a map reduce task
+ //
+ MoveWork dummyMv = new MoveWork(null, null, null,
+ new LoadFileDesc(new Path(fsInputDesc.getFinalDirName()), finalName, true, null, null), false);
+ MapWork cplan;
+ Serializable work;
+
+ if (conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) &&
+ fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+
+ // Check if InputFormatClass is valid
+ String inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
+ try {
+ Class c = (Class<? extends InputFormat>) Class.forName(inputFormatClass);
+
+ LOG.info("RCFile format- Using block level merge");
+ cplan = GenMapRedUtils.createRCFileMergeTask(fsInputDesc, finalName,
+ dpCtx != null && dpCtx.getNumDPCols() > 0);
+ work = cplan;
+ } catch (ClassNotFoundException e) {
+ String msg = "Illegal input format class: " + inputFormatClass;
+ throw new SemanticException(msg);
+ }
+
+ } else {
+ cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc);
+ work = new MapredWork();
+ ((MapredWork)work).setMapWork(cplan);
+ }
+ // use CombineHiveInputFormat for map-only merging
+ cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
+ // NOTE: we should gather stats in MR1 rather than in the merge job MR2, since we don't
+ // know whether merge MR2 will be triggered at execution time
+ ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work,
+ fsInputDesc.getFinalDirName());
+
+ // keep the dynamic partition context in conditional task resolver context
+ ConditionalResolverMergeFilesCtx mrCtx =
+ (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx();
+ mrCtx.setDPCtx(fsInputDesc.getDynPartCtx());
+ mrCtx.setLbCtx(fsInputDesc.getLbCtx());
+
+ //
+ // 3. add the moveTask as the children of the conditional task
+ //
+ linkMoveTask(fsOutput, cndTsk, mvTasks, conf, dependencyTask);
+ }
+
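
As a rough sketch of the branch above: the block-level RCFile merge path is taken only when HIVEMERGERCFILEBLOCKLEVEL is set and the input format really is RCFileInputFormat; everything else falls back to a generic map-only merge job (TableScan -> FileSink). Stand-in booleans replace the HiveConf lookups (illustrative, not Hive API):

    public class MergeStrategyDemo {
      // Illustrative only: booleans stand in for the HiveConf flag and the format check.
      static String chooseMergeStrategy(boolean rcfileBlockMergeEnabled, boolean inputIsRCFile) {
        if (rcfileBlockMergeEnabled && inputIsRCFile) {
          return "block-level RCFile merge";  // concatenates RCFile blocks without decoding rows
        }
        return "map-only merge via CombineHiveInputFormat";  // re-reads and rewrites the rows
      }
    }
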
+ /**
+ * Make the move task in the GenMRProcContext following the FileSinkOperator a dependent of all
+ * possible subtrees branching from the ConditionalTask.
+ *
+ * @param newOutput
+ * @param cndTsk
+ * @param mvTasks
+ * @param hconf
+ * @param dependencyTask
+ */
+ public static void linkMoveTask(FileSinkOperator newOutput,
+ ConditionalTask cndTsk, List<Task<MoveWork>> mvTasks, HiveConf hconf,
+ DependencyCollectionTask dependencyTask) {
+
+ Task<MoveWork> mvTask = GenMapRedUtils.findMoveTask(mvTasks, newOutput);
+
+ for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
+ linkMoveTask(mvTask, tsk, hconf, dependencyTask);
+ }
+ }
+
+ /**
+ * Follows the task tree down from task and makes all leaves parents of mvTask
+ *
+ * @param mvTask
+ * @param task
+ * @param hconf
+ * @param dependencyTask
+ */
+ public static void linkMoveTask(Task<MoveWork> mvTask,
+ Task<? extends Serializable> task, HiveConf hconf,
+ DependencyCollectionTask dependencyTask) {
+
+ if (task.getDependentTasks() == null || task.getDependentTasks().isEmpty()) {
+ // If it's a leaf, add the move task as a child
+ addDependentMoveTasks(mvTask, hconf, task, dependencyTask);
+ } else {
+ // Otherwise, for each child run this method recursively
+ for (Task<? extends Serializable> childTask : task.getDependentTasks()) {
+ linkMoveTask(mvTask, childTask, hconf, dependencyTask);
+ }
+ }
+ }
+
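
The recursion above is a plain depth-first walk that attaches the move task under every leaf of a conditional subtree. The same traversal over a generic node type (TaskNode is an illustrative stand-in for Hive's Task, not the real class):

    import java.util.ArrayList;
    import java.util.List;

    class TaskNode {
      final List<TaskNode> children = new ArrayList<TaskNode>();
    }

    class LeafLinker {
      // Make mvTask a child of every leaf under 'task' (depth-first).
      static void attachAtLeaves(TaskNode task, TaskNode mvTask) {
        if (task.children.isEmpty()) {
          task.children.add(mvTask);  // leaf: the move runs after this branch finishes
        } else {
          for (TaskNode child : task.children) {
            attachAtLeaves(child, mvTask);
          }
        }
      }
    }
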
+ /**
+ * Adds dependencyTask as a dependent of parentTask. If mvTask loads a table or partition,
+ * mvTask is made a dependent of dependencyTask; otherwise mvTask is made a dependent of
+ * parentTask directly. If no dependencyTask is given, mvTask simply depends on parentTask.
+ *
+ * @param mvTask
+ * @param hconf
+ * @param parentTask
+ * @param dependencyTask
+ */
+ public static void addDependentMoveTasks(Task<MoveWork> mvTask, HiveConf hconf,
+ Task<? extends Serializable> parentTask, DependencyCollectionTask dependencyTask) {
+
+ if (mvTask != null) {
+ if (dependencyTask != null) {
+ parentTask.addDependentTask(dependencyTask);
+ if (mvTask.getWork().getLoadTableWork() != null) {
+ // Moving tables/partitions depend on the dependencyTask
+ dependencyTask.addDependentTask(mvTask);
+ } else {
+ // Moving files depends on the parentTask (we still want the dependencyTask to depend
+ // on the parentTask)
+ parentTask.addDependentTask(mvTask);
+ }
+ } else {
+ parentTask.addDependentTask(mvTask);
+ }
+ }
+ }
+
+
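
Schematically, the wiring above has two modes, depending on whether a dependency-collection task exists and whether the move loads a table/partition or plain files. A sketch continuing the TaskNode stand-in from above (same package assumed; not Hive API):

    class MoveWiringDemo {
      // 'collector' stands in for DependencyCollectionTask.
      static void wireMove(TaskNode parent, TaskNode mv, TaskNode collector, boolean isLoadTable) {
        if (collector != null) {
          parent.children.add(collector);  // the collector always depends on the parent
          if (isLoadTable) {
            collector.children.add(mv);    // table/partition move waits on the collector
          } else {
            parent.children.add(mv);       // plain file move depends on the parent directly
          }
        } else {
          parent.children.add(mv);         // no collector: simple parent -> move edge
        }
      }
    }
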
+ /**
+ * Add the StatsTask as a dependent task of the MoveTask,
+ * because the StatsTask will change the Table/Partition metadata. For atomicity, the
+ * metadata must not change before the data has actually been put in place by the MoveTask.
+ *
+ * @param nd
+ * the FileSinkOperator whose results are taken care of by the MoveTask.
+ * @param mvTask
+ * The MoveTask that moves the FileSinkOperator's results.
+ * @param currTask
+ * The MapRedTask that the FileSinkOperator belongs to.
+ * @param hconf
+ * HiveConf
+ */
+ public static void addStatsTask(FileSinkOperator nd, MoveTask mvTask,
+ Task<? extends Serializable> currTask, HiveConf hconf) {
+
+ MoveWork mvWork = mvTask.getWork();
+ StatsWork statsWork = null;
+ if (mvWork.getLoadTableWork() != null) {
+ statsWork = new StatsWork(mvWork.getLoadTableWork());
+ } else if (mvWork.getLoadFileWork() != null) {
+ statsWork = new StatsWork(mvWork.getLoadFileWork());
+ }
+ assert statsWork != null : "Error when generating StatsTask";
+
+ statsWork.setSourceTask(currTask);
+ statsWork.setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
+
+ if (currTask.getWork() instanceof MapredWork) {
+ MapredWork mrWork = (MapredWork) currTask.getWork();
+ mrWork.getMapWork().setGatheringStats(true);
+ if (mrWork.getReduceWork() != null) {
+ mrWork.getReduceWork().setGatheringStats(true);
+ }
+ }
+
+ // AggKey in StatsWork is used for stats aggregation while StatsAggPrefix
+ // in FileSinkDesc is used for stats publishing. They should be consistent.
+ statsWork.setAggKey(nd.getConf().getStatsAggPrefix());
+ Task<? extends Serializable> statsTask = TaskFactory.get(statsWork, hconf);
+
+ // mark the MapredWork and FileSinkOperator for gathering stats
+ nd.getConf().setGatherStats(true);
+ nd.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
+ nd.getConf().setMaxStatsKeyPrefixLength(StatsFactory.getMaxPrefixLength(hconf));
+ // mrWork.addDestinationTable(nd.getConf().getTableInfo().getTableName());
+
+ // subscribe feeds from the MoveTask so that MoveTask can forward the list
+ // of dynamic partition list to the StatsTask
+ mvTask.addDependentTask(statsTask);
+ statsTask.subscribeFeed(mvTask);
+ }
+
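
The net ordering is MR job -> MoveTask -> StatsTask, so the metadata changes only after the data is in its final location. Continuing the TaskNode sketch (illustrative only):

    class StatsOrderingDemo {
      public static void main(String[] args) {
        TaskNode currTask  = new TaskNode();  // the MR job that produces the data
        TaskNode moveTask  = new TaskNode();  // moves the output to its final location
        TaskNode statsTask = new TaskNode();  // rewrites Table/Partition metadata
        currTask.children.add(moveTask);
        moveTask.children.add(statsTask);     // stats strictly after the move, never before
      }
    }
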
+ /**
+ * Returns true iff the current query is an INSERT INTO for the given file sink
+ *
+ * @param parseCtx
+ * @param fsOp
+ * @return
+ */
+ public static boolean isInsertInto(ParseContext parseCtx, FileSinkOperator fsOp) {
+ return fsOp.getConf().getTableInfo().getTableName() != null &&
+ parseCtx.getQB().getParseInfo().isInsertToTable();
+ }
+
+ /**
+ * Create a MapWork based on the input path, the top operator and the input
+ * table descriptor.
+ *
+ * @param conf
+ * @param topOp
+ * the table scan operator that is the root of the MapReduce task.
+ * @param fsDesc
+ * the file sink descriptor that serves as the input to this merge task.
+ * @return the MapWork
+ */
+ private static MapWork createMRWorkForMergingFiles (HiveConf conf,
+ Operator<? extends OperatorDesc> topOp, FileSinkDesc fsDesc) {
+
+ ArrayList<String> aliases = new ArrayList<String>();
+ String inputDir = fsDesc.getFinalDirName();
+ TableDesc tblDesc = fsDesc.getTableInfo();
+ aliases.add(inputDir); // dummy alias: just use the input path
+
+ // constructing the default MapredWork
+ MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf);
+ MapWork cplan = cMrPlan.getMapWork();
+ cplan.getPathToAliases().put(inputDir, aliases);
+ cplan.getPathToPartitionInfo().put(inputDir, new PartitionDesc(tblDesc, null));
+ cplan.getAliasToWork().put(inputDir, topOp);
+ cplan.setMapperCannotSpanPartns(true);
+
+ return cplan;
+ }
+
+ /**
+ * Create a block level merge task for RCFiles.
+ *
+ * @param fsInputDesc
+ * @param finalName
+ * @return MergeWork if the table is stored as RCFile
+ * @throws SemanticException if the table is not stored as RCFile
+ */
+ public static MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
+ Path finalName, boolean hasDynamicPartitions) throws SemanticException {
+
+ String inputDir = fsInputDesc.getFinalDirName();
+ TableDesc tblDesc = fsInputDesc.getTableInfo();
+
+ if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+ ArrayList<Path> inputDirs = new ArrayList<Path>(1);
+ ArrayList<String> inputDirstr = new ArrayList<String>(1);
+ if (!hasDynamicPartitions
+ && !GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
+ inputDirs.add(new Path(inputDir));
+ inputDirstr.add(inputDir);
+ }
+
+ MergeWork work = new MergeWork(inputDirs, finalName,
+ hasDynamicPartitions, fsInputDesc.getDynPartCtx());
+ LinkedHashMap<String, ArrayList<String>> pathToAliases =
+ new LinkedHashMap<String, ArrayList<String>>();
+ pathToAliases.put(inputDir, (ArrayList<String>) inputDirstr.clone());
+ work.setMapperCannotSpanPartns(true);
+ work.setPathToAliases(pathToAliases);
+ work.setAliasToWork(
+ new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
+ if (hasDynamicPartitions
+ || GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
+ work.getPathToPartitionInfo().put(inputDir,
+ new PartitionDesc(tblDesc, null));
+ }
+ work.setListBucketingCtx(fsInputDesc.getLbCtx());
+
+ return work;
+ }
+
+ throw new SemanticException("createRCFileMergeTask called on non-RCFile table");
+ }
+
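
One subtlety above: with static partitions the merge input directory is known at compile time and listed immediately, while in the dynamic-partition and skewed-dirs cases the list starts empty, because those directories exist only after the producing job runs. A toy version of that guard (illustrative names, not Hive API):

    import java.util.ArrayList;
    import java.util.List;

    class MergeInputDirsDemo {
      static List<String> mergeInputDirs(String inputDir, boolean dirsKnownAtCompileTime) {
        List<String> dirs = new ArrayList<String>();
        if (dirsKnownAtCompileTime) {
          dirs.add(inputDir);  // static case: merge exactly this directory
        }
        // dynamic-partition / skewed case: left empty, filled in only after the
        // producing job has created the directories
        return dirs;
      }
    }
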
+ /**
+ * Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.
+ *
+ * @param conf
+ * HiveConf
+ * @param currTask
+ * current leaf task
+ * @param mvWork
+ * MoveWork for the move task
+ * @param mergeWork
+ * MapredWork for the merge task.
+ * @param inputPath
+ * the input directory of the merge/move task
+ * @return The conditional task
+ */
+ @SuppressWarnings("unchecked")
+ public static ConditionalTask createCondTask(HiveConf conf,
+ Task<? extends Serializable> currTask, MoveWork mvWork,
+ Serializable mergeWork, String inputPath) {
+
+ // There are 3 options for this ConditionalTask:
+ // 1) Merge the partitions
+ // 2) Move the partitions (i.e. don't merge the partitions)
+ // 3) Merge some partitions and move the other partitions (i.e. merge some partitions and
+ // don't merge others); in this case the merge is done first, followed by the move, to
+ // prevent conflicts.
+ Task<? extends Serializable> mergeOnlyMergeTask = TaskFactory.get(mergeWork, conf);
+ Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(mvWork, conf);
+ Task<? extends Serializable> mergeAndMoveMergeTask = TaskFactory.get(mergeWork, conf);
+ Task<? extends Serializable> mergeAndMoveMoveTask = TaskFactory.get(mvWork, conf);
+
+ // NOTE! It is necessary that the merge task is the parent of the move task, and not
+ // the other way around, for the proper execution of the execute method of
+ // ConditionalTask
+ mergeAndMoveMergeTask.addDependentTask(mergeAndMoveMoveTask);
+
+ List<Serializable> listWorks = new ArrayList<Serializable>();
+ listWorks.add(mvWork);
+ listWorks.add(mergeWork);
+
+ ConditionalWork cndWork = new ConditionalWork(listWorks);
+
+ List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
+ listTasks.add(moveOnlyMoveTask);
+ listTasks.add(mergeOnlyMergeTask);
+ listTasks.add(mergeAndMoveMergeTask);
+
+ ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, conf);
+ cndTsk.setListTasks(listTasks);
+
+ // create resolver
+ cndTsk.setResolver(new ConditionalResolverMergeFiles());
+ ConditionalResolverMergeFilesCtx mrCtx =
+ new ConditionalResolverMergeFilesCtx(listTasks, inputPath);
+ cndTsk.setResolverCtx(mrCtx);
+
+ // make the conditional task as the child of the current leaf task
+ currTask.addDependentTask(cndTsk);
+
+ return cndTsk;
+ }
+
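
Note that four task objects back the three branches: the merge-and-move branch chains two of them. A condensed sketch of the branch wiring, continuing the TaskNode stand-in (the real resolver, ConditionalResolverMergeFiles, picks one branch at run time based on the files found in inputPath):

    import java.util.Arrays;
    import java.util.List;

    class CondBranchesDemo {
      public static void main(String[] args) {
        TaskNode moveOnly     = new TaskNode(); // 1) move everything as-is
        TaskNode mergeOnly    = new TaskNode(); // 2) merge everything
        TaskNode mergeAndMove = new TaskNode(); // 3) merge some partitions...
        TaskNode trailingMove = new TaskNode(); //    ...then move the unmerged ones
        mergeAndMove.children.add(trailingMove); // merge must be the parent of the move
        List<TaskNode> branches = Arrays.asList(moveOnly, mergeOnly, mergeAndMove);
        // the resolver picks exactly one element of 'branches' at run time
        System.out.println(branches.size());
      }
    }
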
+ /**
+ * Check whether the table is skewed and stored as directories.
+ *
+ * @param fsInputDesc
+ * @return
+ */
+ public static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) {
+ return (fsInputDesc.getLbCtx() == null) ? false : fsInputDesc.getLbCtx()
+ .isSkewedStoredAsDir();
+ }
+
+ public static Task<MoveWork> findMoveTask(
+ List<Task<MoveWork>> mvTasks, FileSinkOperator fsOp) {
+ // find the move task
+ for (Task<MoveWork> mvTsk : mvTasks) {
+ MoveWork mvWork = mvTsk.getWork();
+ Path srcDir = null;
+ if (mvWork.getLoadFileWork() != null) {
+ srcDir = mvWork.getLoadFileWork().getSourcePath();
+ } else if (mvWork.getLoadTableWork() != null) {
+ srcDir = mvWork.getLoadTableWork().getSourcePath();
+ }
+
+ if ((srcDir != null)
+ && (srcDir.equals(new Path(fsOp.getConf().getFinalDirName())))) {
+ return mvTsk;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Returns true iff the fsOp requires a merge
+ * @param mvTasks
+ * @param hconf
+ * @param fsOp
+ * @param currTask
+ * @param isInsertTable
+ * @return
+ */
+ public static boolean isMergeRequired(List<Task<MoveWork>> mvTasks, HiveConf hconf, FileSinkOperator fsOp,
+ Task<? extends Serializable> currTask, boolean isInsertTable) {
+
+ // Has the user enabled merging of files for map-only jobs or for all jobs
+ if ((mvTasks != null) && (!mvTasks.isEmpty())) {
+
+ // no need of merging if the move is to a local file system
+ MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTask(mvTasks, fsOp);
+
+ if (mvTask != null && isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)) {
+ GenMapRedUtils.addStatsTask(fsOp, mvTask, currTask, hconf);
+ }
+
+ if ((mvTask != null) && !mvTask.isLocal() && fsOp.getConf().canBeMerged()) {
+ if (fsOp.getConf().isLinkedFileSink()) {
+ // If the user has HIVEMERGEMAPREDFILES set to false, the assumption is that the
+ // number of reducers is small, so the number of files is small anyway.
+ // However, with this optimization we may increase the number of files
+ // possibly by a big margin. So, merge aggressively.
+ if (hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) ||
+ hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES)) {
+ return true;
+ }
+ } else {
+ // There are separate configuration parameters to control whether to
+ // merge for a map-only job or for a map-reduce job
+ if (currTask.getWork() instanceof MapredWork) {
+ ReduceWork reduceWork = ((MapredWork) currTask.getWork()).getReduceWork();
+ boolean mergeMapOnly =
+ hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null;
+ boolean mergeMapRed =
+ hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
+ reduceWork != null;
+ if (mergeMapOnly || mergeMapRed) {
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
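
Outside the linked-file-sink case, the decision above reduces to a small predicate over the two merge flags and whether the job has a reduce phase. Factored out as a standalone sketch (not the Hive method; the comments give the conventional names of those ConfVars):

    class MergeDecisionDemo {
      // mergeMapFiles    ~ hive.merge.mapfiles    (HIVEMERGEMAPFILES)
      // mergeMapRedFiles ~ hive.merge.mapredfiles (HIVEMERGEMAPREDFILES)
      static boolean shouldMerge(boolean mergeMapFiles, boolean mergeMapRedFiles,
                                 boolean hasReducePhase, boolean isLinkedFileSink) {
        if (isLinkedFileSink) {
          // linked file sinks can fan files out, so merge if either flag is on
          return mergeMapFiles || mergeMapRedFiles;
        }
        return (mergeMapFiles && !hasReducePhase) || (mergeMapRedFiles && hasReducePhase);
      }
    }
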
+ /**
+ * Create and add any dependent move tasks
+ *
+ * @param currTask
+ * @param chDir
+ * @param fsOp
+ * @param parseCtx
+ * @param mvTasks
+ * @param hconf
+ * @param dependencyTask
+ * @return
+ */
+ public static Path createMoveTask(Task<? extends Serializable> currTask, boolean chDir,
+ FileSinkOperator fsOp, ParseContext parseCtx, List<Task<MoveWork>> mvTasks,
+ HiveConf hconf, DependencyCollectionTask dependencyTask) {
+
+ Path dest = null;
+
+ if (chDir) {
+ dest = new Path(fsOp.getConf().getFinalDirName());
+
+ // generate the temporary file
+ // it must be on the same file system as the current destination
+ Context baseCtx = parseCtx.getContext();
+ String tmpDir = baseCtx.getExternalTmpFileURI(dest.toUri());
+
+ FileSinkDesc fileSinkDesc = fsOp.getConf();
+ // Change all the linked file sink descriptors
+ if (fileSinkDesc.isLinkedFileSink()) {
+ for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) {
+ String fileName = Utilities.getFileNameFromDirName(fsConf.getDirName());
+ fsConf.setParentDir(tmpDir);
+ fsConf.setDirName(tmpDir + Path.SEPARATOR + fileName);
+ }
+ } else {
+ fileSinkDesc.setDirName(tmpDir);
+ }
+ }
+
+ Task<MoveWork> mvTask = null;
+
+ if (!chDir) {
+ mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOp);
+ }
+
+ // Set the move task to be dependent on the current task
+ if (mvTask != null) {
+ GenMapRedUtils.addDependentMoveTasks(mvTask, hconf, currTask, dependencyTask);
+ }
+
+ return dest;
+ }
+
private GenMapRedUtils() {
// prevent instantiation
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Mon Jan 6 21:32:38 2014
@@ -286,7 +286,7 @@ public class GroupByOptimizer implements
currOp = currOp.getParentOperators().get(0);
while (true) {
- if (currOp.getParentOperators() == null) {
+ if ((currOp.getParentOperators() == null) || (currOp.getParentOperators().isEmpty())) {
break;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Mon Jan 6 21:32:38 2014
@@ -236,13 +236,14 @@ public class MapJoinProcessor implements
* @return the alias to the big table
* @throws SemanticException
*/
- public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos)
+ public static String genMapJoinOpAndLocalWork(HiveConf conf, MapredWork newWork,
+ JoinOperator op, int mapJoinPos)
throws SemanticException {
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
newWork.getMapWork().getOpParseCtxMap();
QBJoinTree newJoinTree = newWork.getMapWork().getJoinTree();
// generate the map join operator; already checked the map join
- MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op,
+ MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(conf, opParseCtxMap, op,
newJoinTree, mapJoinPos, true, false);
return genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos);
}
@@ -315,7 +316,7 @@ public class MapJoinProcessor implements
* are cached in memory
* @param noCheckOuterJoin
*/
- public static MapJoinOperator convertMapJoin(
+ public static MapJoinOperator convertMapJoin(HiveConf conf,
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin,
boolean validateMapJoinTree)
@@ -372,21 +373,90 @@ public class MapJoinProcessor implements
pos++;
}
- // get the join keys from old parent ReduceSink operators
+ // create the map-join operator
+ MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap,
+ op, joinTree, mapJoinPos, noCheckOuterJoin);
+
+
+ // remove old parents
for (pos = 0; pos < newParentOps.size(); pos++) {
- ReduceSinkOperator oldPar = (ReduceSinkOperator) oldReduceSinkParentOps.get(pos);
- ReduceSinkDesc rsconf = oldPar.getConf();
+ newParentOps.get(pos).removeChild(oldReduceSinkParentOps.get(pos));
+ newParentOps.get(pos).getChildOperators().add(mapJoinOp);
+ }
+
+
+ mapJoinOp.getParentOperators().removeAll(oldReduceSinkParentOps);
+ mapJoinOp.setParentOperators(newParentOps);
+
+ // make sure only map-joins can be performed.
+ if (validateMapJoinTree) {
+ validateMapJoinTypes(mapJoinOp);
+ }
+
+ // the children of the original join operator have already been re-pointed to the
+ // map join operator inside convertJoinOpMapJoinOp
+
+ return mapJoinOp;
+ }
+
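
The rewiring above (drop the old ReduceSink layer, splice the new map-join under the original grandparents) is a generic operator-DAG edit. A self-contained sketch of that splice with an illustrative OpNode type (not Hive's Operator):

    import java.util.ArrayList;
    import java.util.List;

    class OpNode {
      final List<OpNode> parents  = new ArrayList<OpNode>();
      final List<OpNode> children = new ArrayList<OpNode>();
    }

    class Splicer {
      // For each position i, grandParents.get(i) drops its old child (the removed
      // ReduceSink layer) and adopts newOp; newOp's parent list is fixed up to match.
      static void splice(List<OpNode> grandParents, List<OpNode> removedLayer, OpNode newOp) {
        for (int i = 0; i < grandParents.size(); i++) {
          grandParents.get(i).children.remove(removedLayer.get(i));
          grandParents.get(i).children.add(newOp);
        }
        newOp.parents.removeAll(removedLayer);
        newOp.parents.addAll(grandParents);
      }
    }
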
+ public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf,
+ LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
+ JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
+ throws SemanticException {
+
+ JoinDesc desc = op.getConf();
+ JoinCondDesc[] condns = desc.getConds();
+ Byte[] tagOrder = desc.getTagOrder();
+
+ // outer join cannot be performed on a table which is being cached
+ if (!noCheckOuterJoin) {
+ if (checkMapJoin(mapJoinPos, condns) < 0) {
+ throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+ }
+ }
+
+ Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
+
+ // Walk over all the sources (which are guaranteed to be reduce sink
+ // operators).
+ // The join outputs a concatenation of all the inputs.
+ QBJoinTree leftSrc = joinTree.getJoinSrc();
+ List<Operator<? extends OperatorDesc>> oldReduceSinkParentOps =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ if (leftSrc != null) {
+ // assert mapJoinPos == 0;
+ Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
+ assert parentOp.getParentOperators().size() == 1;
+ oldReduceSinkParentOps.add(parentOp);
+ }
+
+
+ byte pos = 0;
+ for (String src : joinTree.getBaseSrc()) {
+ if (src != null) {
+ Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
+ assert parentOp.getParentOperators().size() == 1;
+ oldReduceSinkParentOps.add(parentOp);
+ }
+ pos++;
+ }
+
+ // get the join keys from old parent ReduceSink operators
+ for (pos = 0; pos < op.getParentOperators().size(); pos++) {
+ ReduceSinkOperator parent = (ReduceSinkOperator) oldReduceSinkParentOps.get(pos);
+ ReduceSinkDesc rsconf = parent.getConf();
List<ExprNodeDesc> keys = rsconf.getKeyCols();
keyExprMap.put(pos, keys);
}
- // removing RS, only ExprNodeDesc is changed (key/value/filter exprs and colExprMap)
- // others (output column-name, RR, schema) remain intact
- Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
- List<String> outputColumnNames = op.getConf().getOutputColumnNames();
+ List<ExprNodeDesc> keyCols = keyExprMap.get(Byte.valueOf((byte) 0));
+ StringBuilder keyOrder = new StringBuilder();
+ for (int i = 0; i < keyCols.size(); i++) {
+ keyOrder.append("+");
+ }
+ Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
-
Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
@@ -410,45 +480,12 @@ public class MapJoinProcessor implements
}
}
- Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
- Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
- for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
- byte srcTag = entry.getKey();
- List<ExprNodeDesc> filter = entry.getValue();
-
- Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
- newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
- }
- desc.setFilters(filters = newFilters);
-
- // remove old parents
- for (pos = 0; pos < newParentOps.size(); pos++) {
- newParentOps.get(pos).removeChild(oldReduceSinkParentOps.get(pos));
- }
-
- JoinCondDesc[] joinCondns = op.getConf().getConds();
-
- Operator[] newPar = new Operator[newParentOps.size()];
- pos = 0;
- for (Operator<? extends OperatorDesc> o : newParentOps) {
- newPar[pos++] = o;
- }
-
- List<ExprNodeDesc> keyCols = keyExprMap.get(Byte.valueOf((byte) 0));
- StringBuilder keyOrder = new StringBuilder();
- for (int i = 0; i < keyCols.size(); i++) {
- keyOrder.append("+");
- }
-
- TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils
- .getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
-
+ // construct valueTableDescs and valueFilteredTableDescs
List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
List<TableDesc> valueFiltedTableDescs = new ArrayList<TableDesc>();
-
int[][] filterMap = desc.getFilterMap();
- for (pos = 0; pos < newParentOps.size(); pos++) {
- List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
+ for (pos = 0; pos < op.getParentOperators().size(); pos++) {
+ List<ExprNodeDesc> valueCols = newValueExprs.get(Byte.valueOf((byte) pos));
int length = valueCols.size();
List<ExprNodeDesc> valueFilteredCols = new ArrayList<ExprNodeDesc>(length);
// deep copy expr node desc
@@ -475,6 +512,19 @@ public class MapJoinProcessor implements
valueTableDescs.add(valueTableDesc);
valueFiltedTableDescs.add(valueFilteredTableDesc);
}
+
+ Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
+ Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
+ for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
+ byte srcTag = entry.getKey();
+ List<ExprNodeDesc> filter = entry.getValue();
+
+ Operator<?> terminal = op.getParentOperators().get(srcTag);
+ newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
+ }
+ desc.setFilters(filters = newFilters);
+
+ // create dumpfile prefix needed to create descriptor
String dumpFilePrefix = "";
if( joinTree.getMapAliases() != null ) {
for(String mapAlias : joinTree.getMapAliases()) {
@@ -484,15 +534,24 @@ public class MapJoinProcessor implements
} else {
dumpFilePrefix = "mapfile"+PlanUtils.getCountForMapJoinDumpFilePrefix();
}
+
+ List<String> outputColumnNames = op.getConf().getOutputColumnNames();
+ TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf,
+ PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
+ JoinCondDesc[] joinCondns = op.getConf().getConds();
MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs,
valueTableDescs, valueFiltedTableDescs, outputColumnNames, mapJoinPos, joinCondns,
filters, op.getConf().getNoOuterJoin(), dumpFilePrefix);
+ mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
mapJoinDescriptor.setTagOrder(tagOrder);
mapJoinDescriptor.setNullSafes(desc.getNullSafes());
mapJoinDescriptor.setFilterMap(desc.getFilterMap());
+ // row resolver of the original join operator, used to generate the map join op
+ RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
+
MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(
- mapJoinDescriptor, new RowSchema(outputRS.getColumnInfos()), newPar);
+ mapJoinDescriptor, new RowSchema(outputRS.getColumnInfos()), op.getParentOperators());
OpParseContext ctx = new OpParseContext(outputRS);
opParseCtxMap.put(mapJoinOp, ctx);
@@ -500,24 +559,17 @@ public class MapJoinProcessor implements
mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs());
mapJoinOp.setColumnExprMap(colExprMap);
- // change the children of the original join operator to point to the map
- // join operator
List<Operator<? extends OperatorDesc>> childOps = op.getChildOperators();
for (Operator<? extends OperatorDesc> childOp : childOps) {
childOp.replaceParent(op, mapJoinOp);
}
mapJoinOp.setChildOperators(childOps);
- mapJoinOp.setParentOperators(newParentOps);
op.setChildOperators(null);
op.setParentOperators(null);
- // make sure only map-joins can be performed.
- if (validateMapJoinTree) {
- validateMapJoinTypes(mapJoinOp);
- }
-
return mapJoinOp;
+
}
/**
@@ -533,14 +585,14 @@ public class MapJoinProcessor implements
* are cached in memory
* @param noCheckOuterJoin
*/
- public static MapJoinOperator convertSMBJoinToMapJoin(
+ public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf,
Map<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
SMBMapJoinOperator smbJoinOp, QBJoinTree joinTree, int bigTablePos, boolean noCheckOuterJoin)
throws SemanticException {
// Create a new map join operator
SMBJoinDesc smbJoinDesc = smbJoinOp.getConf();
List<ExprNodeDesc> keyCols = smbJoinDesc.getKeys().get(Byte.valueOf((byte) 0));
- TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils
+ TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf, PlanUtils
.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
MapJoinDesc mapJoinDesc = new MapJoinDesc(smbJoinDesc.getKeys(),
keyTableDesc, smbJoinDesc.getExprs(),
@@ -549,6 +601,8 @@ public class MapJoinProcessor implements
bigTablePos, smbJoinDesc.getConds(),
smbJoinDesc.getFilters(), smbJoinDesc.isNoOuterJoin(), smbJoinDesc.getDumpFilePrefix());
+ mapJoinDesc.setStatistics(smbJoinDesc.getStatistics());
+
RowResolver joinRS = opParseCtxMap.get(smbJoinOp).getRowResolver();
// The mapjoin has the same schema as the join operator
MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(
@@ -588,8 +642,8 @@ public class MapJoinProcessor implements
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap = pctx
.getOpParseCtx();
- MapJoinOperator mapJoinOp = convertMapJoin(opParseCtxMap, op, joinTree, mapJoinPos,
- noCheckOuterJoin, true);
+ MapJoinOperator mapJoinOp = convertMapJoin(pctx.getConf(), opParseCtxMap, op,
+ joinTree, mapJoinPos, noCheckOuterJoin, true);
// create a dummy select to select all columns
genSelectPlan(pctx, mapJoinOp);
return mapJoinOp;
@@ -609,7 +663,7 @@ public class MapJoinProcessor implements
* If see a right outer join, set lastSeenRightOuterJoin to true, clear the
* bigTableCandidates, and add right side to the bigTableCandidates, it means
* the right side of a right outer join always win. If see a full outer join,
- * return null immediately (no one can be the big table, can not do a
+ * return empty set immediately (no one can be the big table, cannot do a
* mapjoin).
*
*
@@ -635,7 +689,8 @@ public class MapJoinProcessor implements
// changed in future, these 2 are not missing.
seenOuterJoin = true;
lastSeenRightOuterJoin = false;
- return null;
+ // empty set - cannot convert
+ return new HashSet<Integer>();
} else if (joinType == JoinDesc.LEFT_OUTER_JOIN
|| joinType == JoinDesc.LEFT_SEMI_JOIN) {
seenOuterJoin = true;
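
Returning an empty set instead of null simplifies every caller: an isEmpty() check replaces the null guard, exactly as the CorrelationOptimizer and CommonJoinTaskDispatcher hunks below show. In miniature (illustrative, not the Hive method):

    import java.util.Collections;
    import java.util.Set;

    class EmptySetDemo {
      // Returning an empty set instead of null lets callers use isEmpty()/contains()
      // without a null guard.
      static Set<Integer> bigTableCandidates(boolean fullOuterJoinSeen) {
        if (fullOuterJoinSeen) {
          return Collections.<Integer>emptySet();  // no table may be the big table
        }
        return Collections.singleton(0);           // toy: pretend position 0 qualifies
      }

      public static void main(String[] args) {
        if (bigTableCandidates(true).isEmpty()) {
          System.out.println("cannot convert to map join");
        }
      }
    }
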
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java Mon Jan 6 21:32:38 2014
@@ -398,7 +398,8 @@ public class SkewJoinOptimizer implement
return parseContext.getTopToTable().get(tsOp);
}
}
- if ((op.getParentOperators() == null) || (op.getParentOperators().size() > 1)) {
+ if ((op.getParentOperators() == null) || (op.getParentOperators().isEmpty()) ||
+ (op.getParentOperators().size() > 1)) {
return null;
}
op = op.getParentOperators().get(0);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java Mon Jan 6 21:32:38 2014
@@ -168,7 +168,7 @@ public class CorrelationOptimizer implem
int numAliases = order.length;
Set<Integer> bigTableCandidates =
MapJoinProcessor.getBigTableCandidates(joinDesc.getConds());
- if (bigTableCandidates == null) {
+ if (bigTableCandidates.isEmpty()) {
continue;
}
@@ -346,7 +346,7 @@ public class CorrelationOptimizer implem
"involved in this operator");
return correlatedReduceSinkOperators;
}
- if (current.getParentOperators() == null) {
+ if ((current.getParentOperators() == null) || (current.getParentOperators().isEmpty())) {
return correlatedReduceSinkOperators;
}
if (current instanceof PTFOperator) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Mon Jan 6 21:32:38 2014
@@ -189,7 +189,8 @@ public class CommonJoinTaskDispatcher ex
// optimize this newWork given the big table position
String bigTableAlias =
- MapJoinProcessor.genMapJoinOpAndLocalWork(newWork, newJoinOp, bigTablePosition);
+ MapJoinProcessor.genMapJoinOpAndLocalWork(physicalContext.getParseContext().getConf(),
+ newWork, newJoinOp, bigTablePosition);
return new ObjectPair<MapRedTask, String>(newTask, bigTableAlias);
}
@@ -434,7 +435,7 @@ public class CommonJoinTaskDispatcher ex
.getConds());
// no table could be the big table; there is no need to convert
- if (bigTableCandidates == null) {
+ if (bigTableCandidates.isEmpty()) {
return null;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java Mon Jan 6 21:32:38 2014
@@ -205,7 +205,7 @@ public class SortMergeJoinTaskDispatcher
Operator<? extends OperatorDesc> currOp = originalSMBJoinOp;
while (true) {
- if (currOp.getChildOperators() == null) {
+ if ((currOp.getChildOperators() == null) || (currOp.getChildOperators().isEmpty())) {
if (currOp instanceof FileSinkOperator) {
FileSinkOperator fsOp = (FileSinkOperator)currOp;
// The query has enforced that a sort-merge join should be performed.
@@ -433,7 +433,8 @@ public class SortMergeJoinTaskDispatcher
opParseContextMap.put(newSMBJoinOp, opParseContextMap.get(oldSMBJoinOp));
// generate the map join operator
- return MapJoinProcessor.convertSMBJoinToMapJoin(opParseContextMap, newSMBJoinOp,
+ return MapJoinProcessor.convertSMBJoinToMapJoin(physicalContext.getConf(),
+ opParseContextMap, newSMBJoinOp,
joinTree, mapJoinPos, true);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java Mon Jan 6 21:32:38 2014
@@ -22,10 +22,10 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,21 +33,14 @@ import org.apache.hadoop.fs.ContentSumma
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.ColumnStatsTask;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.StatsTask;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
@@ -61,9 +54,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
import org.apache.hadoop.hive.ql.optimizer.GenMROperator;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext;
@@ -73,302 +63,25 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3;
import org.apache.hadoop.hive.ql.optimizer.GenMRTableScan1;
import org.apache.hadoop.hive.ql.optimizer.GenMRUnion1;
-import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
-import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
-import org.apache.hadoop.hive.ql.plan.DDLWork;
-import org.apache.hadoop.hive.ql.plan.FetchWork;
-import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.shims.ShimLoader;
-public class MapReduceCompiler {
+public class MapReduceCompiler extends TaskCompiler {
protected final Log LOG = LogFactory.getLog(MapReduceCompiler.class);
- private Hive db;
- protected LogHelper console;
- private HiveConf conf;
-
public MapReduceCompiler() {
}
- public void init(HiveConf conf, LogHelper console, Hive db) {
- this.conf = conf;
- this.db = db;
- this.console = console;
- }
-
- @SuppressWarnings({"nls", "unchecked"})
- public void compile(final ParseContext pCtx, final List<Task<? extends Serializable>> rootTasks,
- final HashSet<ReadEntity> inputs, final HashSet<WriteEntity> outputs) throws SemanticException {
-
- Context ctx = pCtx.getContext();
- GlobalLimitCtx globalLimitCtx = pCtx.getGlobalLimitCtx();
- QB qb = pCtx.getQB();
- List<Task<MoveWork>> mvTask = new ArrayList<Task<MoveWork>>();
-
- List<LoadTableDesc> loadTableWork = pCtx.getLoadTableWork();
- List<LoadFileDesc> loadFileWork = pCtx.getLoadFileWork();
-
- boolean isCStats = qb.isAnalyzeRewrite();
-
- if (pCtx.getFetchTask() != null) {
- return;
- }
-
- /*
- * In case of a select, use a fetch task instead of a move task.
- * If the select is from analyze table column rewrite, don't create a fetch task. Instead create
- * a column stats task later.
- */
- if (pCtx.getQB().getIsQuery() && !isCStats) {
- if ((!loadTableWork.isEmpty()) || (loadFileWork.size() != 1)) {
- throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg());
- }
-
- LoadFileDesc loadFileDesc = loadFileWork.get(0);
-
- String cols = loadFileDesc.getColumns();
- String colTypes = loadFileDesc.getColumnTypes();
-
- TableDesc resultTab = pCtx.getFetchTabledesc();
- if (resultTab == null) {
- String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
- resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
- }
-
- FetchWork fetch = new FetchWork(loadFileDesc.getSourcePath(),
- resultTab, qb.getParseInfo().getOuterQueryLimit());
- fetch.setSource(pCtx.getFetchSource());
- fetch.setSink(pCtx.getFetchSink());
-
- pCtx.setFetchTask((FetchTask) TaskFactory.get(fetch, conf));
-
- // For the FetchTask, the limit optimization requires we fetch all the rows
- // in memory and count how many rows we get. It's not practical if the
- // limit factor is too big
- int fetchLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITOPTMAXFETCH);
- if (globalLimitCtx.isEnable() && globalLimitCtx.getGlobalLimit() > fetchLimit) {
- LOG.info("For FetchTask, LIMIT " + globalLimitCtx.getGlobalLimit() + " > " + fetchLimit
- + ". Doesn't qualify limit optimiztion.");
- globalLimitCtx.disableOpt();
- }
- } else if (!isCStats) {
- for (LoadTableDesc ltd : loadTableWork) {
- Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
- mvTask.add(tsk);
- // Check to see if we are stale'ing any indexes and auto-update them if we want
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
- IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf);
- try {
- List<Task<? extends Serializable>> indexUpdateTasks = indexUpdater
- .generateUpdateTasks();
- for (Task<? extends Serializable> updateTask : indexUpdateTasks) {
- tsk.addDependentTask(updateTask);
- }
- } catch (HiveException e) {
- console
- .printInfo("WARNING: could not auto-update stale indexes, which are not in sync");
- }
- }
- }
-
- boolean oneLoadFile = true;
- for (LoadFileDesc lfd : loadFileWork) {
- if (qb.isCTAS()) {
- assert (oneLoadFile); // should not have more than 1 load file for
- // CTAS
- // make the movetask's destination directory the table's destination.
- Path location;
- String loc = qb.getTableDesc().getLocation();
- if (loc == null) {
- // get the table's default location
- Table dumpTable;
- Path targetPath;
- try {
- dumpTable = db.newTable(qb.getTableDesc().getTableName());
- if (!db.databaseExists(dumpTable.getDbName())) {
- throw new SemanticException("ERROR: The database " + dumpTable.getDbName()
- + " does not exist.");
- }
- Warehouse wh = new Warehouse(conf);
- targetPath = wh.getTablePath(db.getDatabase(dumpTable.getDbName()), dumpTable
- .getTableName());
- } catch (HiveException e) {
- throw new SemanticException(e);
- } catch (MetaException e) {
- throw new SemanticException(e);
- }
-
- location = targetPath;
- } else {
- location = new Path(loc);
- }
- lfd.setTargetDir(location);
-
- oneLoadFile = false;
- }
- mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false), conf));
- }
- }
-
- // generate map reduce plans
- ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
- GenMRProcContext procCtx = new GenMRProcContext(
- conf,
- new HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>>(),
- tempParseContext, mvTask, rootTasks,
- new LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx>(),
- inputs, outputs);
-
- // create a walker which walks the tree in a DFS manner while maintaining
- // the operator stack.
- // The dispatcher generates the plan from the operator tree
- Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp(new String("R1"),
- TableScanOperator.getOperatorName() + "%"),
- new GenMRTableScan1());
- opRules.put(new RuleRegExp(new String("R2"),
- TableScanOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
- new GenMRRedSink1());
- opRules.put(new RuleRegExp(new String("R3"),
- ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
- new GenMRRedSink2());
- opRules.put(new RuleRegExp(new String("R4"),
- FileSinkOperator.getOperatorName() + "%"),
- new GenMRFileSink1());
- opRules.put(new RuleRegExp(new String("R5"),
- UnionOperator.getOperatorName() + "%"),
- new GenMRUnion1());
- opRules.put(new RuleRegExp(new String("R6"),
- UnionOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
- new GenMRRedSink3());
- opRules.put(new RuleRegExp(new String("R7"),
- MapJoinOperator.getOperatorName() + "%"),
- MapJoinFactory.getTableScanMapJoin());
-
- // The dispatcher fires the processor corresponding to the closest matching
- // rule and passes the context along
- Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules,
- procCtx);
-
- GraphWalker ogw = new GenMapRedWalker(disp);
- ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(pCtx.getTopOps().values());
- ogw.startWalking(topNodes, null);
-
- /*
- * If the query was the result of analyze table column compute statistics rewrite, create
- * a column stats task instead of a fetch task to persist stats to the metastore.
- */
- if (isCStats) {
- genColumnStatsTask(qb, loadTableWork, loadFileWork, rootTasks);
- }
-
- // reduce sink does not have any kids - since the plan by now has been
- // broken up into multiple
- // tasks, iterate over all tasks.
- // For each task, go over all operators recursively
- for (Task<? extends Serializable> rootTask : rootTasks) {
- breakTaskTree(rootTask);
- }
-
- // For each task, set the key descriptor for the reducer
- for (Task<? extends Serializable> rootTask : rootTasks) {
- GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
- }
-
- // If a task contains an operator which instructs bucketizedhiveinputformat
- // to be used, please do so
- for (Task<? extends Serializable> rootTask : rootTasks) {
- setInputFormat(rootTask);
- }
-
- PhysicalContext physicalContext = new PhysicalContext(conf,
- getParseContext(pCtx, rootTasks), ctx, rootTasks, pCtx.getFetchTask());
- PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(
- physicalContext, conf);
- physicalOptimizer.optimize();
-
- decideExecMode(rootTasks, ctx, globalLimitCtx);
-
- if (qb.isCTAS()) {
- // generate a DDL task and make it a dependent task of the leaf
- CreateTableDesc crtTblDesc = qb.getTableDesc();
-
- crtTblDesc.validate();
-
- // Clear the output for CTAS since we don't need the output from the
- // mapredWork, the
- // DDLWork at the tail of the chain will have the output
- outputs.clear();
-
- Task<? extends Serializable> crtTblTask = TaskFactory.get(new DDLWork(
- inputs, outputs, crtTblDesc), conf);
-
- // find all leaf tasks and make the DDLTask as a dependent task of all of
- // them
- HashSet<Task<? extends Serializable>> leaves = new HashSet<Task<? extends Serializable>>();
- getLeafTasks(rootTasks, leaves);
- assert (leaves.size() > 0);
- for (Task<? extends Serializable> task : leaves) {
- if (task instanceof StatsTask) {
- // StatsTask require table to already exist
- for (Task<? extends Serializable> parentOfStatsTask : task.getParentTasks()) {
- parentOfStatsTask.addDependentTask(crtTblTask);
- }
- for (Task<? extends Serializable> parentOfCrtTblTask : crtTblTask.getParentTasks()) {
- parentOfCrtTblTask.removeDependentTask(task);
- }
- crtTblTask.addDependentTask(task);
- } else {
- task.addDependentTask(crtTblTask);
- }
- }
- }
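The leaf rewiring above enforces one ordering constraint: the CREATE TABLE must run after every data-producing leaf but before any StatsTask, since a StatsTask needs the table to already exist in the metastore. In graph terms, with statsTask and crtTblTask naming the tasks from the block above (a hedged restatement, not new code from this commit):

    // Before splicing:  mrTask --> statsTask
    // After splicing:   mrTask --> crtTblTask --> statsTask
    for (Task<? extends Serializable> parent : statsTask.getParentTasks()) {
      parent.addDependentTask(crtTblTask);  // DDL runs after the data is written
    }
    crtTblTask.addDependentTask(statsTask); // stats run only after the DDL
    // (the block above also unlinks parent --> statsTask via removeDependentTask)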
-
- if (globalLimitCtx.isEnable() && pCtx.getFetchTask() != null) {
- LOG.info("set least row check for FetchTask: " + globalLimitCtx.getGlobalLimit());
- pCtx.getFetchTask().getWork().setLeastNumRows(globalLimitCtx.getGlobalLimit());
- }
-
- if (globalLimitCtx.isEnable() && globalLimitCtx.getLastReduceLimitDesc() != null) {
- LOG.info("set least row check for LimitDesc: " + globalLimitCtx.getGlobalLimit());
- globalLimitCtx.getLastReduceLimitDesc().setLeastRows(globalLimitCtx.getGlobalLimit());
- List<ExecDriver> mrTasks = Utilities.getMRTasks(rootTasks);
- for (ExecDriver tsk : mrTasks) {
- tsk.setRetryCmdWhenFail(true);
- }
- }
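Both branches propagate the global limit as a lower bound on rows actually produced: the FetchTask (or the last LimitDesc) is told the minimum row count, and every MR task is marked retryable. The consumer-side contract this relies on, as a hedged sketch (fetchedRows and work are illustrative names, and the exception type is an assumption based on this codebase):

    // If the optimistically pruned input yielded too few rows, fail so that
    // tasks marked with setRetryCmdWhenFail(true) re-run without the shortcut.
    if (fetchedRows < work.getLeastNumRows()) {
      throw new CommandNeedRetryException("global limit produced too few rows");
    }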
- }
-
- private void setInputFormat(MapWork work, Operator<? extends OperatorDesc> op) {
- if (op.isUseBucketizedHiveInputFormat()) {
- work.setUseBucketizedHiveInputFormat(true);
- return;
- }
-
- if (op.getChildOperators() != null) {
- for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
- setInputFormat(work, childOp);
- }
- }
- }
-
// loop over all the tasks recursively
- private void setInputFormat(Task<? extends Serializable> task) {
+ @Override
+ protected void setInputFormat(Task<? extends Serializable> task) {
if (task instanceof ExecDriver) {
MapWork work = ((MapredWork) task.getWork()).getMapWork();
HashMap<String, Operator<? extends OperatorDesc>> opMap = work.getAliasToWork();
@@ -392,6 +105,20 @@ public class MapReduceCompiler {
}
}
+ private void setInputFormat(MapWork work, Operator<? extends OperatorDesc> op) {
+ if (op.isUseBucketizedHiveInputFormat()) {
+ work.setUseBucketizedHiveInputFormat(true);
+ return;
+ }
+
+ if (op.getChildOperators() != null) {
+ for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
+ setInputFormat(work, childOp);
+ }
+ }
+ }
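The two overloads cooperate: the task-level method locates each MapWork, and the operator-level method just above walks its operator DAG, flipping the whole work to BucketizedHiveInputFormat as soon as any operator requests it and short-circuiting that branch. Usage is one call per root task, exactly as in the block this refactoring removes:

    for (Task<? extends Serializable> rootTask : rootTasks) {
      setInputFormat(rootTask);   // dispatches into the operator-level overload
    }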
+
+ @Override
public ParseContext getParseContext(ParseContext pCtx, List<Task<? extends Serializable>> rootTasks) {
return new ParseContext(conf, pCtx.getQB(), pCtx.getParseTree(),
pCtx.getOpToPartPruner(), pCtx.getOpToPartList(), pCtx.getTopOps(),
@@ -452,67 +179,6 @@ public class MapReduceCompiler {
}
/**
- * A helper function to generate a column stats task on top of map-red task. The column stats
- * task fetches from the output of the map-red task, constructs the column stats object and
- * persists it to the metastore.
- *
- * This method generates a plan with a column stats task on top of map-red task and sets up the
- * appropriate metadata to be used during execution.
- *
- * @param qb
- */
- @SuppressWarnings("unchecked")
- private void genColumnStatsTask(QB qb, List<LoadTableDesc> loadTableWork,
- List<LoadFileDesc> loadFileWork, List<Task<? extends Serializable>> rootTasks) {
- QBParseInfo qbParseInfo = qb.getParseInfo();
- ColumnStatsTask cStatsTask = null;
- ColumnStatsWork cStatsWork = null;
- FetchWork fetch = null;
- String tableName = qbParseInfo.getTableName();
- String partName = qbParseInfo.getPartName();
- List<String> colName = qbParseInfo.getColName();
- List<String> colType = qbParseInfo.getColType();
- boolean isTblLevel = qbParseInfo.isTblLvl();
-
- String cols = loadFileWork.get(0).getColumns();
- String colTypes = loadFileWork.get(0).getColumnTypes();
-
- String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
- TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
-
- fetch = new FetchWork(loadFileWork.get(0).getSourcePath(),
- resultTab, qb.getParseInfo().getOuterQueryLimit());
-
- ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName, partName,
- colName, colType, isTblLevel);
- cStatsWork = new ColumnStatsWork(fetch, cStatsDesc);
- cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf);
- rootTasks.add(cStatsTask);
- }
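The removed helper wires a three-stage pipeline; its shape, as a hedged summary of the code above:

    // ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS data flow:
    //   map-red job  -->  loadFileWork.get(0).getSourcePath()  (temp result)
    //                -->  FetchWork        (reads the result rows back)
    //                -->  ColumnStatsTask  (builds the stats described by
    //                                       ColumnStatsDesc, persists them
    //                                       to the metastore)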
-
- /**
- * Find all leaf tasks of the list of root tasks.
- */
- private void getLeafTasks(List<Task<? extends Serializable>> rootTasks,
- HashSet<Task<? extends Serializable>> leaves) {
-
- for (Task<? extends Serializable> root : rootTasks) {
- getLeafTasks(root, leaves);
- }
- }
-
- private void getLeafTasks(Task<? extends Serializable> task,
- HashSet<Task<? extends Serializable>> leaves) {
- if (task.getDependentTasks() == null) {
- if (!leaves.contains(task)) {
- leaves.add(task);
- }
- } else {
- getLeafTasks(task.getDependentTasks(), leaves);
- }
- }
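The removed pair of overloads is a plain DAG search: the List overload fans out, and the single-task overload either records a leaf or recurses into its dependents (getDependentTasks() returns a List, so the last call re-enters the List overload). An equivalent iterative form, as a hedged sketch, makes termination on the acyclic task graph explicit:

    Deque<Task<? extends Serializable>> pending =
        new ArrayDeque<Task<? extends Serializable>>(rootTasks);
    while (!pending.isEmpty()) {
      Task<? extends Serializable> t = pending.pop();
      List<Task<? extends Serializable>> kids = t.getDependentTasks();
      if (kids == null) {
        leaves.add(t);        // the HashSet de-duplicates shared leaves
      } else {
        pending.addAll(kids);
      }
    }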
-
- /**
* Make a best guess at trying to find the number of reducers
*/
private static int getNumberOfReducers(MapredWork mrwork, HiveConf conf) {
@@ -527,7 +193,8 @@ public class MapReduceCompiler {
return conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
}
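The surrounding method (mostly elided by this hunk) reaches for mapred.reduce.tasks only as a fallback; when neither the plan nor the user pins the count, Hive sizes it from the input. A hedged sketch of that estimate, assuming the era's HiveConf knobs and a precomputed totalInputFileSize:

    long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
    int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1)
        / bytesPerReducer);   // ceiling division: one reducer per input chunk
    return Math.max(1, Math.min(maxReducers, reducers));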
- private void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx,
+ @Override
+ protected void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx,
GlobalLimitCtx globalLimitCtx)
throws SemanticException {
@@ -603,4 +270,74 @@ public class MapReduceCompiler {
console.printInfo("Automatically selecting local only mode for query");
}
}
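decideExecMode (its body largely elided above) ends by downgrading small queries to local execution, producing the log line shown. The usual eligibility test, as a hedged sketch: inputSummary is a ContentSummary over the query's inputs, numReducers the planned reducer count, and the knob names are assumptions based on this era's HiveConf:

    boolean localAuto = conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO);
    long maxBytes = conf.getLongVar(HiveConf.ConfVars.LOCALMODEMAXBYTES);
    int maxFiles = conf.getIntVar(HiveConf.ConfVars.LOCALMODEMAXINPUTFILES);
    if (localAuto
        && inputSummary.getLength() <= maxBytes
        && inputSummary.getFileCount() <= maxFiles
        && numReducers <= 1) {
      console.printInfo("Automatically selecting local only mode for query");
    }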
+
+ @Override
+ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks,
+ ParseContext pCtx, Context ctx) throws SemanticException {
+ // Reduce sinks no longer have any children: the plan has by now been
+ // broken up into multiple tasks. Iterate over all tasks and, for each
+ // one, go over its operators recursively.
+ for (Task<? extends Serializable> rootTask : rootTasks) {
+ breakTaskTree(rootTask);
+ }
+
+ PhysicalContext physicalContext = new PhysicalContext(conf,
+ getParseContext(pCtx, rootTasks), ctx, rootTasks, pCtx.getFetchTask());
+ PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(
+ physicalContext, conf);
+ physicalOptimizer.optimize();
+
+ }
+
+ @Override
+ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
+ List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException {
+
+ // generate map reduce plans
+ ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
+ GenMRProcContext procCtx = new GenMRProcContext(
+ conf,
+ new HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>>(),
+ tempParseContext, mvTask, rootTasks,
+ new LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx>(),
+ inputs, outputs);
+
+ // Create a walker that traverses the operator tree depth-first while
+ // maintaining the operator stack; the dispatcher generates the plan
+ // from that tree as rules fire.
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(new RuleRegExp("R1",
+ TableScanOperator.getOperatorName() + "%"),
+ new GenMRTableScan1());
+ opRules.put(new RuleRegExp("R2",
+ TableScanOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
+ new GenMRRedSink1());
+ opRules.put(new RuleRegExp("R3",
+ ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
+ new GenMRRedSink2());
+ opRules.put(new RuleRegExp("R4",
+ FileSinkOperator.getOperatorName() + "%"),
+ new GenMRFileSink1());
+ opRules.put(new RuleRegExp("R5",
+ UnionOperator.getOperatorName() + "%"),
+ new GenMRUnion1());
+ opRules.put(new RuleRegExp("R6",
+ UnionOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
+ new GenMRRedSink3());
+ opRules.put(new RuleRegExp("R7",
+ MapJoinOperator.getOperatorName() + "%"),
+ MapJoinFactory.getTableScanMapJoin());
+
+ // The dispatcher fires the processor corresponding to the closest matching
+ // rule and passes the context along
+ Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules,
+ procCtx);
+
+ GraphWalker ogw = new GenMapRedWalker(disp);
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pCtx.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+ }
}
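Each RuleRegExp above is matched against the concatenation of operator names on the walker's current stack; the dispatcher runs the cheapest matching rule and falls back to GenMROperator when nothing matches. A hedged illustration of how R2 fires:

    // Operator names are short prefixes ("TS", "RS", "FS", ...), so for a
    // walk stack TS -> SEL -> RS the stack's name string resembles
    // "TS%SEL%RS%".
    Rule r2 = new RuleRegExp("R2",
        TableScanOperator.getOperatorName() + "%.*"
            + ReduceSinkOperator.getOperatorName() + "%");
    // "TS%.*RS%" matches that string, so GenMRRedSink1 runs and breaks the
    // plan into a new map-reduce task at the reduce sink boundary.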
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Mon Jan 6 21:32:38 2014
@@ -8958,7 +8958,7 @@ public class SemanticAnalyzer extends Ba
if (!ctx.getExplainLogical()) {
// At this point we have the complete operator tree
// from which we want to create the map-reduce plan
- MapReduceCompiler compiler = new MapReduceCompiler();
+ TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf, pCtx);
compiler.init(conf, console, db);
compiler.compile(pCtx, rootTasks, inputs, outputs);
fetchTask = pCtx.getFetchTask();
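The one-line change above is the pivot of the refactoring: SemanticAnalyzer no longer hard-codes MapReduceCompiler and instead asks a factory for the session's TaskCompiler. The factory itself is not part of this hunk; a plausible sketch of its dispatch, hedged on the hive.execution.engine setting:

    public static TaskCompiler getCompiler(HiveConf conf, ParseContext pCtx) {
      // Assumption: "tez" selects the new Tez path; anything else keeps MR.
      if ("tez".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
        return new TezCompiler();
      }
      return new MapReduceCompiler();
    }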
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java?rev=1556041&r1=1556040&r2=1556041&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java Mon Jan 6 21:32:38 2014
@@ -239,7 +239,7 @@ public class TableAccessAnalyzer {
// and filters.
while (true) {
parentOps = currOp.getParentOperators();
- if (parentOps == null) {
+ if ((parentOps == null) || (parentOps.isEmpty())) {
return (TableScanOperator) currOp;
}
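The strengthened guard matters because some operators are constructed with a non-null but empty parent list; with only the null check, the enclosing while (true) walk would try to step to a parent that does not exist. Condensed into one exit condition (a hedged restatement of the loop around this hunk):

    List<Operator<? extends OperatorDesc>> parents = currOp.getParentOperators();
    if (parents == null || parents.isEmpty()) {
      return (TableScanOperator) currOp;  // reached the root: a table scan
    }
    currOp = parents.get(0);              // otherwise keep walking upward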