Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC
svn commit: r901644 [14/37] - in /hadoop/hive/trunk: ./
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/
ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/jav...
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Thu Jan 21 10:37:58 2010
@@ -18,37 +18,36 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.util.List;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Stack;
-import java.io.Serializable;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.MoveTask;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.plan.mapredWork;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
-import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc;
@@ -56,14 +55,14 @@
import org.apache.hadoop.hive.ql.plan.extractDesc;
import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
import org.apache.hadoop.hive.ql.plan.loadFileDesc;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
import org.apache.hadoop.hive.ql.plan.moveWork;
+import org.apache.hadoop.hive.ql.plan.partitionDesc;
import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.hadoop.hive.ql.plan.tableScanDesc;
-import org.apache.hadoop.hive.ql.plan.partitionDesc;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.conf.HiveConf;
/**
* Processor for the rule - table scan followed by reduce sink
@@ -74,178 +73,206 @@
}
/**
- * File Sink Operator encountered
- * @param nd the file sink operator encountered
- * @param opProcCtx context
+ * File Sink Operator encountered
+ *
+ * @param nd
+ * the file sink operator encountered
+ * @param opProcCtx
+ * context
*/
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
- GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
+ Object... nodeOutputs) throws SemanticException {
+ GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
ParseContext parseCtx = ctx.getParseCtx();
boolean chDir = false;
Task<? extends Serializable> currTask = ctx.getCurrTask();
// Has the user enabled merging of files for map-only jobs or for all jobs
- if ((ctx.getMvTask() != null) && (!ctx.getMvTask().isEmpty()))
- {
+ if ((ctx.getMvTask() != null) && (!ctx.getMvTask().isEmpty())) {
List<Task<? extends Serializable>> mvTasks = ctx.getMvTask();
- // In case of unions or map-joins, it is possible that the file has already been seen.
+ // In case of unions or map-joins, it is possible that the file has
+ // already been seen.
// So, no need to attempt to merge the files again.
- if ((ctx.getSeenFileSinkOps() == null) ||
- (!ctx.getSeenFileSinkOps().contains((FileSinkOperator)nd))) {
-
+ if ((ctx.getSeenFileSinkOps() == null)
+ || (!ctx.getSeenFileSinkOps().contains(nd))) {
+
// no need of merging if the move is to a local file system
- MoveTask mvTask = (MoveTask)findMoveTask(mvTasks, (FileSinkOperator)nd);
- if ((mvTask != null) && !mvTask.isLocal())
- {
- // There are separate configuration parameters to control whether to merge for a map-only job
+ MoveTask mvTask = (MoveTask) findMoveTask(mvTasks,
+ (FileSinkOperator) nd);
+ if ((mvTask != null) && !mvTask.isLocal()) {
+ // There are separate configuration parameters to control whether to
+ // merge for a map-only job
// or for a map-reduce job
- if ((parseCtx.getConf().getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES) &&
- (((mapredWork)currTask.getWork()).getReducer() == null)) ||
- (parseCtx.getConf().getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES) &&
- (((mapredWork)currTask.getWork()).getReducer() != null)))
+ if ((parseCtx.getConf().getBoolVar(
+ HiveConf.ConfVars.HIVEMERGEMAPFILES) && (((mapredWork) currTask
+ .getWork()).getReducer() == null))
+ || (parseCtx.getConf().getBoolVar(
+ HiveConf.ConfVars.HIVEMERGEMAPREDFILES) && (((mapredWork) currTask
+ .getWork()).getReducer() != null))) {
chDir = true;
+ }
}
}
}
String finalName = processFS(nd, stack, opProcCtx, chDir);
-
+
// If it is a map-only job, insert a new task to do the concatenation
if (chDir && (finalName != null)) {
- createMergeJob((FileSinkOperator)nd, ctx, finalName);
+ createMergeJob((FileSinkOperator) nd, ctx, finalName);
}
-
+
return null;
}
-
- private void createMergeJob(FileSinkOperator fsOp, GenMRProcContext ctx, String finalName) {
+
+ private void createMergeJob(FileSinkOperator fsOp, GenMRProcContext ctx,
+ String finalName) {
Task<? extends Serializable> currTask = ctx.getCurrTask();
RowSchema fsRS = fsOp.getSchema();
-
+
// create a reduce Sink operator - key is the first column
ArrayList<exprNodeDesc> keyCols = new ArrayList<exprNodeDesc>();
- keyCols.add(TypeCheckProcFactory.DefaultExprProcessor.getFuncExprNodeDesc("rand"));
+ keyCols.add(TypeCheckProcFactory.DefaultExprProcessor
+ .getFuncExprNodeDesc("rand"));
ArrayList<exprNodeDesc> valueCols = new ArrayList<exprNodeDesc>();
for (ColumnInfo ci : fsRS.getSignature()) {
- valueCols.add(new exprNodeColumnDesc(ci.getType(), ci.getInternalName(), ci.getTabAlias(),
- ci.getIsPartitionCol()));
+ valueCols.add(new exprNodeColumnDesc(ci.getType(), ci.getInternalName(),
+ ci.getTabAlias(), ci.getIsPartitionCol()));
}
// create a dummy tableScan operator
- Operator<? extends Serializable> ts_op =
- OperatorFactory.get(tableScanDesc.class, fsRS);
+ Operator<? extends Serializable> ts_op = OperatorFactory.get(
+ tableScanDesc.class, fsRS);
ArrayList<String> outputColumns = new ArrayList<String>();
- for (int i = 0; i < valueCols.size(); i++)
+ for (int i = 0; i < valueCols.size(); i++) {
outputColumns.add(SemanticAnalyzer.getColumnInternalName(i));
-
- reduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc(new ArrayList<exprNodeDesc>(), valueCols,
- outputColumns, false, -1, -1, -1);
- ReduceSinkOperator rsOp = (ReduceSinkOperator)OperatorFactory.getAndMakeChild(rsDesc, fsRS, ts_op);
+ }
+
+ reduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc(
+ new ArrayList<exprNodeDesc>(), valueCols, outputColumns, false, -1, -1,
+ -1);
+ OperatorFactory.getAndMakeChild(rsDesc, fsRS, ts_op);
mapredWork cplan = GenMapRedUtils.getMapRedWork();
ParseContext parseCtx = ctx.getParseCtx();
- Task<? extends Serializable> mergeTask = TaskFactory.get(cplan, parseCtx.getConf());
+ Task<? extends Serializable> mergeTask = TaskFactory.get(cplan, parseCtx
+ .getConf());
fileSinkDesc fsConf = fsOp.getConf();
-
+
// Add the extract operator to get the value fields
RowResolver out_rwsch = new RowResolver();
- RowResolver interim_rwsch = ctx.getParseCtx().getOpParseCtx().get(fsOp).getRR();
+ RowResolver interim_rwsch = ctx.getParseCtx().getOpParseCtx().get(fsOp)
+ .getRR();
Integer pos = Integer.valueOf(0);
- for(ColumnInfo colInfo: interim_rwsch.getColumnInfos()) {
- String [] info = interim_rwsch.reverseLookup(colInfo.getInternalName());
- out_rwsch.put(info[0], info[1],
- new ColumnInfo(pos.toString(), colInfo.getType(), info[0],
- colInfo.getIsPartitionCol()));
+ for (ColumnInfo colInfo : interim_rwsch.getColumnInfos()) {
+ String[] info = interim_rwsch.reverseLookup(colInfo.getInternalName());
+ out_rwsch.put(info[0], info[1], new ColumnInfo(pos.toString(), colInfo
+ .getType(), info[0], colInfo.getIsPartitionCol()));
pos = Integer.valueOf(pos.intValue() + 1);
}
- Operator extract =
- OperatorFactory.getAndMakeChild(
- new extractDesc(new exprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
- Utilities.ReduceField.VALUE.toString(), "", false)),
- new RowSchema(out_rwsch.getColumnInfos()));
-
- tableDesc ts = (tableDesc)fsConf.getTableInfo().clone();
- fsConf.getTableInfo().getProperties().remove(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
- FileSinkOperator newOutput =
- (FileSinkOperator)OperatorFactory.getAndMakeChild(
- new fileSinkDesc(finalName, ts,
- parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSRESULT)),
- fsRS, extract);
+ Operator extract = OperatorFactory.getAndMakeChild(new extractDesc(
+ new exprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
+ Utilities.ReduceField.VALUE.toString(), "", false)), new RowSchema(
+ out_rwsch.getColumnInfos()));
+
+ tableDesc ts = (tableDesc) fsConf.getTableInfo().clone();
+ fsConf
+ .getTableInfo()
+ .getProperties()
+ .remove(
+ org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
+ FileSinkOperator newOutput = (FileSinkOperator) OperatorFactory
+ .getAndMakeChild(new fileSinkDesc(finalName, ts, parseCtx.getConf()
+ .getBoolVar(HiveConf.ConfVars.COMPRESSRESULT)), fsRS, extract);
cplan.setReducer(extract);
ArrayList<String> aliases = new ArrayList<String>();
aliases.add(fsConf.getDirName());
cplan.getPathToAliases().put(fsConf.getDirName(), aliases);
- cplan.getAliasToWork().put(fsConf.getDirName(), ts_op);
- cplan.getPathToPartitionInfo().put(fsConf.getDirName(), new partitionDesc(fsConf.getTableInfo(), null));
+ cplan.getAliasToWork().put(fsConf.getDirName(), ts_op);
+ cplan.getPathToPartitionInfo().put(fsConf.getDirName(),
+ new partitionDesc(fsConf.getTableInfo(), null));
cplan.setNumReduceTasks(-1);
-
- moveWork dummyMv = new moveWork(null, null, null, new loadFileDesc(fsOp.getConf().getDirName(), finalName, true, null, null), false);
- Task<? extends Serializable> dummyMergeTask = TaskFactory.get(dummyMv, ctx.getConf());
+
+ moveWork dummyMv = new moveWork(null, null, null, new loadFileDesc(fsOp
+ .getConf().getDirName(), finalName, true, null, null), false);
+ Task<? extends Serializable> dummyMergeTask = TaskFactory.get(dummyMv, ctx
+ .getConf());
List<Serializable> listWorks = new ArrayList<Serializable>();
listWorks.add(dummyMv);
listWorks.add(mergeTask.getWork());
ConditionalWork cndWork = new ConditionalWork(listWorks);
-
- ConditionalTask cndTsk = (ConditionalTask)TaskFactory.get(cndWork, ctx.getConf());
+
+ ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, ctx
+ .getConf());
List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
listTasks.add(dummyMergeTask);
listTasks.add(mergeTask);
cndTsk.setListTasks(listTasks);
-
+
cndTsk.setResolver(new ConditionalResolverMergeFiles());
- cndTsk.setResolverCtx(new ConditionalResolverMergeFilesCtx(listTasks, fsOp.getConf().getDirName()));
-
+ cndTsk.setResolverCtx(new ConditionalResolverMergeFilesCtx(listTasks, fsOp
+ .getConf().getDirName()));
+
currTask.addDependentTask(cndTsk);
-
+
List<Task<? extends Serializable>> mvTasks = ctx.getMvTask();
Task<? extends Serializable> mvTask = findMoveTask(mvTasks, newOutput);
-
+
if (mvTask != null) {
- for(Task<? extends Serializable> tsk : cndTsk.getListTasks())
+ for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
tsk.addDependentTask(mvTask);
+ }
}
}
-
- private Task<? extends Serializable> findMoveTask(List<Task<? extends Serializable>> mvTasks, FileSinkOperator fsOp) {
+
+ private Task<? extends Serializable> findMoveTask(
+ List<Task<? extends Serializable>> mvTasks, FileSinkOperator fsOp) {
// find the move task
for (Task<? extends Serializable> mvTsk : mvTasks) {
- moveWork mvWork = (moveWork)mvTsk.getWork();
+ moveWork mvWork = (moveWork) mvTsk.getWork();
String srcDir = null;
- if (mvWork.getLoadFileWork() != null)
+ if (mvWork.getLoadFileWork() != null) {
srcDir = mvWork.getLoadFileWork().getSourceDir();
- else if (mvWork.getLoadTableWork() != null)
+ } else if (mvWork.getLoadTableWork() != null) {
srcDir = mvWork.getLoadTableWork().getSourceDir();
-
- if ((srcDir != null) && (srcDir.equalsIgnoreCase(fsOp.getConf().getDirName())))
+ }
+
+ if ((srcDir != null)
+ && (srcDir.equalsIgnoreCase(fsOp.getConf().getDirName()))) {
return mvTsk;
+ }
}
-
+
return null;
}
-
- private String processFS(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, boolean chDir)
- throws SemanticException {
-
+
+ private String processFS(Node nd, Stack<Node> stack,
+ NodeProcessorCtx opProcCtx, boolean chDir) throws SemanticException {
+
// Is it the dummy file sink after the mapjoin
- FileSinkOperator fsOp = (FileSinkOperator)nd;
- if ((fsOp.getParentOperators().size() == 1) && (fsOp.getParentOperators().get(0) instanceof MapJoinOperator))
+ FileSinkOperator fsOp = (FileSinkOperator) nd;
+ if ((fsOp.getParentOperators().size() == 1)
+ && (fsOp.getParentOperators().get(0) instanceof MapJoinOperator)) {
return null;
+ }
- GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
+ GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
List<FileSinkOperator> seenFSOps = ctx.getSeenFileSinkOps();
- if (seenFSOps == null)
+ if (seenFSOps == null) {
seenFSOps = new ArrayList<FileSinkOperator>();
- if (!seenFSOps.contains(fsOp))
+ }
+ if (!seenFSOps.contains(fsOp)) {
seenFSOps.add(fsOp);
+ }
ctx.setSeenFileSinkOps(seenFSOps);
Task<? extends Serializable> currTask = ctx.getCurrTask();
-
+
// If the directory needs to be changed, send the new directory
String dest = null;
@@ -256,27 +283,30 @@
ParseContext parseCtx = ctx.getParseCtx();
Context baseCtx = parseCtx.getContext();
String tmpDir = baseCtx.getMRTmpFileURI();
-
+
fsOp.getConf().setDirName(tmpDir);
}
-
- boolean ret = false;
+
Task<? extends Serializable> mvTask = null;
-
- if (!chDir)
+
+ if (!chDir) {
mvTask = findMoveTask(ctx.getMvTask(), fsOp);
-
+ }
+
Operator<? extends Serializable> currTopOp = ctx.getCurrTopOp();
String currAliasId = ctx.getCurrAliasId();
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+ .getOpTaskMap();
List<Operator<? extends Serializable>> seenOps = ctx.getSeenOps();
- List<Task<? extends Serializable>> rootTasks = ctx.getRootTasks();
+ List<Task<? extends Serializable>> rootTasks = ctx.getRootTasks();
// Set the move task to be dependent on the current task
- if (mvTask != null)
- ret = currTask.addDependentTask(mvTask);
-
- // In case of multi-table insert, the path to alias mapping is needed for all the sources. Since there is no
+ if (mvTask != null) {
+ currTask.addDependentTask(mvTask);
+ }
+
+ // In case of multi-table insert, the path to alias mapping is needed for
+ // all the sources. Since there is no
// reducer, treat it as a plan with null reducer
// If it is a map-only job, the task needs to be processed
if (currTopOp != null) {
@@ -284,19 +314,20 @@
if (mapTask == null) {
assert (!seenOps.contains(currTopOp));
seenOps.add(currTopOp);
- GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, (mapredWork) currTask.getWork(), false, ctx);
+ GenMapRedUtils.setTaskPlan(currAliasId, currTopOp,
+ (mapredWork) currTask.getWork(), false, ctx);
opTaskMap.put(null, currTask);
rootTasks.add(currTask);
- }
- else {
+ } else {
if (!seenOps.contains(currTopOp)) {
seenOps.add(currTopOp);
- GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, (mapredWork) mapTask.getWork(), false, ctx);
+ GenMapRedUtils.setTaskPlan(currAliasId, currTopOp,
+ (mapredWork) mapTask.getWork(), false, ctx);
}
- // mapTask and currTask should be merged by and join/union operator
+      // mapTask and currTask should be merged by a join/union operator
      // (e.g., GenMRUnion1) which has multiple topOps.
- assert mapTask == currTask :
- "mapTask.id = " + mapTask.getId() + "; currTask.id = " + currTask.getId();
+ assert mapTask == currTask : "mapTask.id = " + mapTask.getId()
+ + "; currTask.id = " + currTask.getId();
}
return dest;
@@ -304,30 +335,31 @@
}
UnionOperator currUnionOp = ctx.getCurrUnionOp();
-
- if (currUnionOp != null) {
+
+ if (currUnionOp != null) {
opTaskMap.put(null, currTask);
GenMapRedUtils.initUnionPlan(ctx, currTask, false);
return dest;
}
-
+
MapJoinOperator currMapJoinOp = ctx.getCurrMapJoinOp();
-
- if (currMapJoinOp != null) {
+
+ if (currMapJoinOp != null) {
opTaskMap.put(null, currTask);
GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(currMapJoinOp);
mapredWork plan = (mapredWork) currTask.getWork();
String taskTmpDir = mjCtx.getTaskTmpDir();
- tableDesc tt_desc = mjCtx.getTTDesc();
+ tableDesc tt_desc = mjCtx.getTTDesc();
assert plan.getPathToAliases().get(taskTmpDir) == null;
plan.getPathToAliases().put(taskTmpDir, new ArrayList<String>());
plan.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
- plan.getPathToPartitionInfo().put(taskTmpDir, new partitionDesc(tt_desc, null));
+ plan.getPathToPartitionInfo().put(taskTmpDir,
+ new partitionDesc(tt_desc, null));
plan.getAliasToWork().put(taskTmpDir, mjCtx.getRootMapJoinOp());
return dest;
}
-
+
return dest;
}
}
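
Stripped of the reformatting, the substance of GenMRFileSink1 is the conditional-merge wiring: createMergeJob() builds a ConditionalTask whose two alternatives, a plain move of the output and a map-reduce job that concatenates it, are chosen at run time by ConditionalResolverMergeFiles, once the producing job has finished and its output files can be inspected. The sketch below shows only the shape of that pattern; Task, MoveTask, MergeTask, MergeFilesResolver, and the size threshold are hypothetical stand-ins, not Hive's actual classes.

import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for Hive's Task / ConditionalResolver machinery.
interface Task {
    void execute();
}

class MoveTask implements Task {
    public void execute() {
        System.out.println("moving output files as-is");
    }
}

class MergeTask implements Task {
    public void execute() {
        System.out.println("running a map-reduce job to concatenate small files");
    }
}

// Plays the role of ConditionalResolverMergeFiles: the branch is picked at
// run time, after the producing job has finished and output sizes are known.
class MergeFilesResolver {
    private final long smallFileThreshold; // assumed size below which merging pays off

    MergeFilesResolver(long smallFileThreshold) {
        this.smallFileThreshold = smallFileThreshold;
    }

    Task resolve(List<Task> alternatives, long avgOutputFileSize) {
        // alternatives.get(0) is the plain move, get(1) the merge job,
        // mirroring the listTasks order built in createMergeJob() above.
        return avgOutputFileSize < smallFileThreshold
            ? alternatives.get(1)
            : alternatives.get(0);
    }
}

public class ConditionalMergeSketch {
    public static void main(String[] args) {
        MergeFilesResolver resolver = new MergeFilesResolver(16L * 1024 * 1024);
        List<Task> branches = Arrays.asList(new MoveTask(), new MergeTask());
        // Small average file size, so the merge branch is selected.
        resolver.resolve(branches, 4L * 1024 * 1024).execute();
    }
}
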
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java Thu Jan 21 10:37:58 2010
@@ -23,11 +23,11 @@
import java.util.Stack;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
/**
* Processor for the rule - no specific rule fired
@@ -38,17 +38,23 @@
}
/**
- * Reduce Scan encountered
- * @param nd the reduce sink operator encountered
- * @param procCtx context
+ * Reduce Scan encountered
+ *
+ * @param nd
+ * the reduce sink operator encountered
+ * @param procCtx
+ * context
*/
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
- GenMRProcContext ctx = (GenMRProcContext)procCtx;
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ GenMRProcContext ctx = (GenMRProcContext) procCtx;
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
- GenMapRedCtx mapredCtx = mapCurrCtx.get((Operator<? extends Serializable>)stack.get(stack.size()-2));
- mapCurrCtx.put((Operator<? extends Serializable>)nd,
- new GenMapRedCtx(mapredCtx.getCurrTask(), mapredCtx.getCurrTopOp(), mapredCtx.getCurrAliasId()));
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ .getMapCurrCtx();
+ GenMapRedCtx mapredCtx = mapCurrCtx.get(stack.get(stack.size() - 2));
+ mapCurrCtx.put((Operator<? extends Serializable>) nd, new GenMapRedCtx(
+ mapredCtx.getCurrTask(), mapredCtx.getCurrTopOp(), mapredCtx
+ .getCurrAliasId()));
return null;
}
}
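
GenMROperator is the fall-through rule: when nothing more specific fires, the visited node simply inherits the (task, top operator, alias) bookkeeping of its parent, the element one below it on the traversal stack. A minimal sketch of that hand-down, with a hypothetical Ctx type and string node names standing in for Hive's Operator graph:

import java.util.HashMap;
import java.util.Map;
import java.util.Stack;

public class DefaultProcessorSketch {

    // Snapshot of the walker's bookkeeping, analogous to GenMapRedCtx.
    static class Ctx {
        final String currTask;
        final String currAliasId;

        Ctx(String currTask, String currAliasId) {
            this.currTask = currTask;
            this.currAliasId = currAliasId;
        }
    }

    static final Map<String, Ctx> mapCurrCtx = new HashMap<>();

    // Mirrors GenMROperator.process(): stack.get(stack.size() - 2) is the
    // parent of nd, and its context is reused verbatim for nd.
    static void process(String nd, Stack<String> stack) {
        Ctx parentCtx = mapCurrCtx.get(stack.get(stack.size() - 2));
        mapCurrCtx.put(nd, new Ctx(parentCtx.currTask, parentCtx.currAliasId));
    }

    public static void main(String[] args) {
        Stack<String> stack = new Stack<>();
        stack.push("TS_0"); // a table scan at the bottom of the walk
        mapCurrCtx.put("TS_0", new Ctx("Stage-1", "src"));
        stack.push("SEL_1"); // a select with no dedicated rule
        process("SEL_1", stack);
        System.out.println(mapCurrCtx.get("SEL_1").currTask); // prints Stage-1
    }
}
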
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java Thu Jan 21 10:37:58 2010
@@ -18,21 +18,19 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.util.LinkedHashMap;
-import java.util.List;
+import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Map;
import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Set;
-import java.io.Serializable;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -40,32 +38,35 @@
import org.apache.hadoop.hive.ql.plan.tableDesc;
/**
- * Processor Context for creating map reduce task. Walk the tree in a DFS manner and process the nodes. Some state is
- * maintained about the current nodes visited so far.
+ * Processor Context for creating map reduce task. Walk the tree in a DFS manner
+ * and process the nodes. Some state is maintained about the current nodes
+ * visited so far.
*/
public class GenMRProcContext implements NodeProcessorCtx {
- /**
- * GenMapRedCtx is used to keep track of the current state.
+ /**
+ * GenMapRedCtx is used to keep track of the current state.
*/
public static class GenMapRedCtx {
- Task<? extends Serializable> currTask;
- Operator<? extends Serializable> currTopOp;
- String currAliasId;
-
- public GenMapRedCtx() {
+ Task<? extends Serializable> currTask;
+ Operator<? extends Serializable> currTopOp;
+ String currAliasId;
+
+ public GenMapRedCtx() {
}
-
+
/**
- * @param currTask the current task
- * @param currTopOp the current top operator being traversed
- * @param currAliasId the current alias for the to operator
+ * @param currTask
+ * the current task
+ * @param currTopOp
+ * the current top operator being traversed
+ * @param currAliasId
+ * the current alias for the to operator
*/
- public GenMapRedCtx (Task<? extends Serializable> currTask,
- Operator<? extends Serializable> currTopOp,
- String currAliasId) {
- this.currTask = currTask;
- this.currTopOp = currTopOp;
+ public GenMapRedCtx(Task<? extends Serializable> currTask,
+ Operator<? extends Serializable> currTopOp, String currAliasId) {
+ this.currTask = currTask;
+ this.currTopOp = currTopOp;
this.currAliasId = currAliasId;
}
@@ -92,24 +93,24 @@
}
public static class GenMRUnionCtx {
- Task<? extends Serializable> uTask;
- List<String> taskTmpDir;
- List<tableDesc> tt_desc;
+ Task<? extends Serializable> uTask;
+ List<String> taskTmpDir;
+ List<tableDesc> tt_desc;
- public GenMRUnionCtx() {
+ public GenMRUnionCtx() {
uTask = null;
taskTmpDir = new ArrayList<String>();
- tt_desc = new ArrayList<tableDesc>();
+ tt_desc = new ArrayList<tableDesc>();
}
- public Task<? extends Serializable> getUTask() {
+ public Task<? extends Serializable> getUTask() {
return uTask;
}
- public void setUTask(Task<? extends Serializable> uTask) {
+ public void setUTask(Task<? extends Serializable> uTask) {
this.uTask = uTask;
}
-
+
public void addTaskTmpDir(String taskTmpDir) {
this.taskTmpDir.add(taskTmpDir);
}
@@ -128,16 +129,16 @@
}
public static class GenMRMapJoinCtx {
- String taskTmpDir;
- tableDesc tt_desc;
- Operator<? extends Serializable> rootMapJoinOp;
- MapJoinOperator oldMapJoin;
-
- public GenMRMapJoinCtx() {
- taskTmpDir = null;
- tt_desc = null;
+ String taskTmpDir;
+ tableDesc tt_desc;
+ Operator<? extends Serializable> rootMapJoinOp;
+ MapJoinOperator oldMapJoin;
+
+ public GenMRMapJoinCtx() {
+ taskTmpDir = null;
+ tt_desc = null;
rootMapJoinOp = null;
- oldMapJoin = null;
+ oldMapJoin = null;
}
/**
@@ -146,14 +147,15 @@
* @param rootMapJoinOp
* @param oldMapJoin
*/
- public GenMRMapJoinCtx(String taskTmpDir, tableDesc tt_desc,
- Operator<? extends Serializable> rootMapJoinOp, MapJoinOperator oldMapJoin) {
- this.taskTmpDir = taskTmpDir;
- this.tt_desc = tt_desc;
+ public GenMRMapJoinCtx(String taskTmpDir, tableDesc tt_desc,
+ Operator<? extends Serializable> rootMapJoinOp,
+ MapJoinOperator oldMapJoin) {
+ this.taskTmpDir = taskTmpDir;
+ this.tt_desc = tt_desc;
this.rootMapJoinOp = rootMapJoinOp;
- this.oldMapJoin = oldMapJoin;
+ this.oldMapJoin = oldMapJoin;
}
-
+
public void setTaskTmpDir(String taskTmpDir) {
this.taskTmpDir = taskTmpDir;
}
@@ -178,7 +180,8 @@
}
/**
- * @param rootMapJoinOp the rootMapJoinOp to set
+ * @param rootMapJoinOp
+ * the rootMapJoinOp to set
*/
public void setRootMapJoinOp(Operator<? extends Serializable> rootMapJoinOp) {
this.rootMapJoinOp = rootMapJoinOp;
@@ -192,7 +195,8 @@
}
/**
- * @param oldMapJoin the oldMapJoin to set
+ * @param oldMapJoin
+ * the oldMapJoin to set
*/
public void setOldMapJoin(MapJoinOperator oldMapJoin) {
this.oldMapJoin = oldMapJoin;
@@ -201,174 +205,188 @@
private HiveConf conf;
private HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap;
- private HashMap<UnionOperator, GenMRUnionCtx> unionTaskMap;
- private HashMap<MapJoinOperator, GenMRMapJoinCtx> mapJoinTaskMap;
+ private HashMap<UnionOperator, GenMRUnionCtx> unionTaskMap;
+ private HashMap<MapJoinOperator, GenMRMapJoinCtx> mapJoinTaskMap;
private List<Operator<? extends Serializable>> seenOps;
- private List<FileSinkOperator> seenFileSinkOps;
+ private List<FileSinkOperator> seenFileSinkOps;
- private ParseContext parseCtx;
- private List<Task<? extends Serializable>> mvTask;
- private List<Task<? extends Serializable>> rootTasks;
-
- private LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx;
- private Task<? extends Serializable> currTask;
- private Operator<? extends Serializable> currTopOp;
- private UnionOperator currUnionOp;
- private MapJoinOperator currMapJoinOp;
- private String currAliasId;
+ private ParseContext parseCtx;
+ private List<Task<? extends Serializable>> mvTask;
+ private List<Task<? extends Serializable>> rootTasks;
+
+ private LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx;
+ private Task<? extends Serializable> currTask;
+ private Operator<? extends Serializable> currTopOp;
+ private UnionOperator currUnionOp;
+ private MapJoinOperator currMapJoinOp;
+ private String currAliasId;
private List<Operator<? extends Serializable>> rootOps;
-
+
/**
- * Set of read entities. This list is generated by the walker and is
- * passed to the hooks.
+ * Set of read entities. This list is generated by the walker and is passed to
+ * the hooks.
*/
- private Set<ReadEntity> inputs;
+ private Set<ReadEntity> inputs;
/**
- * Set of write entities. This list is generated by the walker and is
- * passed to the hooks.
- */
- private Set<WriteEntity> outputs;
-
- public GenMRProcContext() {
- }
-
- /**
- * @param conf hive configuration
- * @param opTaskMap reducer to task mapping
- * @param seenOps operator already visited
- * @param parseCtx current parse context
- * @param rootTasks root tasks for the plan
- * @param mvTask the final move task
- * @param mapCurrCtx operator to task mappings
- * @param inputs the set of input tables/partitions generated by the walk
- * @param outputs the set of destinations generated by the walk
- */
- public GenMRProcContext (
- HiveConf conf,
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap,
- List<Operator<? extends Serializable>> seenOps,
- ParseContext parseCtx,
- List<Task<? extends Serializable>> mvTask,
- List<Task<? extends Serializable>> rootTasks,
- LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx,
- Set<ReadEntity> inputs,
- Set<WriteEntity> outputs)
- {
- this.conf = conf;
- this.opTaskMap = opTaskMap;
- this.seenOps = seenOps;
- this.mvTask = mvTask;
- this.parseCtx = parseCtx;
- this.rootTasks = rootTasks;
+ * Set of write entities. This list is generated by the walker and is passed
+ * to the hooks.
+ */
+ private Set<WriteEntity> outputs;
+
+ public GenMRProcContext() {
+ }
+
+ /**
+ * @param conf
+ * hive configuration
+ * @param opTaskMap
+ * reducer to task mapping
+ * @param seenOps
+ * operator already visited
+ * @param parseCtx
+ * current parse context
+ * @param rootTasks
+ * root tasks for the plan
+ * @param mvTask
+ * the final move task
+ * @param mapCurrCtx
+ * operator to task mappings
+ * @param inputs
+ * the set of input tables/partitions generated by the walk
+ * @param outputs
+ * the set of destinations generated by the walk
+ */
+ public GenMRProcContext(
+ HiveConf conf,
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap,
+ List<Operator<? extends Serializable>> seenOps, ParseContext parseCtx,
+ List<Task<? extends Serializable>> mvTask,
+ List<Task<? extends Serializable>> rootTasks,
+ LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx,
+ Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
+ this.conf = conf;
+ this.opTaskMap = opTaskMap;
+ this.seenOps = seenOps;
+ this.mvTask = mvTask;
+ this.parseCtx = parseCtx;
+ this.rootTasks = rootTasks;
this.mapCurrCtx = mapCurrCtx;
this.inputs = inputs;
this.outputs = outputs;
- currTask = null;
- currTopOp = null;
- currUnionOp = null;
- currMapJoinOp = null;
- currAliasId = null;
- rootOps = new ArrayList<Operator<? extends Serializable>>();
+ currTask = null;
+ currTopOp = null;
+ currUnionOp = null;
+ currMapJoinOp = null;
+ currAliasId = null;
+ rootOps = new ArrayList<Operator<? extends Serializable>>();
rootOps.addAll(parseCtx.getTopOps().values());
unionTaskMap = new HashMap<UnionOperator, GenMRUnionCtx>();
mapJoinTaskMap = new HashMap<MapJoinOperator, GenMRMapJoinCtx>();
}
/**
- * @return reducer to task mapping
+ * @return reducer to task mapping
*/
public HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> getOpTaskMap() {
return opTaskMap;
}
/**
- * @param opTaskMap reducer to task mapping
+ * @param opTaskMap
+ * reducer to task mapping
*/
- public void setOpTaskMap(HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap) {
+ public void setOpTaskMap(
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap) {
this.opTaskMap = opTaskMap;
}
/**
- * @return operators already visited
+ * @return operators already visited
*/
public List<Operator<? extends Serializable>> getSeenOps() {
return seenOps;
}
/**
- * @return file operators already visited
+ * @return file operators already visited
*/
public List<FileSinkOperator> getSeenFileSinkOps() {
return seenFileSinkOps;
}
/**
- * @param seenOps operators already visited
+ * @param seenOps
+ * operators already visited
*/
public void setSeenOps(List<Operator<? extends Serializable>> seenOps) {
this.seenOps = seenOps;
}
/**
- * @param seenFileSinkOps file sink operators already visited
+ * @param seenFileSinkOps
+ * file sink operators already visited
*/
public void setSeenFileSinkOps(List<FileSinkOperator> seenFileSinkOps) {
this.seenFileSinkOps = seenFileSinkOps;
}
/**
- * @return top operators for tasks
+ * @return top operators for tasks
*/
public List<Operator<? extends Serializable>> getRootOps() {
return rootOps;
}
/**
- * @param rootOps top operators for tasks
+ * @param rootOps
+ * top operators for tasks
*/
public void setRootOps(List<Operator<? extends Serializable>> rootOps) {
this.rootOps = rootOps;
}
/**
- * @return current parse context
+ * @return current parse context
*/
public ParseContext getParseCtx() {
return parseCtx;
}
/**
- * @param parseCtx current parse context
+ * @param parseCtx
+ * current parse context
*/
public void setParseCtx(ParseContext parseCtx) {
this.parseCtx = parseCtx;
}
/**
- * @return the final move task
+ * @return the final move task
*/
public List<Task<? extends Serializable>> getMvTask() {
return mvTask;
}
/**
- * @param mvTask the final move task
+ * @param mvTask
+ * the final move task
*/
public void setMvTask(List<Task<? extends Serializable>> mvTask) {
this.mvTask = mvTask;
}
/**
- * @return root tasks for the plan
+ * @return root tasks for the plan
*/
- public List<Task<? extends Serializable>> getRootTasks() {
+ public List<Task<? extends Serializable>> getRootTasks() {
return rootTasks;
}
/**
- * @param rootTasks root tasks for the plan
+ * @param rootTasks
+ * root tasks for the plan
*/
- public void setRootTasks(List<Task<? extends Serializable>> rootTasks) {
+ public void setRootTasks(List<Task<? extends Serializable>> rootTasks) {
this.rootTasks = rootTasks;
}
@@ -380,23 +398,26 @@
}
/**
- * @param mapCurrCtx operator to task mappings
+ * @param mapCurrCtx
+ * operator to task mappings
*/
- public void setMapCurrCtx(LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx) {
+ public void setMapCurrCtx(
+ LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx) {
this.mapCurrCtx = mapCurrCtx;
}
/**
* @return current task
*/
- public Task<? extends Serializable> getCurrTask() {
+ public Task<? extends Serializable> getCurrTask() {
return currTask;
}
/**
- * @param currTask current task
+ * @param currTask
+ * current task
*/
- public void setCurrTask(Task<? extends Serializable> currTask) {
+ public void setCurrTask(Task<? extends Serializable> currTask) {
this.currTask = currTask;
}
@@ -405,46 +426,50 @@
*/
public Operator<? extends Serializable> getCurrTopOp() {
return currTopOp;
- }
-
+ }
+
/**
- * @param currTopOp current top operator
+ * @param currTopOp
+ * current top operator
*/
public void setCurrTopOp(Operator<? extends Serializable> currTopOp) {
this.currTopOp = currTopOp;
- }
+ }
public UnionOperator getCurrUnionOp() {
return currUnionOp;
- }
-
+ }
+
/**
- * @param currUnionOp current union operator
+ * @param currUnionOp
+ * current union operator
*/
public void setCurrUnionOp(UnionOperator currUnionOp) {
this.currUnionOp = currUnionOp;
- }
+ }
public MapJoinOperator getCurrMapJoinOp() {
return currMapJoinOp;
- }
-
+ }
+
/**
- * @param currMapJoinOp current map join operator
+ * @param currMapJoinOp
+ * current map join operator
*/
public void setCurrMapJoinOp(MapJoinOperator currMapJoinOp) {
this.currMapJoinOp = currMapJoinOp;
- }
+ }
/**
* @return current top alias
*/
- public String getCurrAliasId() {
+ public String getCurrAliasId() {
return currAliasId;
}
/**
- * @param currAliasId current top alias
+ * @param currAliasId
+ * current top alias
*/
public void setCurrAliasId(String currAliasId) {
this.currAliasId = currAliasId;
@@ -472,7 +497,7 @@
public Set<ReadEntity> getInputs() {
return inputs;
}
-
+
/**
* Get the output set.
*/
@@ -488,7 +513,8 @@
}
/**
- * @param conf the conf to set
+ * @param conf
+ * the conf to set
*/
public void setConf(HiveConf conf) {
this.conf = conf;
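
GenMRProcContext itself carries no logic, only state: a "current" cursor (task, top operator, union, map-join, alias) plus memo maps such as opTaskMap, unionTaskMap, and mapJoinTaskMap, so that an operator reached along a second branch reuses the task created on the first visit. A toy rendition of that memoization, with illustrative names only:

import java.util.HashMap;
import java.util.Map;

public class ProcContextSketch {

    // Toy opTaskMap: the first branch to reach an operator creates its
    // task; every later branch finds and reuses it. The processors do the
    // same thing by hand with opTaskMap.get(reducer) == null checks.
    private final Map<String, String> opTaskMap = new HashMap<>();
    private int nextStage = 1;

    String taskFor(String reducerOp) {
        return opTaskMap.computeIfAbsent(reducerOp, op -> "Stage-" + nextStage++);
    }

    public static void main(String[] args) {
        ProcContextSketch ctx = new ProcContextSketch();
        System.out.println(ctx.taskFor("GBY_5"));  // Stage-1 (created)
        System.out.println(ctx.taskFor("GBY_5"));  // Stage-1 (reused on a second branch)
        System.out.println(ctx.taskFor("JOIN_7")); // Stage-2
    }
}
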
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java Thu Jan 21 10:37:58 2010
@@ -18,20 +18,20 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.util.Map;
+import java.io.Serializable;
import java.util.HashMap;
+import java.util.Map;
import java.util.Stack;
-import java.io.Serializable;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.plan.mapredWork;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
/**
* Processor for the rule - table scan followed by reduce sink
@@ -42,22 +42,28 @@
}
/**
- * Reduce Scan encountered
- * @param nd the reduce sink operator encountered
- * @param opProcCtx context
+ * Reduce Scan encountered
+ *
+ * @param nd
+ * the reduce sink operator encountered
+ * @param opProcCtx
+ * context
*/
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
- ReduceSinkOperator op = (ReduceSinkOperator)nd;
- GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
-
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
- GenMapRedCtx mapredCtx = mapCurrCtx.get((Operator<? extends Serializable>)stack.get(stack.size()-2));
- Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
+ Object... nodeOutputs) throws SemanticException {
+ ReduceSinkOperator op = (ReduceSinkOperator) nd;
+ GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
+
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ .getMapCurrCtx();
+ GenMapRedCtx mapredCtx = mapCurrCtx.get(stack.get(stack.size() - 2));
+ Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
mapredWork currPlan = (mapredWork) currTask.getWork();
- Operator<? extends Serializable> currTopOp = mapredCtx.getCurrTopOp();
+ Operator<? extends Serializable> currTopOp = mapredCtx.getCurrTopOp();
String currAliasId = mapredCtx.getCurrAliasId();
Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+ .getOpTaskMap();
Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
ctx.setCurrTopOp(currTopOp);
@@ -66,20 +72,24 @@
// If the plan for this reducer does not exist, initialize the plan
if (opMapTask == null) {
- if (currPlan.getReducer() == null)
+ if (currPlan.getReducer() == null) {
GenMapRedUtils.initPlan(op, ctx);
- else
+ } else {
GenMapRedUtils.splitPlan(op, ctx);
+ }
}
- // This will happen in case of joins. The current plan can be thrown away after being merged with the
+ // This will happen in case of joins. The current plan can be thrown away
+ // after being merged with the
// original plan
else {
- GenMapRedUtils.joinPlan(op, null, opMapTask, ctx, -1, false, false, false);
+ GenMapRedUtils
+ .joinPlan(op, null, opMapTask, ctx, -1, false, false, false);
currTask = opMapTask;
ctx.setCurrTask(currTask);
}
- mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
+ mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
+ ctx.getCurrAliasId()));
return null;
}
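
The processors that consume that context share one dispatch shape, visible here in GenMRRedSink1: a reducer seen for the first time either initializes the current plan (initPlan) or splits a new stage off it (splitPlan), while a reducer that already owns a task is merged into that task (joinPlan). A compact model of the three-way decision; Plan and the string operator names are hypothetical simplifications:

import java.util.HashMap;
import java.util.Map;

public class RedSinkDispatchSketch {

    static class Plan {
        String reducer; // null until a reducer has been assigned
    }

    static final Map<String, Plan> opTaskMap = new HashMap<>();

    static String process(String reducer, Plan currPlan) {
        Plan owner = opTaskMap.get(reducer);
        if (owner == null) {
            if (currPlan.reducer == null) {
                currPlan.reducer = reducer; // cf. GenMapRedUtils.initPlan
                opTaskMap.put(reducer, currPlan);
                return "init";
            }
            Plan split = new Plan();        // cf. GenMapRedUtils.splitPlan
            split.reducer = reducer;
            opTaskMap.put(reducer, split);
            return "split";
        }
        return "join";                      // cf. GenMapRedUtils.joinPlan
    }

    public static void main(String[] args) {
        Plan plan = new Plan();
        System.out.println(process("GBY_3", plan));  // init
        System.out.println(process("JOIN_5", plan)); // split: plan already has a reducer
        System.out.println(process("JOIN_5", plan)); // join: second branch of the join
    }
}
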
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java Thu Jan 21 10:37:58 2010
@@ -28,8 +28,8 @@
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
/**
* Processor for the rule - reduce sink followed by reduce sink
@@ -40,36 +40,44 @@
}
/**
- * Reduce Scan encountered
- * @param nd the reduce sink operator encountered
- * @param opProcCtx context
+ * Reduce Scan encountered
+ *
+ * @param nd
+ * the reduce sink operator encountered
+ * @param opProcCtx
+ * context
*/
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
- ReduceSinkOperator op = (ReduceSinkOperator)nd;
- GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
+ Object... nodeOutputs) throws SemanticException {
+ ReduceSinkOperator op = (ReduceSinkOperator) nd;
+ GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ .getMapCurrCtx();
GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
- Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
+ Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
Operator<? extends Serializable> currTopOp = mapredCtx.getCurrTopOp();
String currAliasId = mapredCtx.getCurrAliasId();
Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
- Map<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+ Map<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+ .getOpTaskMap();
Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
ctx.setCurrTopOp(currTopOp);
ctx.setCurrAliasId(currAliasId);
ctx.setCurrTask(currTask);
- if (opMapTask == null)
+ if (opMapTask == null) {
GenMapRedUtils.splitPlan(op, ctx);
- else {
- GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, true, false, false);
+ } else {
+ GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, true, false,
+ false);
currTask = opMapTask;
ctx.setCurrTask(currTask);
}
- mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
+ mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
+ ctx.getCurrAliasId()));
return null;
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java Thu Jan 21 10:37:58 2010
@@ -18,26 +18,22 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.util.Map;
+import java.io.Serializable;
import java.util.HashMap;
+import java.util.Map;
import java.util.Stack;
-import java.io.Serializable;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.plan.mapredWork;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
-import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext.UnionParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
/**
* Processor for the rule - union followed by reduce sink
@@ -48,54 +44,64 @@
}
/**
- * Reduce Scan encountered
- * @param nd the reduce sink operator encountered
- * @param opProcCtx context
+ * Reduce Scan encountered
+ *
+ * @param nd
+ * the reduce sink operator encountered
+ * @param opProcCtx
+ * context
*/
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
- ReduceSinkOperator op = (ReduceSinkOperator)nd;
- GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
+ Object... nodeOutputs) throws SemanticException {
+ ReduceSinkOperator op = (ReduceSinkOperator) nd;
+ GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
ParseContext parseCtx = ctx.getParseCtx();
UnionProcContext uCtx = parseCtx.getUCtx();
// union was map only - no special processing needed
- if (uCtx.isMapOnlySubq())
+ if (uCtx.isMapOnlySubq()) {
return (new GenMRRedSink1()).process(nd, stack, opProcCtx, nodeOutputs);
+ }
- // union consisted on a bunch of map-reduce jobs, and it has been split at the union
+    // union consisted of a bunch of map-reduce jobs, and it has been split at
+ // the union
Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ .getMapCurrCtx();
GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
- Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
+ Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
mapredWork plan = (mapredWork) currTask.getWork();
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+ .getOpTaskMap();
Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
-
+
ctx.setCurrTask(currTask);
// If the plan for this reducer does not exist, initialize the plan
if (opMapTask == null) {
// When the reducer is encountered for the first time
- if (plan.getReducer() == null)
+ if (plan.getReducer() == null) {
GenMapRedUtils.initUnionPlan(op, ctx);
- // When union is followed by a multi-table insert
- else
+ // When union is followed by a multi-table insert
+ } else {
GenMapRedUtils.splitPlan(op, ctx);
+ }
}
- // The union is already initialized. However, the union is walked from another input
+ // The union is already initialized. However, the union is walked from
+ // another input
// initUnionPlan is idempotent
- else if (plan.getReducer() == reducer)
+ else if (plan.getReducer() == reducer) {
GenMapRedUtils.initUnionPlan(op, ctx);
- // There is a join after union. One of the branches of union has already been initialized.
- // Initialize the current branch, and join with the original plan.
- else {
+ } else {
GenMapRedUtils.initUnionPlan(ctx, currTask, false);
- GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, true, false, false);
+ GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, true, false,
+ false);
}
- mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
-
+ mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
+ ctx.getCurrAliasId()));
+
// the union operator has been processed
ctx.setCurrUnionOp(null);
return null;
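
GenMRRedSink3 adds the union wrinkle to that dispatch: the walker reaches the union once per input branch, so initUnionPlan has to be idempotent, and a branch that arrives after the reducer's task already exists is initialized and then joined in. One way to see why per-branch initialization can be repeated safely, assuming it amounts to registering the branch's tmp dir in a set (the dir names below are made up):

import java.util.LinkedHashSet;
import java.util.Set;

public class UnionInitSketch {

    private final Set<String> registeredInputs = new LinkedHashSet<>();
    private String reducer;

    // Idempotent by construction: re-registering a branch is a no-op on
    // the set, and the reducer is the same value on every call.
    void initUnionPlan(String branchTmpDir, String unionReducer) {
        reducer = unionReducer;
        registeredInputs.add(branchTmpDir);
    }

    public static void main(String[] args) {
        UnionInitSketch plan = new UnionInitSketch();
        plan.initUnionPlan("/tmp/u-branch-1", "RS_4");
        plan.initUnionPlan("/tmp/u-branch-2", "RS_4");
        plan.initUnionPlan("/tmp/u-branch-1", "RS_4"); // walked again: harmless
        System.out.println(plan.reducer + " <- " + plan.registeredInputs);
    }
}
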
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java Thu Jan 21 10:37:58 2010
@@ -18,21 +18,20 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.util.Map;
+import java.io.Serializable;
import java.util.HashMap;
+import java.util.Map;
import java.util.Stack;
-import java.io.Serializable;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.plan.mapredWork;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
/**
* Processor for the rule - map join followed by reduce sink
@@ -43,45 +42,56 @@
}
/**
- * Reduce Scan encountered
- * @param nd the reduce sink operator encountered
- * @param opProcCtx context
+ * Reduce Scan encountered
+ *
+ * @param nd
+ * the reduce sink operator encountered
+ * @param opProcCtx
+ * context
*/
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
- ReduceSinkOperator op = (ReduceSinkOperator)nd;
- GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
+ Object... nodeOutputs) throws SemanticException {
+ ReduceSinkOperator op = (ReduceSinkOperator) nd;
+ GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
- ParseContext parseCtx = ctx.getParseCtx();
+ ctx.getParseCtx();
- // map-join consisted on a bunch of map-only jobs, and it has been split after the mapjoin
+    // map-join consisted of a bunch of map-only jobs, and it has been split
+ // after the mapjoin
Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ .getMapCurrCtx();
GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
- Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
+ Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
mapredWork plan = (mapredWork) currTask.getWork();
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+ .getOpTaskMap();
Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
-
+
ctx.setCurrTask(currTask);
// If the plan for this reducer does not exist, initialize the plan
if (opMapTask == null) {
// When the reducer is encountered for the first time
- if (plan.getReducer() == null)
+ if (plan.getReducer() == null) {
GenMapRedUtils.initMapJoinPlan(op, ctx, true, false, true, -1);
- // When mapjoin is followed by a multi-table insert
- else
+ // When mapjoin is followed by a multi-table insert
+ } else {
GenMapRedUtils.splitPlan(op, ctx);
+ }
}
- // There is a join after mapjoin. One of the branches of mapjoin has already been initialized.
+ // There is a join after mapjoin. One of the branches of mapjoin has already
+ // been initialized.
// Initialize the current branch, and join with the original plan.
else {
assert plan.getReducer() != reducer;
- GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, false, true, false);
+ GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, false, true,
+ false);
}
- mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
-
+ mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
+ ctx.getCurrAliasId()));
+
// the mapjoin operator has been processed
ctx.setCurrMapJoinOp(null);
return null;
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java Thu Jan 21 10:37:58 2010
@@ -29,9 +29,9 @@
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
/**
* Processor for the rule - table scan
@@ -41,18 +41,24 @@
}
/**
- * Table Sink encountered
- * @param nd the table sink operator encountered
- * @param opProcCtx context
+   * Table Scan encountered
+   *
+   * @param nd
+   *          the table scan operator encountered
+ * @param opProcCtx
+ * context
*/
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
- TableScanOperator op = (TableScanOperator)nd;
- GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
+ Object... nodeOutputs) throws SemanticException {
+ TableScanOperator op = (TableScanOperator) nd;
+ GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
ParseContext parseCtx = ctx.getParseCtx();
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ .getMapCurrCtx();
- // create a dummy task
- Task<? extends Serializable> currTask = TaskFactory.get(GenMapRedUtils.getMapRedWork(), parseCtx.getConf());
+ // create a dummy task
+ Task<? extends Serializable> currTask = TaskFactory.get(GenMapRedUtils
+ .getMapRedWork(), parseCtx.getConf());
Operator<? extends Serializable> currTopOp = op;
ctx.setCurrTask(currTask);
ctx.setCurrTopOp(currTopOp);
@@ -71,4 +77,3 @@
}
}
-
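
GenMRTableScan1 is the seeding rule the others build on: every table scan opens a fresh, empty map-reduce task and records itself as the current top operator for everything the walker visits beneath it. A placeholder rendition of that step; the stage names and the string-keyed map are illustrative, not Hive's API:

import java.util.HashMap;
import java.util.Map;

public class TableScanSeedSketch {

    private int stageCounter = 0;
    private final Map<String, String> mapCurrCtx = new HashMap<>();

    String process(String tableScanOp) {
        // cf. TaskFactory.get(GenMapRedUtils.getMapRedWork(), conf):
        // each scan gets its own brand-new, empty stage.
        String currTask = "Stage-" + (++stageCounter);
        mapCurrCtx.put(tableScanOp, currTask); // remembered for child operators
        return currTask;
    }

    public static void main(String[] args) {
        TableScanSeedSketch walker = new TableScanSeedSketch();
        System.out.println(walker.process("TS_src"));     // Stage-1
        System.out.println(walker.process("TS_srcpart")); // Stage-2
    }
}
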
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Thu Jan 21 10:37:58 2010
@@ -18,39 +18,37 @@
package org.apache.hadoop.hive.ql.optimizer;
+import java.io.Serializable;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.ArrayList;
-import java.util.Stack;
-import java.io.Serializable;
-import java.io.File;
import java.util.Map;
+import java.util.Stack;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.plan.tableDesc;
-import org.apache.hadoop.hive.ql.plan.partitionDesc;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
-import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcFactory;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
+import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcFactory;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext.UnionParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.partitionDesc;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
/**
* Processor for the rule - TableScan followed by Union
@@ -61,41 +59,51 @@
}
/**
- * Union Operator encountered .
- * Currently, the algorithm is pretty simple:
- * If all the sub-queries are map-only, dont do anything.
- * However, if there is a mapjoin followed by the union, merge at the union
- * Otherwise, insert a FileSink on top of all the sub-queries.
- *
+ * Union Operator encountered. Currently, the algorithm is pretty simple: if
+ * all the sub-queries are map-only, don't do anything. However, if there is
+ * a mapjoin followed by the union, merge at the union. Otherwise, insert a
+ * FileSink on top of all the sub-queries.
+ *
* This can be optimized later on.
- * @param nd the file sink operator encountered
- * @param opProcCtx context
+ *
+ * @param nd
+ * the union operator encountered
+ * @param opProcCtx
+ * context
*/
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
- UnionOperator union = (UnionOperator)nd;
- GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
+ Object... nodeOutputs) throws SemanticException {
+ UnionOperator union = (UnionOperator) nd;
+ GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
ParseContext parseCtx = ctx.getParseCtx();
UnionProcContext uCtx = parseCtx.getUCtx();
- // Map-only subqueries can be optimized in future to not write to a file in future
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+ // Map-only subqueries can be optimized in the future to not write to a
+ // file
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ .getMapCurrCtx();
- // The plan needs to be broken only if one of the sub-queries involve a map-reduce job
+ // The plan needs to be broken only if one of the sub-queries involves a
+ // map-reduce job
if (uCtx.isMapOnlySubq()) {
// merge currTask from multiple topOps
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
- if ( opTaskMap != null && opTaskMap.size() > 0 ) {
- Task<? extends Serializable> tsk = opTaskMap.get(null);
- if ( tsk != null )
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+ .getOpTaskMap();
+ if (opTaskMap != null && opTaskMap.size() > 0) {
+ Task<? extends Serializable> tsk = opTaskMap.get(null);
+ if (tsk != null) {
ctx.setCurrTask(tsk);
+ }
}
-
+
UnionParseContext uPrsCtx = uCtx.getUnionParseContext(union);
if ((uPrsCtx != null) && (uPrsCtx.getMapJoinQuery())) {
- GenMapRedUtils.mergeMapJoinUnion(union, ctx, UnionProcFactory.getPositionParent(union, stack));
+ GenMapRedUtils.mergeMapJoinUnion(union, ctx, UnionProcFactory
+ .getPositionParent(union, stack));
+ } else {
+ mapCurrCtx.put((Operator<? extends Serializable>) nd, new GenMapRedCtx(
+ ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
}
- else
- mapCurrCtx.put((Operator<? extends Serializable>)nd, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
return null;
}
@@ -108,13 +116,15 @@
int pos = UnionProcFactory.getPositionParent(union, stack);
// is the current task a root task
- if (uPrsCtx.getRootTask(pos) && (!ctx.getRootTasks().contains(currTask)))
+ if (uPrsCtx.getRootTask(pos) && (!ctx.getRootTasks().contains(currTask))) {
ctx.getRootTasks().add(currTask);
+ }
GenMRUnionCtx uCtxTask = ctx.getUnionTask(union);
Task<? extends Serializable> uTask = null;
- Operator<? extends Serializable> parent = union.getParentOperators().get(pos);
+ Operator<? extends Serializable> parent = union.getParentOperators().get(
+ pos);
mapredWork uPlan = null;
// union is encountered for the first time
@@ -124,10 +134,9 @@
uTask = TaskFactory.get(uPlan, parseCtx.getConf());
uCtxTask.setUTask(uTask);
ctx.setUnionTask(union, uCtxTask);
- }
- else {
+ } else {
uTask = uCtxTask.getUTask();
- uPlan = (mapredWork)uTask.getWork();
+ uPlan = (mapredWork) uTask.getWork();
}
// If there is a mapjoin at position 'pos'
@@ -143,12 +152,13 @@
assert plan.getPathToAliases().get(taskTmpDir) == null;
plan.getPathToAliases().put(taskTmpDir, new ArrayList<String>());
plan.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
- plan.getPathToPartitionInfo().put(taskTmpDir, new partitionDesc(tt_desc, null));
+ plan.getPathToPartitionInfo().put(taskTmpDir,
+ new partitionDesc(tt_desc, null));
plan.getAliasToWork().put(taskTmpDir, mjCtx.getRootMapJoinOp());
}
- tableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(
- PlanUtils.getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
+ tableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
+ .getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
// generate the temporary file
Context baseCtx = parseCtx.getContext();
@@ -158,15 +168,14 @@
uCtxTask.addTaskTmpDir(taskTmpDir);
uCtxTask.addTTDesc(tt_desc);
- // The union task is empty. The files created for all the inputs are assembled in the
+ // The union task is empty. The files created for all the inputs are
+ // assembled in the
// union context and later used to initialize the union plan
// Create a file sink operator for this file name
- Operator<? extends Serializable> fs_op =
- OperatorFactory.get
- (new fileSinkDesc(taskTmpDir, tt_desc,
- parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSINTERMEDIATE)),
- parent.getSchema());
+ Operator<? extends Serializable> fs_op = OperatorFactory.get(
+ new fileSinkDesc(taskTmpDir, tt_desc, parseCtx.getConf().getBoolVar(
+ HiveConf.ConfVars.COMPRESSINTERMEDIATE)), parent.getSchema());
assert parent.getChildOperators().size() == 1;
parent.getChildOperators().set(0, fs_op);
@@ -178,14 +187,17 @@
currTask.addDependentTask(uTask);
// If it is map-only task, add the files to be processed
- if (uPrsCtx.getMapOnlySubq(pos) && uPrsCtx.getRootTask(pos))
- GenMapRedUtils.setTaskPlan(ctx.getCurrAliasId(), ctx.getCurrTopOp(), (mapredWork) currTask.getWork(), false, ctx);
+ if (uPrsCtx.getMapOnlySubq(pos) && uPrsCtx.getRootTask(pos)) {
+ GenMapRedUtils.setTaskPlan(ctx.getCurrAliasId(), ctx.getCurrTopOp(),
+ (mapredWork) currTask.getWork(), false, ctx);
+ }
ctx.setCurrTask(uTask);
ctx.setCurrAliasId(null);
ctx.setCurrTopOp(null);
- mapCurrCtx.put((Operator<? extends Serializable>)nd, new GenMapRedCtx(ctx.getCurrTask(), null, null));
+ mapCurrCtx.put((Operator<? extends Serializable>) nd, new GenMapRedCtx(ctx
+ .getCurrTask(), null, null));
return null;
}
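
The javadoc near the top of this file summarizes the algorithm: map-only sub-queries need nothing, a mapjoin feeding the union is merged at the union, and every other sub-query gets a FileSink inserted above it. Condensing the hunks above into one hedged sketch of that last branch (the temporary-path call is assumed, since the hunk elides it):

    // Describe the intermediate file each union parent will write.
    tableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
        .getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));

    // Pick a temporary location; the exact Context call is elided from
    // the hunk above, so getMRTmpFileURI() here is an assumption.
    Context baseCtx = parseCtx.getContext();
    String taskTmpDir = baseCtx.getMRTmpFileURI();

    // Redirect the parent into a FileSink writing that file...
    Operator<? extends Serializable> fs_op = OperatorFactory.get(
        new fileSinkDesc(taskTmpDir, tt_desc, parseCtx.getConf().getBoolVar(
            HiveConf.ConfVars.COMPRESSINTERMEDIATE)), parent.getSchema());
    assert parent.getChildOperators().size() == 1;
    parent.getChildOperators().set(0, fs_op);

    // ...remember the file for the union plan, and chain the tasks.
    uCtxTask.addTaskTmpDir(taskTmpDir);
    uCtxTask.addTTDesc(tt_desc);
    currTask.addDependentTask(uTask);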