You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC
svn commit: r901644 [15/37] - in /hadoop/hive/trunk: ./
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/
ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/jav...
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Thu Jan 21 10:37:58 2010
@@ -18,87 +18,99 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.util.Iterator;
-import java.util.List;
+import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Map;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
-import java.io.Serializable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
-import org.apache.hadoop.hive.ql.plan.fetchWork;
-import org.apache.hadoop.hive.ql.plan.mapredWork;
-import org.apache.hadoop.hive.ql.plan.mapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
-import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.tableDesc;
-import org.apache.hadoop.hive.ql.plan.partitionDesc;
-import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.tableScanDesc;
-import org.apache.hadoop.hive.ql.plan.filterDesc.sampleDesc;
-import org.apache.hadoop.hive.ql.metadata.*;
-import org.apache.hadoop.hive.ql.parse.*;
-import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
-import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext.UnionParseContext;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.fetchWork;
+import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.mapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.partitionDesc;
+import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.hive.ql.plan.tableScanDesc;
+import org.apache.hadoop.hive.ql.plan.filterDesc.sampleDesc;
/**
- * General utility common functions for the Processor to convert operator into map-reduce tasks
+ * Common utility functions used by the Processor to convert operators into
+ * map-reduce tasks.
*/
public class GenMapRedUtils {
private static Log LOG;
static {
- LOG = LogFactory.getLog("org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils");
+ LOG = LogFactory
+ .getLog("org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils");
}
/**
* Initialize the current plan by adding it to root tasks
- * @param op the reduce sink operator encountered
- * @param opProcCtx processing context
+ *
+ * @param op
+ * the reduce sink operator encountered
+ * @param opProcCtx
+ * processing context
*/
- public static void initPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx) throws SemanticException {
+ public static void initPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx)
+ throws SemanticException {
Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx
+ .getMapCurrCtx();
GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
- Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
+ Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
mapredWork plan = (mapredWork) currTask.getWork();
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx.getOpTaskMap();
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx
+ .getOpTaskMap();
Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
opTaskMap.put(reducer, currTask);
plan.setReducer(reducer);
- reduceSinkDesc desc = (reduceSinkDesc)op.getConf();
+ reduceSinkDesc desc = op.getConf();
plan.setNumReduceTasks(desc.getNumReducers());
List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
rootTasks.add(currTask);
- if (reducer.getClass() == JoinOperator.class)
+ if (reducer.getClass() == JoinOperator.class) {
plan.setNeedsTagging(true);
+ }
assert currTopOp != null;
List<Operator<? extends Serializable>> seenOps = opProcCtx.getSeenOps();
@@ -117,39 +129,51 @@
/**
* Initialize the current plan by adding it to root tasks
- * @param op the map join operator encountered
- * @param opProcCtx processing context
- * @param pos position of the parent
- */
- public static void initMapJoinPlan(Operator<? extends Serializable> op, GenMRProcContext opProcCtx, boolean readInputMapJoin, boolean readInputUnion,
- boolean setReducer, int pos)
- throws SemanticException {
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+ *
+ * @param op
+ * the map join operator encountered
+ * @param opProcCtx
+ * processing context
+ * @param pos
+ * position of the parent
+ */
+ public static void initMapJoinPlan(Operator<? extends Serializable> op,
+ GenMRProcContext opProcCtx, boolean readInputMapJoin,
+ boolean readInputUnion, boolean setReducer, int pos)
+ throws SemanticException {
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx
+ .getMapCurrCtx();
assert (((pos == -1) && (readInputMapJoin)) || (pos != -1));
int parentPos = (pos == -1) ? 0 : pos;
- GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(parentPos));
- Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
+ GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(
+ parentPos));
+ Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
mapredWork plan = (mapredWork) currTask.getWork();
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx.getOpTaskMap();
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx
+ .getOpTaskMap();
Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
- // The mapjoin has already been encountered. Some context must be stored about that
+ // The mapjoin has already been encountered. Some context must be stored
+ // about that
if (readInputMapJoin) {
MapJoinOperator currMapJoinOp = opProcCtx.getCurrMapJoinOp();
assert currMapJoinOp != null;
- boolean local = ((pos == -1) || (pos == ((mapJoinDesc)currMapJoinOp.getConf()).getPosBigTable())) ? false : true;
+ boolean local = ((pos == -1) || (pos == (currMapJoinOp.getConf())
+ .getPosBigTable())) ? false : true;
if (setReducer) {
- Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
+ Operator<? extends Serializable> reducer = op.getChildOperators()
+ .get(0);
plan.setReducer(reducer);
opTaskMap.put(reducer, currTask);
- if (reducer.getClass() == JoinOperator.class)
+ if (reducer.getClass() == JoinOperator.class) {
plan.setNeedsTagging(true);
- reduceSinkDesc desc = (reduceSinkDesc)op.getConf();
+ }
+ reduceSinkDesc desc = (reduceSinkDesc) op.getConf();
plan.setNumReduceTasks(desc.getNumReducers());
- }
- else
+ } else {
opTaskMap.put(op, currTask);
+ }
if (!readInputUnion) {
GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(currMapJoinOp);
@@ -161,24 +185,22 @@
taskTmpDir = mjCtx.getTaskTmpDir();
tt_desc = mjCtx.getTTDesc();
rootOp = mjCtx.getRootMapJoinOp();
- }
- else {
- GenMRMapJoinCtx oldMjCtx = opProcCtx.getMapJoinCtx(mjCtx.getOldMapJoin());
+ } else {
+ GenMRMapJoinCtx oldMjCtx = opProcCtx.getMapJoinCtx(mjCtx
+ .getOldMapJoin());
taskTmpDir = oldMjCtx.getTaskTmpDir();
tt_desc = oldMjCtx.getTTDesc();
rootOp = oldMjCtx.getRootMapJoinOp();
}
setTaskPlan(taskTmpDir, taskTmpDir, rootOp, plan, local, tt_desc);
- }
- else {
+ } else {
initUnionPlan(opProcCtx, currTask, false);
}
opProcCtx.setCurrMapJoinOp(null);
- }
- else {
- mapJoinDesc desc = (mapJoinDesc)op.getConf();
+ } else {
+ mapJoinDesc desc = (mapJoinDesc) op.getConf();
// The map is overloaded to keep track of mapjoins also
opTaskMap.put(op, currTask);
@@ -202,43 +224,50 @@
/**
* Initialize the current union plan.
- *
- * @param op the reduce sink operator encountered
- * @param opProcCtx processing context
+ *
+ * @param op
+ * the reduce sink operator encountered
+ * @param opProcCtx
+ * processing context
*/
- public static void initUnionPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx) throws SemanticException {
+ public static void initUnionPlan(ReduceSinkOperator op,
+ GenMRProcContext opProcCtx) throws SemanticException {
Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx
+ .getMapCurrCtx();
GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
- Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
+ Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
mapredWork plan = (mapredWork) currTask.getWork();
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx.getOpTaskMap();
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx
+ .getOpTaskMap();
opTaskMap.put(reducer, currTask);
plan.setReducer(reducer);
- reduceSinkDesc desc = (reduceSinkDesc)op.getConf();
+ reduceSinkDesc desc = op.getConf();
plan.setNumReduceTasks(desc.getNumReducers());
- if (reducer.getClass() == JoinOperator.class)
+ if (reducer.getClass() == JoinOperator.class) {
plan.setNeedsTagging(true);
+ }
initUnionPlan(opProcCtx, currTask, false);
}
/*
- * It is a idempotent function to add various intermediate files as the source for the
- * union. The plan has already been created.
+ * It is an idempotent function to add various intermediate files as the source
+ * for the union. The plan has already been created.
*/
- public static void initUnionPlan(GenMRProcContext opProcCtx, Task<? extends Serializable> currTask, boolean local) {
+ public static void initUnionPlan(GenMRProcContext opProcCtx,
+ Task<? extends Serializable> currTask, boolean local) {
mapredWork plan = (mapredWork) currTask.getWork();
UnionOperator currUnionOp = opProcCtx.getCurrUnionOp();
assert currUnionOp != null;
GenMRUnionCtx uCtx = opProcCtx.getUnionTask(currUnionOp);
assert uCtx != null;
- List<String> taskTmpDirLst = uCtx.getTaskTmpDir();
- List<tableDesc> tt_descLst = uCtx.getTTDesc();
+ List<String> taskTmpDirLst = uCtx.getTaskTmpDir();
+ List<tableDesc> tt_descLst = uCtx.getTTDesc();
assert !taskTmpDirLst.isEmpty() && !tt_descLst.isEmpty();
assert taskTmpDirLst.size() == tt_descLst.size();
int size = taskTmpDirLst.size();
@@ -250,7 +279,8 @@
if (plan.getPathToAliases().get(taskTmpDir) == null) {
plan.getPathToAliases().put(taskTmpDir, new ArrayList<String>());
plan.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
- plan.getPathToPartitionInfo().put(taskTmpDir, new partitionDesc(tt_desc, null));
+ plan.getPathToPartitionInfo().put(taskTmpDir,
+ new partitionDesc(tt_desc, null));
plan.getAliasToWork().put(taskTmpDir, currUnionOp);
}
}
@@ -258,19 +288,22 @@
/**
* Merge the current task with the task for the current reducer
- * @param op operator being processed
- * @param oldTask the old task for the current reducer
- * @param task the current task for the current reducer
- * @param opProcCtx processing context
- * @param pos position of the parent in the stack
+ *
+ * @param op
+ * operator being processed
+ * @param oldTask
+ * the old task for the current reducer
+ * @param task
+ * the current task for the current reducer
+ * @param opProcCtx
+ * processing context
+ * @param pos
+ * position of the parent in the stack
*/
public static void joinPlan(Operator<? extends Serializable> op,
- Task<? extends Serializable> oldTask,
- Task<? extends Serializable> task,
- GenMRProcContext opProcCtx,
- int pos, boolean split,
- boolean readMapJoinData,
- boolean readUnionData) throws SemanticException {
+ Task<? extends Serializable> oldTask, Task<? extends Serializable> task,
+ GenMRProcContext opProcCtx, int pos, boolean split,
+ boolean readMapJoinData, boolean readUnionData) throws SemanticException {
Task<? extends Serializable> currTask = task;
mapredWork plan = (mapredWork) currTask.getWork();
Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
@@ -279,72 +312,76 @@
// terminate the old task and make current task dependent on it
if (split) {
assert oldTask != null;
- splitTasks((ReduceSinkOperator)op, oldTask, currTask, opProcCtx, true, false, 0);
- }
- else {
- if ((oldTask != null) && (oldTask.getParentTasks() != null) && !oldTask.getParentTasks().isEmpty()) {
+ splitTasks(op, oldTask, currTask, opProcCtx, true, false, 0);
+ } else {
+ if ((oldTask != null) && (oldTask.getParentTasks() != null)
+ && !oldTask.getParentTasks().isEmpty()) {
parTasks = new ArrayList<Task<? extends Serializable>>();
parTasks.addAll(oldTask.getParentTasks());
Object[] parTaskArr = parTasks.toArray();
- for (int i = 0; i < parTaskArr.length; i++)
- ((Task<? extends Serializable>)parTaskArr[i]).removeDependentTask(oldTask);
+ for (Object element : parTaskArr) {
+ ((Task<? extends Serializable>) element).removeDependentTask(oldTask);
+ }
}
}
if (currTopOp != null) {
List<Operator<? extends Serializable>> seenOps = opProcCtx.getSeenOps();
- String currAliasId = opProcCtx.getCurrAliasId();
+ String currAliasId = opProcCtx.getCurrAliasId();
if (!seenOps.contains(currTopOp)) {
seenOps.add(currTopOp);
boolean local = false;
- if (pos != -1)
- local = (pos == ((mapJoinDesc)op.getConf()).getPosBigTable()) ? false : true;
+ if (pos != -1) {
+ local = (pos == ((mapJoinDesc) op.getConf()).getPosBigTable()) ? false
+ : true;
+ }
setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
}
currTopOp = null;
opProcCtx.setCurrTopOp(currTopOp);
- }
- else if (opProcCtx.getCurrMapJoinOp() != null) {
- MapJoinOperator mjOp = opProcCtx.getCurrMapJoinOp();
+ } else if (opProcCtx.getCurrMapJoinOp() != null) {
+ MapJoinOperator mjOp = opProcCtx.getCurrMapJoinOp();
if (readUnionData) {
initUnionPlan(opProcCtx, currTask, false);
- }
- else {
+ } else {
GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(mjOp);
- // In case of map-join followed by map-join, the file needs to be obtained from the old map join
+ // In case of map-join followed by map-join, the file needs to be
+ // obtained from the old map join
MapJoinOperator oldMapJoin = mjCtx.getOldMapJoin();
- String taskTmpDir = null;
- tableDesc tt_desc = null;
+ String taskTmpDir = null;
+ tableDesc tt_desc = null;
Operator<? extends Serializable> rootOp = null;
if (oldMapJoin == null) {
taskTmpDir = mjCtx.getTaskTmpDir();
- tt_desc = mjCtx.getTTDesc();
- rootOp = mjCtx.getRootMapJoinOp();
- }
- else {
+ tt_desc = mjCtx.getTTDesc();
+ rootOp = mjCtx.getRootMapJoinOp();
+ } else {
GenMRMapJoinCtx oldMjCtx = opProcCtx.getMapJoinCtx(oldMapJoin);
assert oldMjCtx != null;
taskTmpDir = oldMjCtx.getTaskTmpDir();
- tt_desc = oldMjCtx.getTTDesc();
- rootOp = oldMjCtx.getRootMapJoinOp();
+ tt_desc = oldMjCtx.getTTDesc();
+ rootOp = oldMjCtx.getRootMapJoinOp();
}
- boolean local = ((pos == -1) || (pos == ((mapJoinDesc)mjOp.getConf()).getPosBigTable())) ? false : true;
+ boolean local = ((pos == -1) || (pos == (mjOp.getConf())
+ .getPosBigTable())) ? false : true;
setTaskPlan(taskTmpDir, taskTmpDir, rootOp, plan, local, tt_desc);
}
opProcCtx.setCurrMapJoinOp(null);
if ((oldTask != null) && (parTasks != null)) {
- for (Task<? extends Serializable> parTask : parTasks)
+ for (Task<? extends Serializable> parTask : parTasks) {
parTask.addDependentTask(currTask);
+ }
}
- if (opProcCtx.getRootTasks().contains(currTask))
+ if (opProcCtx.getRootTasks().contains(currTask)) {
opProcCtx.getRootTasks().remove(currTask);
+ }
}
opProcCtx.setCurrTask(currTask);
@@ -352,26 +389,31 @@
/**
* Split the current plan by creating a temporary destination
- * @param op the reduce sink operator encountered
- * @param opProcCtx processing context
+ *
+ * @param op
+ * the reduce sink operator encountered
+ * @param opProcCtx
+ * processing context
*/
public static void splitPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx)
- throws SemanticException {
+ throws SemanticException {
// Generate a new task
mapredWork cplan = getMapRedWork();
ParseContext parseCtx = opProcCtx.getParseCtx();
- Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx.getConf());
+ Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx
+ .getConf());
Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
// Add the reducer
cplan.setReducer(reducer);
- reduceSinkDesc desc = (reduceSinkDesc)op.getConf();
+ reduceSinkDesc desc = op.getConf();
cplan.setNumReduceTasks(new Integer(desc.getNumReducers()));
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx.getOpTaskMap();
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx
+ .getOpTaskMap();
opTaskMap.put(reducer, redTask);
- Task<? extends Serializable> currTask = opProcCtx.getCurrTask();
+ Task<? extends Serializable> currTask = opProcCtx.getCurrTask();
splitTasks(op, currTask, redTask, opProcCtx, true, false, 0);
opProcCtx.getRootOps().add(op);
@@ -379,30 +421,36 @@
/**
* set the current task in the mapredWork
- * @param alias_id current alias
- * @param topOp the top operator of the stack
- * @param plan current plan
- * @param local whether you need to add to map-reduce or local work
- * @param opProcCtx processing context
- */
- public static void setTaskPlan(String alias_id, Operator<? extends Serializable> topOp,
- mapredWork plan, boolean local, GenMRProcContext opProcCtx)
- throws SemanticException {
+ *
+ * @param alias_id
+ * current alias
+ * @param topOp
+ * the top operator of the stack
+ * @param plan
+ * current plan
+ * @param local
+ * whether you need to add to map-reduce or local work
+ * @param opProcCtx
+ * processing context
+ */
+ public static void setTaskPlan(String alias_id,
+ Operator<? extends Serializable> topOp, mapredWork plan, boolean local,
+ GenMRProcContext opProcCtx) throws SemanticException {
ParseContext parseCtx = opProcCtx.getParseCtx();
Set<ReadEntity> inputs = opProcCtx.getInputs();
ArrayList<Path> partDir = new ArrayList<Path>();
ArrayList<partitionDesc> partDesc = new ArrayList<partitionDesc>();
- Path tblDir = null;
- tableDesc tblDesc = null;
+ Path tblDir = null;
+ tableDesc tblDesc = null;
PrunedPartitionList partsList = null;
try {
partsList = PartitionPruner.prune(parseCtx.getTopToTable().get(topOp),
- parseCtx.getOpToPartPruner().get(topOp),
- opProcCtx.getConf(), alias_id, parseCtx.getPrunedPartitions());
+ parseCtx.getOpToPartPruner().get(topOp), opProcCtx.getConf(),
+ alias_id, parseCtx.getPrunedPartitions());
} catch (SemanticException e) {
throw e;
} catch (HiveException e) {
@@ -412,35 +460,40 @@
// Generate the map work for this alias_id
Set<Partition> parts = null;
- // pass both confirmed and unknown partitions through the map-reduce framework
+ // pass both confirmed and unknown partitions through the map-reduce
+ // framework
parts = partsList.getConfirmedPartns();
parts.addAll(partsList.getUnknownPartns());
partitionDesc aliasPartnDesc = null;
- try{
- if (parts.isEmpty()) {
- if (!partsList.getDeniedPartns().isEmpty())
- aliasPartnDesc = Utilities.getPartitionDesc(partsList.getDeniedPartns()
- .iterator().next());
- } else {
- aliasPartnDesc = Utilities.getPartitionDesc(parts.iterator().next());
- }
+ try {
+ if (parts.isEmpty()) {
+ if (!partsList.getDeniedPartns().isEmpty()) {
+ aliasPartnDesc = Utilities.getPartitionDesc(partsList
+ .getDeniedPartns().iterator().next());
+ }
+ } else {
+ aliasPartnDesc = Utilities.getPartitionDesc(parts.iterator().next());
+ }
} catch (HiveException e) {
- LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
throw new SemanticException(e.getMessage(), e);
}
// The table does not have any partitions
- if (aliasPartnDesc == null)
- aliasPartnDesc = new partitionDesc(Utilities.getTableDesc(parseCtx.getTopToTable().get(topOp)), null);
+ if (aliasPartnDesc == null) {
+ aliasPartnDesc = new partitionDesc(Utilities.getTableDesc(parseCtx
+ .getTopToTable().get(topOp)), null);
+ }
plan.getAliasToPartnInfo().put(alias_id, aliasPartnDesc);
for (Partition part : parts) {
- if (part.getTable().isPartitioned())
+ if (part.getTable().isPartitioned()) {
inputs.add(new ReadEntity(part));
- else
+ } else {
inputs.add(new ReadEntity(part.getTable()));
+ }
// Later the properties have to come from the partition as opposed
// to from the table in order to support versioning.
@@ -449,8 +502,7 @@
if (sampleDescr != null) {
paths = SamplePruner.prune(part, sampleDescr);
- }
- else {
+ } else {
paths = part.getPath();
}
@@ -462,23 +514,24 @@
tblDesc = Utilities.getTableDesc(part.getTable());
}
- for (Path p: paths) {
- if(p == null)
+ for (Path p : paths) {
+ if (p == null) {
continue;
+ }
String path = p.toString();
LOG.debug("Adding " + path + " of table" + alias_id);
partDir.add(p);
- try{
- partDesc.add(Utilities.getPartitionDesc(part));
+ try {
+ partDesc.add(Utilities.getPartitionDesc(part));
} catch (HiveException e) {
- LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
throw new SemanticException(e.getMessage(), e);
}
}
}
- Iterator<Path> iterPath = partDir.iterator();
+ Iterator<Path> iterPath = partDir.iterator();
Iterator<partitionDesc> iterPartnDesc = partDesc.iterator();
if (!local) {
@@ -499,53 +552,65 @@
assert plan.getAliasToWork().get(alias_id) == null;
plan.getAliasToWork().put(alias_id, topOp);
- }
- else {
+ } else {
// populate local work if needed
mapredLocalWork localPlan = plan.getMapLocalWork();
- if (localPlan == null)
+ if (localPlan == null) {
localPlan = new mapredLocalWork(
new LinkedHashMap<String, Operator<? extends Serializable>>(),
new LinkedHashMap<String, fetchWork>());
+ }
assert localPlan.getAliasToWork().get(alias_id) == null;
assert localPlan.getAliasToFetchWork().get(alias_id) == null;
localPlan.getAliasToWork().put(alias_id, topOp);
- if (tblDir == null)
- localPlan.getAliasToFetchWork().put(alias_id, new fetchWork(fetchWork.convertPathToStringArray(partDir), partDesc));
- else
- localPlan.getAliasToFetchWork().put(alias_id, new fetchWork(tblDir.toString(), tblDesc));
+ if (tblDir == null) {
+ localPlan.getAliasToFetchWork()
+ .put(
+ alias_id,
+ new fetchWork(fetchWork.convertPathToStringArray(partDir),
+ partDesc));
+ } else {
+ localPlan.getAliasToFetchWork().put(alias_id,
+ new fetchWork(tblDir.toString(), tblDesc));
+ }
plan.setMapLocalWork(localPlan);
}
}
-
/**
* set the current task in the mapredWork
- * @param alias current alias
- * @param topOp the top operator of the stack
- * @param plan current plan
- * @param local whether you need to add to map-reduce or local work
- * @param tt_desc table descriptor
- */
- public static void setTaskPlan(String path, String alias, Operator<? extends Serializable> topOp,
- mapredWork plan, boolean local, tableDesc tt_desc)
- throws SemanticException {
+ *
+ * @param alias
+ * current alias
+ * @param topOp
+ * the top operator of the stack
+ * @param plan
+ * current plan
+ * @param local
+ * whether you need to add to map-reduce or local work
+ * @param tt_desc
+ * table descriptor
+ */
+ public static void setTaskPlan(String path, String alias,
+ Operator<? extends Serializable> topOp, mapredWork plan, boolean local,
+ tableDesc tt_desc) throws SemanticException {
if (!local) {
- if (plan.getPathToAliases().get(path) == null)
+ if (plan.getPathToAliases().get(path) == null) {
plan.getPathToAliases().put(path, new ArrayList<String>());
+ }
plan.getPathToAliases().get(path).add(alias);
plan.getPathToPartitionInfo().put(path, new partitionDesc(tt_desc, null));
plan.getAliasToWork().put(alias, topOp);
- }
- else {
+ } else {
// populate local work if needed
mapredLocalWork localPlan = plan.getMapLocalWork();
- if (localPlan == null)
+ if (localPlan == null) {
localPlan = new mapredLocalWork(
- new LinkedHashMap<String, Operator<? extends Serializable>>(),
- new LinkedHashMap<String, fetchWork>());
+ new LinkedHashMap<String, Operator<? extends Serializable>>(),
+ new LinkedHashMap<String, fetchWork>());
+ }
assert localPlan.getAliasToWork().get(alias) == null;
assert localPlan.getAliasToFetchWork().get(alias) == null;
@@ -557,15 +622,20 @@
/**
* set key and value descriptor
- * @param plan current plan
- * @param topOp current top operator in the path
- */
- public static void setKeyAndValueDesc(mapredWork plan, Operator<? extends Serializable> topOp) {
- if (topOp == null)
+ *
+ * @param plan
+ * current plan
+ * @param topOp
+ * current top operator in the path
+ */
+ public static void setKeyAndValueDesc(mapredWork plan,
+ Operator<? extends Serializable> topOp) {
+ if (topOp == null) {
return;
+ }
if (topOp instanceof ReduceSinkOperator) {
- ReduceSinkOperator rs = (ReduceSinkOperator)topOp;
+ ReduceSinkOperator rs = (ReduceSinkOperator) topOp;
plan.setKeyDesc(rs.getConf().getKeySerializeInfo());
int tag = Math.max(0, rs.getConf().getTag());
List<tableDesc> tagToSchema = plan.getTagToValueDesc();
@@ -574,9 +644,10 @@
}
tagToSchema.set(tag, rs.getConf().getValueSerializeInfo());
} else {
- List<Operator<? extends Serializable>> children = topOp.getChildOperators();
+ List<Operator<? extends Serializable>> children = topOp
+ .getChildOperators();
if (children != null) {
- for(Operator<? extends Serializable> op: children) {
+ for (Operator<? extends Serializable> op : children) {
setKeyAndValueDesc(plan, op);
}
}
@@ -585,13 +656,15 @@
/**
* create a new plan and return
+ *
* @return the new plan
*/
public static mapredWork getMapRedWork() {
mapredWork work = new mapredWork();
work.setPathToAliases(new LinkedHashMap<String, ArrayList<String>>());
work.setPathToPartitionInfo(new LinkedHashMap<String, partitionDesc>());
- work.setAliasToWork(new LinkedHashMap<String, Operator<? extends Serializable>>());
+ work
+ .setAliasToWork(new LinkedHashMap<String, Operator<? extends Serializable>>());
work.setTagToValueDesc(new ArrayList<tableDesc>());
work.setReducer(null);
return work;
@@ -599,13 +672,17 @@
/**
* insert in the map for the operator to row resolver
- * @param op operator created
- * @param rr row resolver
- * @param parseCtx parse context
+ *
+ * @param op
+ * operator created
+ * @param rr
+ * row resolver
+ * @param parseCtx
+ * parse context
*/
@SuppressWarnings("nls")
- private static Operator<? extends Serializable> putOpInsertMap(Operator<? extends Serializable> op, RowResolver rr, ParseContext parseCtx)
- {
+ private static Operator<? extends Serializable> putOpInsertMap(
+ Operator<? extends Serializable> op, RowResolver rr, ParseContext parseCtx) {
OpParseContext ctx = new OpParseContext(rr);
parseCtx.getOpParseCtx().put(op, ctx);
return op;
@@ -622,40 +699,47 @@
* @param pos position of the parent
**/
public static void splitTasks(Operator<? extends Serializable> op,
- Task<? extends Serializable> parentTask,
- Task<? extends Serializable> childTask,
- GenMRProcContext opProcCtx, boolean setReducer,
- boolean local, int posn) throws SemanticException {
- mapredWork plan = (mapredWork) childTask.getWork();
+ Task<? extends Serializable> parentTask,
+ Task<? extends Serializable> childTask, GenMRProcContext opProcCtx,
+ boolean setReducer, boolean local, int posn) throws SemanticException {
+ childTask.getWork();
Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
ParseContext parseCtx = opProcCtx.getParseCtx();
parentTask.addDependentTask(childTask);
- // Root Task cannot depend on any other task, therefore childTask cannot be a root Task
+ // Root Task cannot depend on any other task, therefore childTask cannot be
+ // a root Task
List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
- if (rootTasks.contains(childTask))
+ if (rootTasks.contains(childTask)) {
rootTasks.remove(childTask);
+ }
// generate the temporary file
Context baseCtx = parseCtx.getContext();
String taskTmpDir = baseCtx.getMRTmpFileURI();
Operator<? extends Serializable> parent = op.getParentOperators().get(posn);
- tableDesc tt_desc =
- PlanUtils.getIntermediateFileTableDesc(PlanUtils.getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
+ tableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
+ .getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
// Create a file sink operator for this file name
- boolean compressIntermediate = parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSINTERMEDIATE);
- fileSinkDesc desc = new fileSinkDesc(taskTmpDir, tt_desc, compressIntermediate);
+ boolean compressIntermediate = parseCtx.getConf().getBoolVar(
+ HiveConf.ConfVars.COMPRESSINTERMEDIATE);
+ fileSinkDesc desc = new fileSinkDesc(taskTmpDir, tt_desc,
+ compressIntermediate);
if (compressIntermediate) {
- desc.setCompressCodec(parseCtx.getConf().getVar(HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC));
- desc.setCompressType(parseCtx.getConf().getVar(HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE));
+ desc.setCompressCodec(parseCtx.getConf().getVar(
+ HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC));
+ desc.setCompressType(parseCtx.getConf().getVar(
+ HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE));
}
- Operator<? extends Serializable> fs_op = putOpInsertMap(OperatorFactory.get(desc, parent.getSchema()), null, parseCtx);
+ Operator<? extends Serializable> fs_op = putOpInsertMap(OperatorFactory
+ .get(desc, parent.getSchema()), null, parseCtx);
// replace the reduce child with this operator
- List<Operator<? extends Serializable>> childOpList = parent.getChildOperators();
+ List<Operator<? extends Serializable>> childOpList = parent
+ .getChildOperators();
for (int pos = 0; pos < childOpList.size(); pos++) {
if (childOpList.get(pos) == op) {
childOpList.set(pos, fs_op);
@@ -668,15 +752,16 @@
fs_op.setParentOperators(parentOpList);
// create a dummy tableScan operator on top of op
- Operator<? extends Serializable> ts_op =
- putOpInsertMap(OperatorFactory.get(tableScanDesc.class, parent.getSchema()), null, parseCtx);
+ Operator<? extends Serializable> ts_op = putOpInsertMap(OperatorFactory
+ .get(tableScanDesc.class, parent.getSchema()), null, parseCtx);
childOpList = new ArrayList<Operator<? extends Serializable>>();
childOpList.add(op);
ts_op.setChildOperators(childOpList);
op.getParentOperators().set(posn, ts_op);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx
+ .getMapCurrCtx();
mapCurrCtx.put(ts_op, new GenMapRedCtx(childTask, null, null));
String streamDesc = taskTmpDir;
@@ -690,14 +775,16 @@
streamDesc = "$INTNAME";
origStreamDesc = streamDesc;
int pos = 0;
- while (cplan.getAliasToWork().get(streamDesc) != null)
+ while (cplan.getAliasToWork().get(streamDesc) != null) {
streamDesc = origStreamDesc.concat(String.valueOf(++pos));
+ }
}
// TODO: Allocate work to remove the temporary files and make that
// dependent on the redTask
- if (reducer.getClass() == JoinOperator.class)
+ if (reducer.getClass() == JoinOperator.class) {
cplan.setNeedsTagging(true);
+ }
}
// Add the path to alias mapping
@@ -705,18 +792,19 @@
// This can be cleaned up as a function table in future
if (op instanceof MapJoinOperator) {
- MapJoinOperator mjOp = (MapJoinOperator)op;
+ MapJoinOperator mjOp = (MapJoinOperator) op;
opProcCtx.setCurrMapJoinOp(mjOp);
GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(mjOp);
- if (mjCtx == null)
+ if (mjCtx == null) {
mjCtx = new GenMRMapJoinCtx(taskTmpDir, tt_desc, ts_op, null);
- else {
+ } else {
mjCtx.setTaskTmpDir(taskTmpDir);
mjCtx.setTTDesc(tt_desc);
mjCtx.setRootMapJoinOp(ts_op);
}
opProcCtx.setMapJoinCtx(mjOp, mjCtx);
- opProcCtx.getMapCurrCtx().put(parent, new GenMapRedCtx(childTask, null, null));
+ opProcCtx.getMapCurrCtx().put(parent,
+ new GenMapRedCtx(childTask, null, null));
}
currTopOp = null;
@@ -727,7 +815,8 @@
opProcCtx.setCurrTask(childTask);
}
- static public void mergeMapJoinUnion(UnionOperator union, GenMRProcContext ctx, int pos) throws SemanticException {
+ static public void mergeMapJoinUnion(UnionOperator union,
+ GenMRProcContext ctx, int pos) throws SemanticException {
ParseContext parseCtx = ctx.getParseCtx();
UnionProcContext uCtx = parseCtx.getUCtx();
@@ -739,7 +828,7 @@
GenMRUnionCtx uCtxTask = ctx.getUnionTask(union);
Task<? extends Serializable> uTask = null;
- Operator<? extends Serializable> parent = union.getParentOperators().get(pos);
+ union.getParentOperators().get(pos);
mapredWork uPlan = null;
// union is encountered for the first time
@@ -749,10 +838,9 @@
uTask = TaskFactory.get(uPlan, parseCtx.getConf());
uCtxTask.setUTask(uTask);
ctx.setUnionTask(union, uCtxTask);
- }
- else {
+ } else {
uTask = uCtxTask.getUTask();
- uPlan = (mapredWork)uTask.getWork();
+ uPlan = (mapredWork) uTask.getWork();
}
// If there is a mapjoin at position 'pos'
@@ -762,30 +850,34 @@
if (uPlan.getPathToAliases().get(taskTmpDir) == null) {
uPlan.getPathToAliases().put(taskTmpDir, new ArrayList<String>());
uPlan.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
- uPlan.getPathToPartitionInfo().put(taskTmpDir, new partitionDesc(mjCtx.getTTDesc(), null));
+ uPlan.getPathToPartitionInfo().put(taskTmpDir,
+ new partitionDesc(mjCtx.getTTDesc(), null));
uPlan.getAliasToWork().put(taskTmpDir, mjCtx.getRootMapJoinOp());
}
- for (Task t : currTask.getParentTasks())
+ for (Task t : currTask.getParentTasks()) {
t.addDependentTask(uTask);
+ }
try {
boolean notDone = true;
while (notDone) {
- for (Task t : currTask.getParentTasks())
+ for (Task t : currTask.getParentTasks()) {
t.removeDependentTask(currTask);
+ }
notDone = false;
}
} catch (java.util.ConcurrentModificationException e) {
}
- }
- else
+ } else {
setTaskPlan(ctx.getCurrAliasId(), ctx.getCurrTopOp(), uPlan, false, ctx);
+ }
ctx.setCurrTask(uTask);
ctx.setCurrAliasId(null);
ctx.setCurrTopOp(null);
ctx.setCurrMapJoinOp(null);
- ctx.getMapCurrCtx().put((Operator<? extends Serializable>)union, new GenMapRedCtx(ctx.getCurrTask(), null, null));
+ ctx.getMapCurrCtx().put(union,
+ new GenMapRedCtx(ctx.getCurrTask(), null, null));
}
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Thu Jan 21 10:37:58 2010
@@ -130,13 +130,14 @@
throws SemanticException {
// if this is not a HASH groupby, return
- if (curr.getConf().getMode() != groupByDesc.Mode.HASH)
+ if (curr.getConf().getMode() != groupByDesc.Mode.HASH) {
return;
+ }
- Set<String> tblNames = this.pGraphContext.getGroupOpToInputTables().get(
- curr);
- if (tblNames == null || tblNames.size() == 0)
+ Set<String> tblNames = pGraphContext.getGroupOpToInputTables().get(curr);
+ if (tblNames == null || tblNames.size() == 0) {
return;
+ }
boolean bucketGroupBy = true;
groupByDesc desc = curr.getConf();
@@ -144,7 +145,7 @@
groupByKeys.addAll(desc.getKeys());
// compute groupby columns from groupby keys
List<String> groupByCols = new ArrayList<String>();
- while (groupByKeys.size() >0) {
+ while (groupByKeys.size() > 0) {
exprNodeDesc node = groupByKeys.remove(0);
if (node instanceof exprNodeColumnDesc) {
groupByCols.addAll(node.getCols());
@@ -155,22 +156,24 @@
groupByKeys.add(0, ((exprNodeFieldDesc) node).getDesc());
continue;
} else if (node instanceof exprNodeGenericFuncDesc) {
- exprNodeGenericFuncDesc udfNode = ((exprNodeGenericFuncDesc)node);
+ exprNodeGenericFuncDesc udfNode = ((exprNodeGenericFuncDesc) node);
GenericUDF udf = udfNode.getGenericUDF();
- if(!FunctionRegistry.isDeterministic(udf))
+ if (!FunctionRegistry.isDeterministic(udf)) {
return;
+ }
groupByKeys.addAll(0, udfNode.getChildExprs());
} else {
return;
}
}
-
- if(groupByCols.size() == 0)
+
+ if (groupByCols.size() == 0) {
return;
+ }
for (String table : tblNames) {
- Operator<? extends Serializable> topOp = this.pGraphContext.getTopOps()
- .get(table);
+ Operator<? extends Serializable> topOp = pGraphContext.getTopOps().get(
+ table);
if (topOp == null || (!(topOp instanceof TableScanOperator))) {
// this is in a sub-query.
// In future, we need to infer subq's columns propery. For example
@@ -180,21 +183,25 @@
return;
}
TableScanOperator ts = (TableScanOperator) topOp;
- Table destTable = this.pGraphContext.getTopToTable().get(ts);
- if (destTable == null)
+ Table destTable = pGraphContext.getTopToTable().get(ts);
+ if (destTable == null) {
return;
+ }
if (!destTable.isPartitioned()) {
List<String> bucketCols = destTable.getBucketCols();
- List<String> sortCols = Utilities.getColumnNamesFromSortCols(destTable.getSortCols());
- bucketGroupBy = matchBucketOrSortedColumns(groupByCols, bucketCols, sortCols);
- if (!bucketGroupBy)
+ List<String> sortCols = Utilities
+ .getColumnNamesFromSortCols(destTable.getSortCols());
+ bucketGroupBy = matchBucketOrSortedColumns(groupByCols, bucketCols,
+ sortCols);
+ if (!bucketGroupBy) {
return;
+ }
} else {
PrunedPartitionList partsList = null;
try {
- partsList = PartitionPruner.prune(destTable, this.pGraphContext
- .getOpToPartPruner().get(ts), this.pGraphContext.getConf(),
- table, this.pGraphContext.getPrunedPartitions());
+ partsList = PartitionPruner.prune(destTable, pGraphContext
+ .getOpToPartPruner().get(ts), pGraphContext.getConf(), table,
+ pGraphContext.getPrunedPartitions());
} catch (HiveException e) {
// Has to use full name to make sure it does not conflict with
// org.apache.commons.lang.StringUtils
@@ -206,10 +213,13 @@
parts.addAll(partsList.getUnknownPartns());
for (Partition part : parts) {
List<String> bucketCols = part.getBucketCols();
- List<String> sortCols = Utilities.getColumnNamesFromSortCols(part.getTPartition().getSd().getSortCols());
- bucketGroupBy = matchBucketOrSortedColumns(groupByCols, bucketCols, sortCols);
- if (!bucketGroupBy)
+ List<String> sortCols = Utilities.getColumnNamesFromSortCols(part
+ .getTPartition().getSd().getSortCols());
+ bucketGroupBy = matchBucketOrSortedColumns(groupByCols, bucketCols,
+ sortCols);
+ if (!bucketGroupBy) {
return;
+ }
}
}
}
@@ -235,26 +245,29 @@
* @throws SemanticException
*/
private boolean matchBucketOrSortedColumns(List<String> groupByCols,
- List<String> bucketCols, List<String> sortCols) throws SemanticException {
+ List<String> bucketCols, List<String> sortCols)
+ throws SemanticException {
boolean ret = false;
-
+
if (sortCols == null || sortCols.size() == 0) {
ret = matchBucketColumns(groupByCols, bucketCols);
}
-
+
if (!ret && sortCols != null && sortCols.size() >= groupByCols.size()) {
// check sort columns, if groupByCols is a prefix subset of sort
// columns, we will use sorted group by. For example, if data is sorted
// by column a, b, c, and a query wants to group by b,a, we will use
- // sorted group by. But if the query wants to groupby b,c, then sorted group by can not be used.
+ // sorted group by. But if the query wants to groupby b,c, then sorted
+ // group by can not be used.
int num = groupByCols.size();
- for(int i =0;i<num; i++){
- if(sortCols.indexOf(groupByCols.get(i)) > (num -1))
+ for (int i = 0; i < num; i++) {
+ if (sortCols.indexOf(groupByCols.get(i)) > (num - 1)) {
return false;
+ }
}
return true;
}
-
+
return ret;
}
@@ -267,13 +280,15 @@
List<String> tblBucketCols) throws SemanticException {
if (tblBucketCols == null || tblBucketCols.size() == 0
- || grpCols.size() == 0 || grpCols.size() != tblBucketCols.size())
+ || grpCols.size() == 0 || grpCols.size() != tblBucketCols.size()) {
return false;
+ }
for (int i = 0; i < grpCols.size(); i++) {
String tblCol = grpCols.get(i);
- if (!tblBucketCols.contains(tblCol))
+ if (!tblBucketCols.contains(tblCol)) {
return false;
+ }
}
return true;
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java Thu Jan 21 10:37:58 2010
@@ -19,15 +19,10 @@
package org.apache.hadoop.hive.ql.optimizer;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -38,29 +33,29 @@
/**
* Implementation of rule-based join table reordering optimization. User passes
* hints to specify which tables are to be streamed and they are moved to have
- * largest tag so that they are processed last.
- * In future, once statistics are implemented, this transformation can also be
- * done based on costs.
+ * largest tag so that they are processed last. In future, once statistics are
+ * implemented, this transformation can also be done based on costs.
*/
public class JoinReorder implements Transform {
/**
* Estimate the size of the output based on the STREAMTABLE hints. To do so
- * the whole tree is traversed. Possible sizes:
- * 0: the operator and its subtree don't contain any big tables
- * 1: the subtree of the operator contains a big table
- * 2: the operator is a big table
- *
- * @param operator The operator which output size is to be estimated
- * @param bigTables Set of tables that should be streamed
+ * the whole tree is traversed. Possible sizes: 0: the operator and its
+ * subtree don't contain any big tables 1: the subtree of the operator
+ * contains a big table 2: the operator is a big table
+ *
+ * @param operator
+ * The operator which output size is to be estimated
+ * @param bigTables
+ * Set of tables that should be streamed
* @return The estimated size - 0 (no streamed tables), 1 (streamed tables in
- * subtree) or 2 (a streamed table)
+ * subtree) or 2 (a streamed table)
*/
private int getOutputSize(Operator<? extends Serializable> operator,
- Set<String> bigTables) {
+ Set<String> bigTables) {
// If a join operator contains a big subtree, there is a chance that its
// output is also big, so the output size is 1 (medium)
if (operator instanceof JoinOperator) {
- for(Operator<? extends Serializable> o: operator.getParentOperators()) {
+ for (Operator<? extends Serializable> o : operator.getParentOperators()) {
if (getOutputSize(o, bigTables) != 0) {
return 1;
}
@@ -69,7 +64,7 @@
// If a table is in bigTables then its output is big (2)
if (operator instanceof TableScanOperator) {
- String alias = ((TableScanOperator)operator).getConf().getAlias();
+ String alias = ((TableScanOperator) operator).getConf().getAlias();
if (bigTables.contains(alias)) {
return 2;
}
@@ -79,7 +74,7 @@
// the biggest output from a parent
int maxSize = 0;
if (operator.getParentOperators() != null) {
- for(Operator<? extends Serializable> o: operator.getParentOperators()) {
+ for (Operator<? extends Serializable> o : operator.getParentOperators()) {
int current = getOutputSize(o, bigTables);
if (current > maxSize) {
maxSize = current;
@@ -92,14 +87,15 @@
/**
* Find all big tables from STREAMTABLE hints
- *
- * @param joinCtx The join context
+ *
+ * @param joinCtx
+ * The join context
* @return Set of all big tables
*/
private Set<String> getBigTables(ParseContext joinCtx) {
Set<String> bigTables = new HashSet<String>();
- for (QBJoinTree qbJoin: joinCtx.getJoinContext().values()) {
+ for (QBJoinTree qbJoin : joinCtx.getJoinContext().values()) {
if (qbJoin.getStreamAliases() != null) {
bigTables.addAll(qbJoin.getStreamAliases());
}
@@ -111,20 +107,22 @@
/**
* Reorder the tables in a join operator appropriately (by reordering the tags
* of the reduces sinks)
- *
- * @param joinOp The join operator to be processed
- * @param bigTables Set of all big tables
+ *
+ * @param joinOp
+ * The join operator to be processed
+ * @param bigTables
+ * Set of all big tables
*/
private void reorder(JoinOperator joinOp, Set<String> bigTables) {
int count = joinOp.getParentOperators().size();
// Find the biggest reduce sink
- int biggestPos = count - 1;
- int biggestSize = getOutputSize(joinOp.getParentOperators().get(biggestPos),
- bigTables);
+ int biggestPos = count - 1;
+ int biggestSize = getOutputSize(
+ joinOp.getParentOperators().get(biggestPos), bigTables);
for (int i = 0; i < count - 1; i++) {
int currSize = getOutputSize(joinOp.getParentOperators().get(i),
- bigTables);
+ bigTables);
if (currSize > biggestSize) {
biggestSize = currSize;
biggestPos = i;
@@ -135,14 +133,14 @@
if (biggestPos != (count - 1)) {
Byte[] tagOrder = joinOp.getConf().getTagOrder();
Byte temp = tagOrder[biggestPos];
- tagOrder[biggestPos] = tagOrder[count-1];
- tagOrder[count-1] = temp;
+ tagOrder[biggestPos] = tagOrder[count - 1];
+ tagOrder[count - 1] = temp;
// Update tags of reduce sinks
- ((ReduceSinkOperator)joinOp.getParentOperators().get(biggestPos))
- .getConf().setTag(count-1);
- ((ReduceSinkOperator)joinOp.getParentOperators().get(count-1)).getConf()
- .setTag(biggestPos);
+ ((ReduceSinkOperator) joinOp.getParentOperators().get(biggestPos))
+ .getConf().setTag(count - 1);
+ ((ReduceSinkOperator) joinOp.getParentOperators().get(count - 1))
+ .getConf().setTag(biggestPos);
}
}
@@ -150,13 +148,14 @@
* Transform the query tree. For each join, check which reduce sink will
* output the biggest result (based on STREAMTABLE hints) and give it the
* biggest tag so that it gets streamed.
- *
- * @param pactx current parse context
+ *
+ * @param pactx
+ * current parse context
*/
public ParseContext transform(ParseContext pactx) throws SemanticException {
Set<String> bigTables = getBigTables(pactx);
- for (JoinOperator joinOp: pactx.getJoinContext().keySet()) {
+ for (JoinOperator joinOp : pactx.getJoinContext().keySet()) {
reorder(joinOp, bigTables);
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Thu Jan 21 10:37:58 2010
@@ -18,36 +18,34 @@
package org.apache.hadoop.hive.ql.optimizer;
import java.io.Serializable;
-import java.util.List;
import java.util.ArrayList;
-import java.util.Stack;
-import java.util.Map;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.ErrorMsg;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
-import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
-import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.parse.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
-import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
/**
* Operator factory for MapJoin processing
@@ -58,13 +56,14 @@
int pos = 0;
int size = stack.size();
assert size >= 2 && stack.get(size - 1) == op;
- Operator<? extends Serializable> parent = (Operator<? extends Serializable>)stack.get(size - 2);
+ Operator<? extends Serializable> parent = (Operator<? extends Serializable>) stack
+ .get(size - 2);
List<Operator<? extends Serializable>> parOp = op.getParentOperators();
pos = parOp.indexOf(parent);
- assert pos < parOp.size();
+ assert pos < parOp.size();
return pos;
}
-
+
/**
* TableScan followed by MapJoin
*/
@@ -73,43 +72,49 @@
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
- MapJoinOperator mapJoin = (MapJoinOperator)nd;
- GenMRProcContext ctx = (GenMRProcContext)procCtx;
+ MapJoinOperator mapJoin = (MapJoinOperator) nd;
+ GenMRProcContext ctx = (GenMRProcContext) procCtx;
// find the branch on which this processor was invoked
int pos = getPositionParent(mapJoin, stack);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
- GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(pos));
- Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ .getMapCurrCtx();
+ GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(
+ pos));
+ Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
mapredWork currPlan = (mapredWork) currTask.getWork();
- Operator<? extends Serializable> currTopOp = mapredCtx.getCurrTopOp();
+ Operator<? extends Serializable> currTopOp = mapredCtx.getCurrTopOp();
String currAliasId = mapredCtx.getCurrAliasId();
Operator<? extends Serializable> reducer = mapJoin;
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+ .getOpTaskMap();
Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
-
+
ctx.setCurrTopOp(currTopOp);
ctx.setCurrAliasId(currAliasId);
ctx.setCurrTask(currTask);
-
+
// If the plan for this reducer does not exist, initialize the plan
if (opMapTask == null) {
assert currPlan.getReducer() == null;
GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, false, false, false, pos);
}
- // The current plan can be thrown away after being merged with the original plan
+ // The current plan can be thrown away after being merged with the
+ // original plan
else {
- GenMapRedUtils.joinPlan(mapJoin, null, opMapTask, ctx, pos, false, false, false);
+ GenMapRedUtils.joinPlan(mapJoin, null, opMapTask, ctx, pos, false,
+ false, false);
currTask = opMapTask;
ctx.setCurrTask(currTask);
}
-
- mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
+
+ mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx
+ .getCurrTopOp(), ctx.getCurrAliasId()));
return null;
}
}
-
+
/**
* ReduceSink followed by MapJoin
*/
@@ -118,37 +123,43 @@
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
- MapJoinOperator mapJoin = (MapJoinOperator)nd;
- GenMRProcContext opProcCtx = (GenMRProcContext)procCtx;
-
+ MapJoinOperator mapJoin = (MapJoinOperator) nd;
+ GenMRProcContext opProcCtx = (GenMRProcContext) procCtx;
+
mapredWork cplan = GenMapRedUtils.getMapRedWork();
ParseContext parseCtx = opProcCtx.getParseCtx();
- Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx.getConf());
+ Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx
+ .getConf());
Task<? extends Serializable> currTask = opProcCtx.getCurrTask();
// find the branch on which this processor was invoked
int pos = getPositionParent(mapJoin, stack);
- boolean local = (pos == ((mapJoinDesc)mapJoin.getConf()).getPosBigTable()) ? false : true;
-
- GenMapRedUtils.splitTasks(mapJoin, currTask, redTask, opProcCtx, false, local, pos);
+ boolean local = (pos == (mapJoin.getConf()).getPosBigTable()) ? false
+ : true;
+
+ GenMapRedUtils.splitTasks(mapJoin, currTask, redTask, opProcCtx, false,
+ local, pos);
currTask = opProcCtx.getCurrTask();
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx.getOpTaskMap();
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx
+ .getOpTaskMap();
Task<? extends Serializable> opMapTask = opTaskMap.get(mapJoin);
-
+
// If the plan for this reducer does not exist, initialize the plan
if (opMapTask == null) {
assert cplan.getReducer() == null;
opTaskMap.put(mapJoin, currTask);
opProcCtx.setCurrMapJoinOp(null);
}
- // The current plan can be thrown away after being merged with the original plan
+ // The current plan can be thrown away after being merged with the
+ // original plan
else {
- GenMapRedUtils.joinPlan(mapJoin, currTask, opMapTask, opProcCtx, pos, false, false, false);
+ GenMapRedUtils.joinPlan(mapJoin, currTask, opMapTask, opProcCtx, pos,
+ false, false, false);
currTask = opMapTask;
opProcCtx.setCurrTask(currTask);
}
-
+
return null;
}
}
@@ -159,87 +170,93 @@
public static class MapJoin implements NodeProcessor {
/**
- * Create a task by splitting the plan below the join. The reason, we have to do so in the
- * processing of Select and not MapJoin is due to the walker. While processing a node, it is not safe
- * to alter its children because that will decide the course of the walk. It is perfectly fine to muck around
- * with its parents though, since those nodes have already been visited.
+ * Create a task by splitting the plan below the join. The reason, we have
+ * to do so in the processing of Select and not MapJoin is due to the
+ * walker. While processing a node, it is not safe to alter its children
+ * because that will decide the course of the walk. It is perfectly fine to
+ * muck around with its parents though, since those nodes have already been
+ * visited.
*/
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
-
- SelectOperator sel = (SelectOperator)nd;
- MapJoinOperator mapJoin = (MapJoinOperator)sel.getParentOperators().get(0);
+
+ SelectOperator sel = (SelectOperator) nd;
+ MapJoinOperator mapJoin = (MapJoinOperator) sel.getParentOperators().get(
+ 0);
assert sel.getParentOperators().size() == 1;
-
- GenMRProcContext ctx = (GenMRProcContext)procCtx;
+
+ GenMRProcContext ctx = (GenMRProcContext) procCtx;
ParseContext parseCtx = ctx.getParseCtx();
-
+
// is the mapjoin followed by a reducer
- List<MapJoinOperator> listMapJoinOps = parseCtx.getListMapJoinOpsNoReducer();
-
+ List<MapJoinOperator> listMapJoinOps = parseCtx
+ .getListMapJoinOpsNoReducer();
+
if (listMapJoinOps.contains(mapJoin)) {
ctx.setCurrAliasId(null);
ctx.setCurrTopOp(null);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
- mapCurrCtx.put((Operator<? extends Serializable>)nd, new GenMapRedCtx(ctx.getCurrTask(), null, null));
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ .getMapCurrCtx();
+ mapCurrCtx.put((Operator<? extends Serializable>) nd, new GenMapRedCtx(
+ ctx.getCurrTask(), null, null));
return null;
}
ctx.setCurrMapJoinOp(mapJoin);
-
+
Task<? extends Serializable> currTask = ctx.getCurrTask();
GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mapJoin);
if (mjCtx == null) {
mjCtx = new GenMRMapJoinCtx();
ctx.setMapJoinCtx(mapJoin, mjCtx);
}
-
+
mapredWork mjPlan = GenMapRedUtils.getMapRedWork();
- Task<? extends Serializable> mjTask = TaskFactory.get(mjPlan, parseCtx.getConf());
-
- tableDesc tt_desc =
- PlanUtils.getIntermediateFileTableDesc(
- PlanUtils.getFieldSchemasFromRowSchema(mapJoin.getSchema(), "temporarycol"));
-
+ Task<? extends Serializable> mjTask = TaskFactory.get(mjPlan, parseCtx
+ .getConf());
+
+ tableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
+ .getFieldSchemasFromRowSchema(mapJoin.getSchema(), "temporarycol"));
+
// generate the temporary file
Context baseCtx = parseCtx.getContext();
String taskTmpDir = baseCtx.getMRTmpFileURI();
-
+
// Add the path to alias mapping
mjCtx.setTaskTmpDir(taskTmpDir);
mjCtx.setTTDesc(tt_desc);
mjCtx.setRootMapJoinOp(sel);
-
+
sel.setParentOperators(null);
-
+
// Create a file sink operator for this file name
- Operator<? extends Serializable> fs_op =
- OperatorFactory.get
- (new fileSinkDesc(taskTmpDir, tt_desc,
- parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSINTERMEDIATE)),
- mapJoin.getSchema());
-
+ Operator<? extends Serializable> fs_op = OperatorFactory.get(
+ new fileSinkDesc(taskTmpDir, tt_desc, parseCtx.getConf().getBoolVar(
+ HiveConf.ConfVars.COMPRESSINTERMEDIATE)), mapJoin.getSchema());
+
assert mapJoin.getChildOperators().size() == 1;
mapJoin.getChildOperators().set(0, fs_op);
-
+
List<Operator<? extends Serializable>> parentOpList = new ArrayList<Operator<? extends Serializable>>();
parentOpList.add(mapJoin);
fs_op.setParentOperators(parentOpList);
-
+
currTask.addDependentTask(mjTask);
-
+
ctx.setCurrTask(mjTask);
ctx.setCurrAliasId(null);
ctx.setCurrTopOp(null);
-
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
- mapCurrCtx.put((Operator<? extends Serializable>)nd, new GenMapRedCtx(ctx.getCurrTask(), null, null));
-
+
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ .getMapCurrCtx();
+ mapCurrCtx.put((Operator<? extends Serializable>) nd, new GenMapRedCtx(
+ ctx.getCurrTask(), null, null));
+
return null;
}
}
-
+
/**
* MapJoin followed by MapJoin
*/
@@ -248,50 +265,57 @@
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
- MapJoinOperator mapJoin = (MapJoinOperator)nd;
- GenMRProcContext ctx = (GenMRProcContext)procCtx;
+ MapJoinOperator mapJoin = (MapJoinOperator) nd;
+ GenMRProcContext ctx = (GenMRProcContext) procCtx;
- ParseContext parseCtx = ctx.getParseCtx();
+ ctx.getParseCtx();
MapJoinOperator oldMapJoin = ctx.getCurrMapJoinOp();
assert oldMapJoin != null;
GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mapJoin);
- if (mjCtx != null)
+ if (mjCtx != null) {
mjCtx.setOldMapJoin(oldMapJoin);
- else
- ctx.setMapJoinCtx(mapJoin, new GenMRMapJoinCtx(null, null, null, oldMapJoin));
+ } else {
+ ctx.setMapJoinCtx(mapJoin, new GenMRMapJoinCtx(null, null, null,
+ oldMapJoin));
+ }
ctx.setCurrMapJoinOp(mapJoin);
// find the branch on which this processor was invoked
int pos = getPositionParent(mapJoin, stack);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
- GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(pos));
- Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ .getMapCurrCtx();
+ GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(
+ pos));
+ Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
mapredWork currPlan = (mapredWork) currTask.getWork();
- String currAliasId = mapredCtx.getCurrAliasId();
+ mapredCtx.getCurrAliasId();
Operator<? extends Serializable> reducer = mapJoin;
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+ .getOpTaskMap();
Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
-
+
ctx.setCurrTask(currTask);
-
+
// If the plan for this reducer does not exist, initialize the plan
if (opMapTask == null) {
assert currPlan.getReducer() == null;
GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, true, false, false, pos);
}
- // The current plan can be thrown away after being merged with the original plan
+ // The current plan can be thrown away after being merged with the
+ // original plan
else {
- GenMapRedUtils.joinPlan(mapJoin, currTask, opMapTask, ctx, pos, false, true, false);
+ GenMapRedUtils.joinPlan(mapJoin, currTask, opMapTask, ctx, pos, false,
+ true, false);
currTask = opMapTask;
ctx.setCurrTask(currTask);
}
-
+
mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), null, null));
return null;
}
}
-
+
/**
* Union followed by MapJoin
*/
@@ -300,36 +324,43 @@
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
- GenMRProcContext ctx = (GenMRProcContext)procCtx;
+ GenMRProcContext ctx = (GenMRProcContext) procCtx;
ParseContext parseCtx = ctx.getParseCtx();
UnionProcContext uCtx = parseCtx.getUCtx();
// union was map only - no special processing needed
- if (uCtx.isMapOnlySubq())
- return (new TableScanMapJoin()).process(nd, stack, procCtx, nodeOutputs);
-
+ if (uCtx.isMapOnlySubq()) {
+ return (new TableScanMapJoin())
+ .process(nd, stack, procCtx, nodeOutputs);
+ }
+
UnionOperator currUnion = ctx.getCurrUnionOp();
assert currUnion != null;
- GenMRUnionCtx unionCtx = ctx.getUnionTask(currUnion);
- MapJoinOperator mapJoin = (MapJoinOperator)nd;
+ ctx.getUnionTask(currUnion);
+ MapJoinOperator mapJoin = (MapJoinOperator) nd;
// find the branch on which this processor was invoked
int pos = getPositionParent(mapJoin, stack);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
- GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(pos));
- Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ .getMapCurrCtx();
+ GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(
+ pos));
+ Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
mapredWork currPlan = (mapredWork) currTask.getWork();
Operator<? extends Serializable> reducer = mapJoin;
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+ .getOpTaskMap();
Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
-
+
// union result cannot be a map table
- boolean local = (pos == ((mapJoinDesc)mapJoin.getConf()).getPosBigTable()) ? false : true;
- if (local)
+ boolean local = (pos == (mapJoin.getConf()).getPosBigTable()) ? false
+ : true;
+ if (local) {
throw new SemanticException(ErrorMsg.INVALID_MAPJOIN_TABLE.getMsg());
-
+ }
+
// If the plan for this reducer does not exist, initialize the plan
if (opMapTask == null) {
assert currPlan.getReducer() == null;
@@ -337,26 +368,32 @@
GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, true, true, false, pos);
ctx.setCurrUnionOp(null);
}
- // The current plan can be thrown away after being merged with the original plan
+ // The current plan can be thrown away after being merged with the
+ // original plan
else {
- Task<? extends Serializable> uTask = ctx.getUnionTask(ctx.getCurrUnionOp()).getUTask();
- if (uTask.getId().equals(opMapTask.getId()))
- GenMapRedUtils.joinPlan(mapJoin, null, opMapTask, ctx, pos, false, false, true);
- else
- GenMapRedUtils.joinPlan(mapJoin, uTask, opMapTask, ctx, pos, false, false, true);
+ Task<? extends Serializable> uTask = ctx.getUnionTask(
+ ctx.getCurrUnionOp()).getUTask();
+ if (uTask.getId().equals(opMapTask.getId())) {
+ GenMapRedUtils.joinPlan(mapJoin, null, opMapTask, ctx, pos, false,
+ false, true);
+ } else {
+ GenMapRedUtils.joinPlan(mapJoin, uTask, opMapTask, ctx, pos, false,
+ false, true);
+ }
currTask = opMapTask;
ctx.setCurrTask(currTask);
}
-
- mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
+
+ mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx
+ .getCurrTopOp(), ctx.getCurrAliasId()));
return null;
}
}
-
+
public static NodeProcessor getTableScanMapJoin() {
return new TableScanMapJoin();
}
-
+
public static NodeProcessor getUnionMapJoin() {
return new UnionMapJoin();
}