You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/07/29 23:08:19 UTC
svn commit: r1508202 [9/48] - in /hive/branches/tez: ./
beeline/src/java/org/apache/hive/beeline/
cli/src/java/org/apache/hadoop/hive/cli/
common/src/java/org/apache/hadoop/hive/common/metrics/
common/src/java/org/apache/hadoop/hive/conf/ common/src/te...
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java Mon Jul 29 21:08:03 2013
@@ -84,7 +84,7 @@ public class GenMRTableScan1 implements
if (currOp == op) {
String currAliasId = alias;
ctx.setCurrAliasId(currAliasId);
- mapCurrCtx.put(op, new GenMapRedCtx(currTask, currTopOp, currAliasId));
+ mapCurrCtx.put(op, new GenMapRedCtx(currTask, currAliasId));
QBParseInfo parseInfo = parseCtx.getQB().getParseInfo();
if (parseInfo.isAnalyzeCommand()) {
@@ -117,7 +117,10 @@ public class GenMRTableScan1 implements
handlePartialScanCommand(op, ctx, parseCtx, currTask, parseInfo, statsWork, statsTask);
}
- currWork.setGatheringStats(true);
+ currWork.getMapWork().setGatheringStats(true);
+ if (currWork.getReduceWork() != null) {
+ currWork.getReduceWork().setGatheringStats(true);
+ }
// NOTE: here we should use the new partition predicate pushdown API to get a list of pruned list,
// and pass it to setTaskPlan as the last parameter
Set<Partition> confirmedPartns = new HashSet<Partition>();
@@ -139,12 +142,12 @@ public class GenMRTableScan1 implements
Table source = parseCtx.getQB().getMetaData().getTableForAlias(alias);
PrunedPartitionList partList = new PrunedPartitionList(source, confirmedPartns,
new HashSet<Partition>(), null);
- GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currWork, false, ctx, partList);
+ GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx, partList);
} else { // non-partitioned table
- GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currWork, false, ctx);
+ GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx);
}
}
- return null;
+ return true;
}
}
assert false;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Mon Jul 29 21:08:03 2013
@@ -82,14 +82,13 @@ public class GenMRUnion1 implements Node
UnionParseContext uPrsCtx = uCtx.getUnionParseContext(union);
ctx.getMapCurrCtx().put(
(Operator<? extends OperatorDesc>) union,
- new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
+ new GenMapRedCtx(ctx.getCurrTask(),
ctx.getCurrAliasId()));
// if the union is the first time seen, set current task to GenMRUnionCtx
uCtxTask = ctx.getUnionTask(union);
if (uCtxTask == null) {
- uCtxTask = new GenMRUnionCtx();
- uCtxTask.setUTask(ctx.getCurrTask());
+ uCtxTask = new GenMRUnionCtx(ctx.getCurrTask());
ctx.setUnionTask(union, uCtxTask);
}
@@ -101,7 +100,7 @@ public class GenMRUnion1 implements Node
}
}
- return null;
+ return true;
}
/**
@@ -192,14 +191,11 @@ public class GenMRUnion1 implements Node
// The current plan can be thrown away after being merged with the union
// plan
Task<? extends Serializable> uTask = uCtxTask.getUTask();
- MapredWork plan = (MapredWork) uTask.getWork();
ctx.setCurrTask(uTask);
- List<Operator<? extends OperatorDesc>> seenOps = ctx.getSeenOps();
Operator<? extends OperatorDesc> topOp = ctx.getCurrTopOp();
- if (!seenOps.contains(topOp) && topOp != null) {
- seenOps.add(topOp);
+ if (topOp != null && !ctx.isSeenOp(uTask, topOp)) {
GenMapRedUtils.setTaskPlan(ctx.getCurrAliasId(), ctx
- .getCurrTopOp(), plan, false, ctx);
+ .getCurrTopOp(), uTask, false, ctx);
}
}
@@ -226,6 +222,14 @@ public class GenMRUnion1 implements Node
// future
Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+ if (union.getConf().isAllInputsInSameReducer()) {
+ // All inputs of this UnionOperator are in the same Reducer.
+ // We do not need to break the operator tree.
+ mapCurrCtx.put((Operator<? extends OperatorDesc>) nd,
+ new GenMapRedCtx(ctx.getCurrTask(),ctx.getCurrAliasId()));
+ return null;
+ }
+
UnionParseContext uPrsCtx = uCtx.getUnionParseContext(union);
ctx.setCurrUnionOp(union);
@@ -246,10 +250,9 @@ public class GenMRUnion1 implements Node
// union is encountered for the first time
GenMRUnionCtx uCtxTask = ctx.getUnionTask(union);
if (uCtxTask == null) {
- uCtxTask = new GenMRUnionCtx();
uPlan = GenMapRedUtils.getMapRedWork(parseCtx);
uTask = TaskFactory.get(uPlan, parseCtx.getConf());
- uCtxTask.setUTask(uTask);
+ uCtxTask = new GenMRUnionCtx(uTask);
ctx.setUnionTask(union, uCtxTask);
}
else {
@@ -284,9 +287,9 @@ public class GenMRUnion1 implements Node
ctx.setCurrTask(uTask);
mapCurrCtx.put((Operator<? extends OperatorDesc>) nd,
- new GenMapRedCtx(ctx.getCurrTask(), null, null));
+ new GenMapRedCtx(ctx.getCurrTask(), null));
- return null;
+ return true;
}
private boolean shouldBeRootTask(
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Jul 29 21:08:03 2013
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import org.apache.commons.logging.Log;
@@ -33,7 +34,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.DemuxOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -43,6 +44,8 @@ import org.apache.hadoop.hive.ql.exec.Ta
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -60,12 +63,14 @@ import org.apache.hadoop.hive.ql.plan.Fe
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -80,6 +85,10 @@ public final class GenMapRedUtils {
LOG = LogFactory.getLog("org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils");
}
+ private static boolean needsTagging(ReduceWork rWork) {
+ return rWork != null && (rWork.getReducer().getClass() == JoinOperator.class ||
+ rWork.getReducer().getClass() == DemuxOperator.class);
+ }
/**
* Initialize the current plan by adding it to root tasks.
*
@@ -101,29 +110,21 @@ public final class GenMapRedUtils {
Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
opTaskMap.put(reducer, currTask);
- plan.setReducer(reducer);
+ plan.setReduceWork(new ReduceWork());
+ plan.getReduceWork().setReducer(reducer);
ReduceSinkDesc desc = op.getConf();
- plan.setNumReduceTasks(desc.getNumReducers());
-
- List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
+ plan.getReduceWork().setNumReduceTasks(desc.getNumReducers());
- if (!rootTasks.contains(currTask)
- && (currTask.getParentTasks() == null
- || currTask.getParentTasks().isEmpty())) {
- rootTasks.add(currTask);
- }
- if (reducer.getClass() == JoinOperator.class) {
- plan.setNeedsTagging(true);
+ if (needsTagging(plan.getReduceWork())) {
+ plan.getReduceWork().setNeedsTagging(true);
}
assert currTopOp != null;
- List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
String currAliasId = opProcCtx.getCurrAliasId();
- if (!seenOps.contains(currTopOp)) {
- seenOps.add(currTopOp);
- setTaskPlan(currAliasId, currTopOp, plan, false, opProcCtx);
+ if (!opProcCtx.isSeenOp(currTask, currTopOp)) {
+ setTaskPlan(currAliasId, currTopOp, currTask, false, opProcCtx);
}
currTopOp = null;
@@ -153,29 +154,30 @@ public final class GenMapRedUtils {
opProcCtx.getOpTaskMap();
opTaskMap.put(reducer, unionTask);
- plan.setReducer(reducer);
+
+ plan.setReduceWork(new ReduceWork());
+ plan.getReduceWork().setReducer(reducer);
ReduceSinkDesc desc = op.getConf();
- plan.setNumReduceTasks(desc.getNumReducers());
+ plan.getReduceWork().setNumReduceTasks(desc.getNumReducers());
- if (reducer.getClass() == JoinOperator.class) {
- plan.setNeedsTagging(true);
+ if (needsTagging(plan.getReduceWork())) {
+ plan.getReduceWork().setNeedsTagging(true);
}
initUnionPlan(opProcCtx, currUnionOp, unionTask, false);
}
private static void setUnionPlan(GenMRProcContext opProcCtx,
- boolean local, MapredWork plan, GenMRUnionCtx uCtx,
+ boolean local, Task<? extends Serializable> currTask, GenMRUnionCtx uCtx,
boolean mergeTask) throws SemanticException {
Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
if (currTopOp != null) {
- List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
String currAliasId = opProcCtx.getCurrAliasId();
- if (!seenOps.contains(currTopOp) || mergeTask) {
- seenOps.add(currTopOp);
- setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
+ if (mergeTask || !opProcCtx.isSeenOp(currTask, currTopOp)) {
+ setTaskPlan(currAliasId, currTopOp, currTask, local, opProcCtx);
}
currTopOp = null;
opProcCtx.setCurrTopOp(currTopOp);
@@ -191,16 +193,18 @@ public final class GenMapRedUtils {
List<Operator<? extends OperatorDesc>> topOperators =
uCtx.getListTopOperators();
+ MapredWork plan = (MapredWork) currTask.getWork();
for (int pos = 0; pos < size; pos++) {
String taskTmpDir = taskTmpDirLst.get(pos);
TableDesc tt_desc = tt_descLst.get(pos);
- if (plan.getPathToAliases().get(taskTmpDir) == null) {
- plan.getPathToAliases().put(taskTmpDir,
+ MapWork mWork = plan.getMapWork();
+ if (mWork.getPathToAliases().get(taskTmpDir) == null) {
+ mWork.getPathToAliases().put(taskTmpDir,
new ArrayList<String>());
- plan.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
- plan.getPathToPartitionInfo().put(taskTmpDir,
+ mWork.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
+ mWork.getPathToPartitionInfo().put(taskTmpDir,
new PartitionDesc(tt_desc, null));
- plan.getAliasToWork().put(taskTmpDir, topOperators.get(pos));
+ mWork.getAliasToWork().put(taskTmpDir, topOperators.get(pos));
}
}
}
@@ -214,14 +218,12 @@ public final class GenMapRedUtils {
public static void initUnionPlan(GenMRProcContext opProcCtx, UnionOperator currUnionOp,
Task<? extends Serializable> currTask, boolean local)
throws SemanticException {
- MapredWork plan = (MapredWork) currTask.getWork();
-
// In case of lateral views followed by a join, the same tree
// can be traversed more than one
if (currUnionOp != null) {
GenMRUnionCtx uCtx = opProcCtx.getUnionTask(currUnionOp);
assert uCtx != null;
- setUnionPlan(opProcCtx, local, plan, uCtx, false);
+ setUnionPlan(opProcCtx, local, currTask, uCtx, false);
}
}
@@ -233,12 +235,11 @@ public final class GenMapRedUtils {
Task<? extends Serializable> currentUnionTask,
Task<? extends Serializable> existingTask, boolean local)
throws SemanticException {
- MapredWork plan = (MapredWork) existingTask.getWork();
assert currUnionOp != null;
GenMRUnionCtx uCtx = opProcCtx.getUnionTask(currUnionOp);
assert uCtx != null;
- setUnionPlan(opProcCtx, local, plan, uCtx, true);
+ setUnionPlan(opProcCtx, local, existingTask, uCtx, true);
List<Task<? extends Serializable>> parTasks = null;
if (opProcCtx.getRootTasks().contains(currentUnionTask)) {
@@ -273,104 +274,108 @@ public final class GenMapRedUtils {
}
/**
- * Merge the current task with the task for the current reducer.
+ * Merge the current task into the old task for the reducer
*
- * @param op
- * operator being processed
+ * @param currTask
+ * the current task for the current reducer
* @param oldTask
* the old task for the current reducer
- * @param task
- * the current task for the current reducer
* @param opProcCtx
* processing context
- * @param pos
- * position of the parent in the stack
*/
- public static void joinPlan(Operator<? extends OperatorDesc> op,
- Task<? extends Serializable> oldTask, Task<? extends Serializable> task,
- GenMRProcContext opProcCtx, int pos, boolean split)
+ public static void joinPlan(Task<? extends Serializable> currTask,
+ Task<? extends Serializable> oldTask, GenMRProcContext opProcCtx)
throws SemanticException {
- Task<? extends Serializable> currTask = task;
- MapredWork plan = (MapredWork) currTask.getWork();
+ assert currTask != null && oldTask != null;
+
Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
List<Task<? extends Serializable>> parTasks = null;
-
// terminate the old task and make current task dependent on it
- if (split) {
- assert oldTask != null;
- splitTasks(op, oldTask, currTask, opProcCtx, true, false, 0);
- } else {
- if ((oldTask != null) && (oldTask.getParentTasks() != null)
- && !oldTask.getParentTasks().isEmpty()) {
- parTasks = new ArrayList<Task<? extends Serializable>>();
- parTasks.addAll(oldTask.getParentTasks());
-
- Object[] parTaskArr = parTasks.toArray();
- for (Object element : parTaskArr) {
- ((Task<? extends Serializable>) element).removeDependentTask(oldTask);
- }
+ if (currTask.getParentTasks() != null
+ && !currTask.getParentTasks().isEmpty()) {
+ parTasks = new ArrayList<Task<? extends Serializable>>();
+ parTasks.addAll(currTask.getParentTasks());
+
+ Object[] parTaskArr = parTasks.toArray();
+ for (Object element : parTaskArr) {
+ ((Task<? extends Serializable>) element).removeDependentTask(currTask);
}
}
if (currTopOp != null) {
- List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
- String currAliasId = opProcCtx.getCurrAliasId();
-
- if (!seenOps.contains(currTopOp)) {
- seenOps.add(currTopOp);
- boolean local = false;
- if (pos != -1) {
- local = (pos == ((MapJoinDesc) op.getConf()).getPosBigTable()) ? false
- : true;
- }
- setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
- }
- currTopOp = null;
- opProcCtx.setCurrTopOp(currTopOp);
+ mergeInput(currTopOp, opProcCtx, oldTask, false);
}
- if ((oldTask != null) && (parTasks != null)) {
+ if (parTasks != null) {
for (Task<? extends Serializable> parTask : parTasks) {
- parTask.addDependentTask(currTask);
- if (opProcCtx.getRootTasks().contains(currTask)) {
- opProcCtx.getRootTasks().remove(currTask);
- }
+ parTask.addDependentTask(oldTask);
}
}
- opProcCtx.setCurrTask(currTask);
+ if (oldTask instanceof MapRedTask && currTask instanceof MapRedTask) {
+ ((MapRedTask)currTask).getWork().getMapWork()
+ .mergingInto(((MapRedTask) oldTask).getWork().getMapWork());
+ }
+
+ opProcCtx.setCurrTopOp(null);
+ opProcCtx.setCurrTask(oldTask);
}
/**
- * Split the current plan by creating a temporary destination.
+ * If currTopOp is not set for input of the task, add input for to the task
+ */
+ static boolean mergeInput(Operator<? extends OperatorDesc> currTopOp,
+ GenMRProcContext opProcCtx, Task<? extends Serializable> task, boolean local)
+ throws SemanticException {
+ if (!opProcCtx.isSeenOp(task, currTopOp)) {
+ String currAliasId = opProcCtx.getCurrAliasId();
+ setTaskPlan(currAliasId, currTopOp, task, local, opProcCtx);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Met cRS in pRS(parentTask)-cRS-OP(childTask) case
+ * Split and link two tasks by temporary file : pRS-FS / TS-cRS-OP
+ */
+ static void splitPlan(ReduceSinkOperator cRS,
+ Task<? extends Serializable> parentTask, Task<? extends Serializable> childTask,
+ GenMRProcContext opProcCtx) throws SemanticException {
+ assert parentTask != null && childTask != null;
+ splitTasks(cRS, parentTask, childTask, opProcCtx);
+ }
+
+ /**
+ * Met cRS in pOP(parentTask with RS)-cRS-cOP(noTask) case
+ * Create new child task for cRS-cOP and link two tasks by temporary file : pOP-FS / TS-cRS-cOP
*
- * @param op
+ * @param cRS
* the reduce sink operator encountered
* @param opProcCtx
* processing context
*/
- public static void splitPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx)
+ static void splitPlan(ReduceSinkOperator cRS, GenMRProcContext opProcCtx)
throws SemanticException {
// Generate a new task
ParseContext parseCtx = opProcCtx.getParseCtx();
- MapredWork cplan = getMapRedWork(parseCtx);
- Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx
+ Task<? extends Serializable> parentTask = opProcCtx.getCurrTask();
+
+ MapredWork childPlan = getMapRedWork(parseCtx);
+ Task<? extends Serializable> childTask = TaskFactory.get(childPlan, parseCtx
.getConf());
- Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
+ Operator<? extends OperatorDesc> reducer = cRS.getChildOperators().get(0);
// Add the reducer
- cplan.setReducer(reducer);
- ReduceSinkDesc desc = op.getConf();
-
- cplan.setNumReduceTasks(new Integer(desc.getNumReducers()));
+ ReduceWork rWork = new ReduceWork();
+ childPlan.setReduceWork(rWork);
+ rWork.setReducer(reducer);
+ ReduceSinkDesc desc = cRS.getConf();
+ childPlan.getReduceWork().setNumReduceTasks(new Integer(desc.getNumReducers()));
- HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
- opProcCtx.getOpTaskMap();
- opTaskMap.put(reducer, redTask);
- Task<? extends Serializable> currTask = opProcCtx.getCurrTask();
+ opProcCtx.getOpTaskMap().put(reducer, childTask);
- splitTasks(op, currTask, redTask, opProcCtx, true, false, 0);
- opProcCtx.getRootOps().add(op);
+ splitTasks(cRS, parentTask, childTask, opProcCtx);
}
/**
@@ -388,9 +393,9 @@ public final class GenMapRedUtils {
* processing context
*/
public static void setTaskPlan(String alias_id,
- Operator<? extends OperatorDesc> topOp, MapredWork plan, boolean local,
+ Operator<? extends OperatorDesc> topOp, Task<?> task, boolean local,
GenMRProcContext opProcCtx) throws SemanticException {
- setTaskPlan(alias_id, topOp, plan, local, opProcCtx, null);
+ setTaskPlan(alias_id, topOp, task, local, opProcCtx, null);
}
private static ReadEntity getParentViewInfo(String alias_id,
@@ -432,8 +437,9 @@ public final class GenMapRedUtils {
* pruned partition list. If it is null it will be computed on-the-fly.
*/
public static void setTaskPlan(String alias_id,
- Operator<? extends OperatorDesc> topOp, MapredWork plan, boolean local,
+ Operator<? extends OperatorDesc> topOp, Task<?> task, boolean local,
GenMRProcContext opProcCtx, PrunedPartitionList pList) throws SemanticException {
+ MapWork plan = ((MapredWork) task.getWork()).getMapWork();
ParseContext parseCtx = opProcCtx.getParseCtx();
Set<ReadEntity> inputs = opProcCtx.getInputs();
@@ -488,6 +494,15 @@ public final class GenMapRedUtils {
}
+ Map<String, String> props = parseCtx.getTopToProps().get(topOp);
+ if (props != null) {
+ Properties target = aliasPartnDesc.getProperties();
+ if (target == null) {
+ aliasPartnDesc.setProperties(target = new Properties());
+ }
+ target.putAll(props);
+ }
+
plan.getAliasToPartnInfo().put(alias_id, aliasPartnDesc);
long sizeNeeded = Integer.MAX_VALUE;
@@ -608,6 +623,14 @@ public final class GenMapRedUtils {
tblDesc = Utilities.getTableDesc(part.getTable());
}
+ if (props != null) {
+ Properties target = tblDesc.getProperties();
+ if (target == null) {
+ tblDesc.setProperties(target = new Properties());
+ }
+ target.putAll(props);
+ }
+
for (Path p : paths) {
if (p == null) {
continue;
@@ -681,6 +704,7 @@ public final class GenMapRedUtils {
}
plan.setMapLocalWork(localPlan);
}
+ opProcCtx.addSeenOp(task, topOp);
}
/**
@@ -698,7 +722,7 @@ public final class GenMapRedUtils {
* table descriptor
*/
public static void setTaskPlan(String path, String alias,
- Operator<? extends OperatorDesc> topOp, MapredWork plan, boolean local,
+ Operator<? extends OperatorDesc> topOp, MapWork plan, boolean local,
TableDesc tt_desc) throws SemanticException {
if (path == null || alias == null) {
@@ -737,7 +761,7 @@ public final class GenMapRedUtils {
* @param topOp
* current top operator in the path
*/
- public static void setKeyAndValueDesc(MapredWork plan,
+ public static void setKeyAndValueDesc(ReduceWork plan,
Operator<? extends OperatorDesc> topOp) {
if (topOp == null) {
return;
@@ -778,12 +802,12 @@ public final class GenMapRedUtils {
}
} else if (task instanceof ExecDriver) {
MapredWork work = (MapredWork) task.getWork();
- work.deriveExplainAttributes();
+ work.getMapWork().deriveExplainAttributes();
HashMap<String, Operator<? extends OperatorDesc>> opMap = work
- .getAliasToWork();
+ .getMapWork().getAliasToWork();
if (opMap != null && !opMap.isEmpty()) {
for (Operator<? extends OperatorDesc> op : opMap.values()) {
- setKeyAndValueDesc(work, op);
+ setKeyAndValueDesc(work.getReduceWork(), op);
}
}
}
@@ -804,7 +828,7 @@ public final class GenMapRedUtils {
*/
public static MapredWork getMapRedWork(ParseContext parseCtx) {
MapredWork work = getMapRedWorkFromConf(parseCtx.getConf());
- work.setNameToSplitSample(parseCtx.getNameToSplitSample());
+ work.getMapWork().setNameToSplitSample(parseCtx.getNameToSplitSample());
return work;
}
@@ -815,7 +839,8 @@ public final class GenMapRedUtils {
* @return the new plan
*/
public static MapredWork getMapRedWorkFromConf(HiveConf conf) {
- MapredWork work = new MapredWork();
+ MapredWork mrWork = new MapredWork();
+ MapWork work = mrWork.getMapWork();
boolean mapperCannotSpanPartns =
conf.getBoolVar(
@@ -824,11 +849,9 @@ public final class GenMapRedUtils {
work.setPathToAliases(new LinkedHashMap<String, ArrayList<String>>());
work.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
work.setAliasToWork(new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
- work.setTagToValueDesc(new ArrayList<TableDesc>());
- work.setReducer(null);
work.setHadoopSupportsSplittable(
conf.getBoolVar(HiveConf.ConfVars.HIVE_COMBINE_INPUT_FORMAT_SUPPORTS_SPLITTABLE));
- return work;
+ return mrWork;
}
/**
@@ -851,20 +874,20 @@ public final class GenMapRedUtils {
@SuppressWarnings("nls")
/**
- * Merge the tasks - by creating a temporary file between them.
+ * Split two tasks by creating a temporary file between them.
+ *
* @param op reduce sink operator being processed
- * @param oldTask the parent task
- * @param task the child task
+ * @param parentTask the parent task
+ * @param childTask the child task
* @param opProcCtx context
- * @param setReducer does the reducer needs to be set
- * @param pos position of the parent
**/
- public static void splitTasks(Operator<? extends OperatorDesc> op,
- Task<? extends Serializable> parentTask,
- Task<? extends Serializable> childTask, GenMRProcContext opProcCtx,
- boolean setReducer, boolean local, int posn) throws SemanticException {
- childTask.getWork();
- Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
+ private static void splitTasks(ReduceSinkOperator op,
+ Task<? extends Serializable> parentTask, Task<? extends Serializable> childTask,
+ GenMRProcContext opProcCtx) throws SemanticException {
+ if (op.getNumParent() != 1) {
+ throw new IllegalStateException("Expecting operator " + op + " to have one parent. " +
+ "But found multiple parents : " + op.getParentOperators());
+ }
ParseContext parseCtx = opProcCtx.getParseCtx();
parentTask.addDependentTask(childTask);
@@ -880,7 +903,7 @@ public final class GenMapRedUtils {
Context baseCtx = parseCtx.getContext();
String taskTmpDir = baseCtx.getMRTmpFileURI();
- Operator<? extends OperatorDesc> parent = op.getParentOperators().get(posn);
+ Operator<? extends OperatorDesc> parent = op.getParentOperators().get(0);
TableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
.getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
@@ -922,40 +945,46 @@ public final class GenMapRedUtils {
childOpList = new ArrayList<Operator<? extends OperatorDesc>>();
childOpList.add(op);
ts_op.setChildOperators(childOpList);
- op.getParentOperators().set(posn, ts_op);
+ op.getParentOperators().set(0, ts_op);
Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx =
opProcCtx.getMapCurrCtx();
- mapCurrCtx.put(ts_op, new GenMapRedCtx(childTask, null, null));
+ mapCurrCtx.put(ts_op, new GenMapRedCtx(childTask, null));
String streamDesc = taskTmpDir;
MapredWork cplan = (MapredWork) childTask.getWork();
- if (setReducer) {
- Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
+ Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
- if (reducer.getClass() == JoinOperator.class) {
- String origStreamDesc;
- streamDesc = "$INTNAME";
- origStreamDesc = streamDesc;
- int pos = 0;
- while (cplan.getAliasToWork().get(streamDesc) != null) {
- streamDesc = origStreamDesc.concat(String.valueOf(++pos));
- }
+ if (needsTagging(cplan.getReduceWork())) {
+ String origStreamDesc;
+ streamDesc = "$INTNAME";
+ origStreamDesc = streamDesc;
+ int pos = 0;
+ while (cplan.getMapWork().getAliasToWork().get(streamDesc) != null) {
+ streamDesc = origStreamDesc.concat(String.valueOf(++pos));
}
// TODO: Allocate work to remove the temporary files and make that
// dependent on the redTask
- if (reducer.getClass() == JoinOperator.class) {
- cplan.setNeedsTagging(true);
- }
+ cplan.getReduceWork().setNeedsTagging(true);
}
// Add the path to alias mapping
- setTaskPlan(taskTmpDir, streamDesc, ts_op, cplan, local, tt_desc);
+ setTaskPlan(taskTmpDir, streamDesc, ts_op, cplan.getMapWork(), false, tt_desc);
opProcCtx.setCurrTopOp(null);
opProcCtx.setCurrAliasId(null);
opProcCtx.setCurrTask(childTask);
+ opProcCtx.addRootIfPossible(parentTask);
+ }
+
+ static boolean hasBranchFinished(Object... children) {
+ for (Object child : children) {
+ if (child == null) {
+ return false;
+ }
+ }
+ return true;
}
private GenMapRedUtils() {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Mon Jul 29 21:08:03 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -77,7 +78,7 @@ public final class MapJoinFactory {
*/
private static class TableScanMapJoinProcessor implements NodeProcessor {
- public static void setupBucketMapJoinInfo(MapredWork plan,
+ public static void setupBucketMapJoinInfo(MapWork plan,
AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp) {
if (currMapJoinOp != null) {
Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
@@ -144,38 +145,16 @@ public final class MapJoinFactory {
* position of the parent
*/
private static void initMapJoinPlan(AbstractMapJoinOperator<? extends MapJoinDesc> op,
- GenMRProcContext opProcCtx, int pos)
+ Task<? extends Serializable> currTask,
+ GenMRProcContext opProcCtx, boolean local)
throws SemanticException {
- Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx =
- opProcCtx.getMapCurrCtx();
- int parentPos = (pos == -1) ? 0 : pos;
- GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(
- parentPos));
- Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
- MapredWork plan = (MapredWork) currTask.getWork();
- HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
- opProcCtx.getOpTaskMap();
- Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
-
- MapJoinDesc desc = (MapJoinDesc) op.getConf();
// The map is overloaded to keep track of mapjoins also
- opTaskMap.put(op, currTask);
-
- List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
- if(!rootTasks.contains(currTask)
- && (currTask.getParentTasks() == null
- || currTask.getParentTasks().isEmpty())) {
- rootTasks.add(currTask);
- }
-
- assert currTopOp != null;
- opProcCtx.getSeenOps().add(currTopOp);
+ opProcCtx.getOpTaskMap().put(op, currTask);
+ Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
String currAliasId = opProcCtx.getCurrAliasId();
- boolean local = (pos == desc.getPosBigTable()) ? false : true;
- GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
- setupBucketMapJoinInfo(plan, op);
+ GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, local, opProcCtx);
}
/**
@@ -191,29 +170,12 @@ public final class MapJoinFactory {
* @param pos
* position of the parent in the stack
*/
- public static void joinMapJoinPlan(AbstractMapJoinOperator<? extends OperatorDesc> op,
+ private static void joinMapJoinPlan(AbstractMapJoinOperator<? extends MapJoinDesc> op,
Task<? extends Serializable> oldTask,
- GenMRProcContext opProcCtx, int pos)
+ GenMRProcContext opProcCtx, boolean local)
throws SemanticException {
- MapredWork plan = (MapredWork) oldTask.getWork();
Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
-
- List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
- String currAliasId = opProcCtx.getCurrAliasId();
-
- if (!seenOps.contains(currTopOp)) {
- seenOps.add(currTopOp);
- boolean local = false;
- if (pos != -1) {
- local = (pos == ((MapJoinDesc) op.getConf()).getPosBigTable()) ? false
- : true;
- }
- GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
- setupBucketMapJoinInfo(plan, op);
- }
- currTopOp = null;
- opProcCtx.setCurrTopOp(currTopOp);
- opProcCtx.setCurrTask(oldTask);
+ GenMapRedUtils.mergeInput(currTopOp, opProcCtx, oldTask, local);
}
/*
@@ -236,17 +198,14 @@ public final class MapJoinFactory {
Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
.getMapCurrCtx();
- GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(
- pos));
+ GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(pos));
Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
MapredWork currPlan = (MapredWork) currTask.getWork();
- Operator<? extends OperatorDesc> currTopOp = mapredCtx.getCurrTopOp();
String currAliasId = mapredCtx.getCurrAliasId();
HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
ctx.getOpTaskMap();
- Task<? extends Serializable> opMapTask = opTaskMap.get(mapJoin);
+ Task<? extends Serializable> oldTask = opTaskMap.get(mapJoin);
- ctx.setCurrTopOp(currTopOp);
ctx.setCurrAliasId(currAliasId);
ctx.setCurrTask(currTask);
@@ -254,20 +213,23 @@ public final class MapJoinFactory {
// If we are seeing this mapjoin for the second or later time then atleast one of the
// branches for this mapjoin have been encounered. Join the plan with the plan created
// the first time.
- if (opMapTask == null) {
- assert currPlan.getReducer() == null;
- initMapJoinPlan(mapJoin, ctx, pos);
+ boolean local = pos != mapJoin.getConf().getPosBigTable();
+ if (oldTask == null) {
+ assert currPlan.getReduceWork() == null;
+ initMapJoinPlan(mapJoin, currTask, ctx, local);
} else {
// The current plan can be thrown away after being merged with the
// original plan
- joinMapJoinPlan(mapJoin, opMapTask, ctx, pos);
- currTask = opMapTask;
- ctx.setCurrTask(currTask);
+ joinMapJoinPlan(mapJoin, oldTask, ctx, local);
+ ctx.setCurrTask(currTask = oldTask);
}
+ MapredWork plan = (MapredWork) currTask.getWork();
+ setupBucketMapJoinInfo(plan.getMapWork(), mapJoin);
+
+ mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrAliasId()));
- mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx
- .getCurrTopOp(), ctx.getCurrAliasId()));
- return null;
+ // local aliases need not hand over context further
+ return !local;
}
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Mon Jul 29 21:08:03 2013
@@ -134,7 +134,7 @@ public class MapJoinProcessor implements
new LinkedHashMap<String, FetchWork>());
for (Map.Entry<String, Operator<? extends OperatorDesc>> entry :
- newWork.getAliasToWork().entrySet()) {
+ newWork.getMapWork().getAliasToWork().entrySet()) {
String alias = entry.getKey();
Operator<? extends OperatorDesc> op = entry.getValue();
@@ -162,7 +162,7 @@ public class MapJoinProcessor implements
smallTableAliasList.add(alias);
// get input path and remove this alias from pathToAlias
// because this file will be fetched by fetch operator
- LinkedHashMap<String, ArrayList<String>> pathToAliases = newWork.getPathToAliases();
+ LinkedHashMap<String, ArrayList<String>> pathToAliases = newWork.getMapWork().getPathToAliases();
// keep record all the input path for this alias
HashSet<String> pathSet = new HashSet<String>();
@@ -193,7 +193,7 @@ public class MapJoinProcessor implements
List<PartitionDesc> partDesc = new ArrayList<PartitionDesc>();
for (String tablePath : pathSet) {
- PartitionDesc partitionDesc = newWork.getPathToPartitionInfo().get(tablePath);
+ PartitionDesc partitionDesc = newWork.getMapWork().getPathToPartitionInfo().get(tablePath);
// create fetchwork for non partitioned table
if (partitionDesc.getPartSpec() == null || partitionDesc.getPartSpec().size() == 0) {
fetchWork = new FetchWork(tablePath, partitionDesc.getTableDesc());
@@ -205,7 +205,7 @@ public class MapJoinProcessor implements
}
// create fetchwork for partitioned table
if (fetchWork == null) {
- TableDesc table = newWork.getAliasToPartnInfo().get(alias).getTableDesc();
+ TableDesc table = newWork.getMapWork().getAliasToPartnInfo().get(alias).getTableDesc();
fetchWork = new FetchWork(partDir, partDesc, table);
}
// set alias to fetch work
@@ -213,13 +213,13 @@ public class MapJoinProcessor implements
}
// remove small table ailias from aliasToWork;Avoid concurrent modification
for (String alias : smallTableAliasList) {
- newWork.getAliasToWork().remove(alias);
+ newWork.getMapWork().getAliasToWork().remove(alias);
}
// set up local work
- newWork.setMapLocalWork(newLocalWork);
+ newWork.getMapWork().setMapLocalWork(newLocalWork);
// remove reducer
- newWork.setReducer(null);
+ newWork.setReduceWork(null);
// return the big table alias
if (bigTableAlias == null) {
throw new SemanticException("Big Table Alias is null");
@@ -240,8 +240,8 @@ public class MapJoinProcessor implements
public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos)
throws SemanticException {
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
- newWork.getOpParseCtxMap();
- QBJoinTree newJoinTree = newWork.getJoinTree();
+ newWork.getMapWork().getOpParseCtxMap();
+ QBJoinTree newJoinTree = newWork.getMapWork().getJoinTree();
// generate the map join operator; already checked the map join
MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op,
newJoinTree, mapJoinPos, true, false);
@@ -256,14 +256,15 @@ public class MapJoinProcessor implements
String bigTableAlias = MapJoinProcessor
.genMapJoinLocalWork(newWork, newMapJoinOp, mapJoinPos);
// clean up the mapred work
- newWork.setOpParseCtxMap(null);
- newWork.setJoinTree(null);
+ newWork.getMapWork().setOpParseCtxMap(null);
+ newWork.getMapWork().setJoinTree(null);
return bigTableAlias;
} catch (Exception e) {
e.printStackTrace();
- throw new SemanticException("Generate New MapJoin Opertor Exeception " + e.getMessage());
+ throw new SemanticException("Failed to generate new mapJoin operator " +
+ "by exception : " + e.getMessage());
}
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Mon Jul 29 21:08:03 2013
@@ -22,6 +22,8 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.optimizer.correlation.CorrelationOptimizer;
+import org.apache.hadoop.hive.ql.optimizer.correlation.ReduceSinkDeDuplication;
import org.apache.hadoop.hive.ql.optimizer.index.RewriteGBUsingIndex;
import org.apache.hadoop.hive.ql.optimizer.lineage.Generator;
import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPruner;
@@ -103,6 +105,11 @@ public class Optimizer {
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVELIMITOPTENABLE)) {
transformations.add(new GlobalLimitOptimizer());
}
+ if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) &&
+ !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
+ !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
+ transformations.add(new CorrelationOptimizer());
+ }
transformations.add(new SimpleFetchOptimizer()); // must be called last
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcCtx.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcCtx.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcCtx.java Mon Jul 29 21:08:03 2013
@@ -28,32 +28,22 @@ import org.apache.hadoop.hive.ql.metadat
* partition pruned for the table scan and table alias.
*/
public class PcrExprProcCtx implements NodeProcessorCtx {
+ /**
+ * The table alias that is being currently processed.
+ */
+ private final String tabAlias;
+ private final List<Partition> partList;
public PcrExprProcCtx(String tabAlias, List<Partition> partList) {
- super();
this.tabAlias = tabAlias;
this.partList = partList;
}
- /**
- * The table alias that is being currently processed.
- */
- String tabAlias;
- List<Partition> partList;
-
public String getTabAlias() {
return tabAlias;
}
- public void setTabAlias(String tabAlias) {
- this.tabAlias = tabAlias;
- }
-
public List<Partition> getPartList() {
return partList;
}
-
- public void setPartList(List<Partition> partList) {
- this.partList = partList;
- }
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java Mon Jul 29 21:08:03 2013
@@ -364,16 +364,14 @@ public final class PcrExprProcFactory {
Object... nodeOutputs) throws SemanticException {
ExprNodeFieldDesc fnd = (ExprNodeFieldDesc) nd;
boolean unknown = false;
- int idx = 0;
for (Object child : nodeOutputs) {
NodeInfoWrapper wrapper = (NodeInfoWrapper) child;
if (wrapper.state == WalkState.UNKNOWN) {
unknown = true;
+ break;
}
}
- assert (idx == 0);
-
if (unknown) {
return new NodeInfoWrapper(WalkState.UNKNOWN, null, fnd);
} else {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java Mon Jul 29 21:08:03 2013
@@ -27,14 +27,15 @@ import java.util.Stack;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.MapRedTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.TaskGraphWalker.TaskGraphWalkerContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
/**
* Common iteration methods for converting joins and sort-merge joins.
@@ -119,7 +120,7 @@ public abstract class AbstractJoinTaskDi
}
}
- public long getTotalKnownInputSize(Context context, MapredWork currWork,
+ public long getTotalKnownInputSize(Context context, MapWork currWork,
Map<String, ArrayList<String>> pathToAliases,
HashMap<String, Long> aliasToSize) throws SemanticException {
try {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java Mon Jul 29 21:08:03 2013
@@ -36,6 +36,8 @@ import org.apache.hadoop.hive.ql.plan.Op
*/
public class BucketingSortingCtx implements NodeProcessorCtx {
+ boolean disableBucketing;
+
// A mapping from an operator to the columns by which it's output is bucketed
Map<Operator<? extends OperatorDesc>, List<BucketCol>> bucketedColsByOp;
// A mapping from a directory which a FileSinkOperator writes into to the columns by which that
@@ -48,7 +50,8 @@ public class BucketingSortingCtx impleme
// output is sorted
Map<String, List<SortCol>> sortedColsByDirectory;
- public BucketingSortingCtx() {
+ public BucketingSortingCtx(boolean disableBucketing) {
+ this.disableBucketing = disableBucketing;
this.bucketedColsByOp = new HashMap<Operator<? extends OperatorDesc>, List<BucketCol>>();
this.bucketedColsByDirectory = new HashMap<String, List<BucketCol>>();
this.sortedColsByOp = new HashMap<Operator<? extends OperatorDesc>, List<SortCol>>();
@@ -57,21 +60,25 @@ public class BucketingSortingCtx impleme
public List<BucketCol> getBucketedCols(Operator<? extends OperatorDesc> op) {
- return bucketedColsByOp.get(op);
+ return disableBucketing ? null : bucketedColsByOp.get(op);
}
public void setBucketedCols(Operator<? extends OperatorDesc> op, List<BucketCol> bucketCols) {
- this.bucketedColsByOp.put(op, bucketCols);
+ if (!disableBucketing) {
+ bucketedColsByOp.put(op, bucketCols);
+ }
}
public Map<String, List<BucketCol>> getBucketedColsByDirectory() {
- return bucketedColsByDirectory;
+ return disableBucketing ? null : bucketedColsByDirectory;
}
public void setBucketedColsByDirectory(Map<String, List<BucketCol>> bucketedColsByDirectory) {
- this.bucketedColsByDirectory = bucketedColsByDirectory;
+ if (!disableBucketing) {
+ this.bucketedColsByDirectory = bucketedColsByDirectory;
+ }
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java Mon Jul 29 21:08:03 2013
@@ -23,7 +23,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.hive.ql.exec.ExecDriver;
import org.apache.hadoop.hive.ql.exec.ExtractOperator;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -36,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.Li
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
@@ -86,12 +86,14 @@ public class BucketingSortingInferenceOp
continue;
}
- Operator<? extends OperatorDesc> reducer = mapRedTask.getWork().getReducer();
- if (reducer == null) {
+ if (mapRedTask.getWork().getReduceWork() == null) {
continue;
}
+ Operator<? extends OperatorDesc> reducer = mapRedTask.getWork().getReduceWork().getReducer();
- BucketingSortingCtx bCtx = new BucketingSortingCtx();
+ // uses sampling, which means it's not bucketed
+ boolean disableBucketing = mapRedTask.getWork().getMapWork().getSamplingType() > 0;
+ BucketingSortingCtx bCtx = new BucketingSortingCtx(disableBucketing);
// RuleRegExp rules are used to match operators anywhere in the tree
// RuleExactMatch rules are used to specify exactly what the tree should look like
@@ -143,8 +145,8 @@ public class BucketingSortingInferenceOp
topNodes.add(reducer);
ogw.startWalking(topNodes, null);
- mapRedTask.getWork().getBucketedColsByDirectory().putAll(bCtx.getBucketedColsByDirectory());
- mapRedTask.getWork().getSortedColsByDirectory().putAll(bCtx.getSortedColsByDirectory());
+ mapRedTask.getWork().getMapWork().getBucketedColsByDirectory().putAll(bCtx.getBucketedColsByDirectory());
+ mapRedTask.getWork().getMapWork().getSortedColsByDirectory().putAll(bCtx.getSortedColsByDirectory());
}
}
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Mon Jul 29 21:08:03 2013
@@ -34,12 +34,12 @@ import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapRedTask;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
@@ -50,10 +50,12 @@ import org.apache.hadoop.hive.ql.plan.Co
import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
/*
* Convert tasks involving JOIN into MAPJOIN.
@@ -108,7 +110,7 @@ public class CommonJoinTaskDispatcher ex
}
// Get the position of the big table for this join operator and the given alias
- private int getPosition(MapredWork work, Operator<? extends OperatorDesc> joinOp,
+ private int getPosition(MapWork work, Operator<? extends OperatorDesc> joinOp,
String alias) {
Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
@@ -127,9 +129,9 @@ public class CommonJoinTaskDispatcher ex
*/
private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task, Configuration conf) {
MapRedTask childTask = (MapRedTask) task.getChildTasks().get(0);
- MapredWork work = task.getWork();
+ MapWork work = task.getWork().getMapWork();
MapredLocalWork localWork = work.getMapLocalWork();
- MapredWork childWork = childTask.getWork();
+ MapWork childWork = childTask.getWork().getMapWork();
MapredLocalWork childLocalWork = childWork.getMapLocalWork();
// Can this be merged
@@ -205,21 +207,27 @@ public class CommonJoinTaskDispatcher ex
}
}
+ // Merge the 2 trees - remove the FileSinkOperator from the first tree and pass it to the
+ // top of the second
Operator<? extends Serializable> childAliasOp =
childWork.getAliasToWork().values().iterator().next();
if (fop.getParentOperators().size() > 1) {
return;
}
-
- // Merge the 2 trees - remove the FileSinkOperator from the first tree pass it to the
- // top of the second
Operator<? extends Serializable> parentFOp = fop.getParentOperators().get(0);
- parentFOp.getChildOperators().remove(fop);
- parentFOp.getChildOperators().add(childAliasOp);
- List<Operator<? extends OperatorDesc>> parentOps =
- new ArrayList<Operator<? extends OperatorDesc>>();
- parentOps.add(parentFOp);
- childAliasOp.setParentOperators(parentOps);
+ // remove the unnecessary TableScan
+ if (childAliasOp instanceof TableScanOperator) {
+ TableScanOperator tso = (TableScanOperator)childAliasOp;
+ if (tso.getNumChild() != 1) {
+ // shouldn't happen
+ return;
+ }
+ childAliasOp = tso.getChildOperators().get(0);
+ childAliasOp.replaceParent(tso, parentFOp);
+ } else {
+ childAliasOp.setParentOperators(Utilities.makeList(parentFOp));
+ }
+ parentFOp.replaceChild(fop, childAliasOp);
work.getAliasToPartnInfo().putAll(childWork.getAliasToPartnInfo());
for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getPathToPartitionInfo()
@@ -256,19 +264,26 @@ public class CommonJoinTaskDispatcher ex
* @param childTask
*/
private void copyReducerConf(MapRedTask task, MapRedTask childTask) {
- MapredWork childWork = childTask.getWork();
+ MapredWork mrChildWork = childTask.getWork();
+ ReduceWork childWork = childTask.getWork().getReduceWork();
+ if (childWork == null) {
+ return;
+ }
+
Operator childReducer = childWork.getReducer();
MapredWork work = task.getWork();
if (childReducer == null) {
return;
}
- work.setReducer(childReducer);
- work.setNumReduceTasks(childWork.getNumReduceTasks());
- work.setJoinTree(childWork.getJoinTree());
- work.setNeedsTagging(childWork.getNeedsTagging());
+ ReduceWork rWork = new ReduceWork();
+ work.setReduceWork(rWork);
+ rWork.setReducer(childReducer);
+ rWork.setNumReduceTasks(childWork.getNumReduceTasks());
+ work.getMapWork().setJoinTree(mrChildWork.getMapWork().getJoinTree());
+ rWork.setNeedsTagging(childWork.getNeedsTagging());
// Make sure the key configuration is correct, clear and regenerate.
- work.getTagToValueDesc().clear();
+ rWork.getTagToValueDesc().clear();
GenMapRedUtils.setKeyAndValueDescForTaskTree(task);
}
@@ -303,10 +318,9 @@ public class CommonJoinTaskDispatcher ex
return;
}
MapRedTask childTask = (MapRedTask) firstChildTask;
- MapredWork mapJoinWork = mapJoinTask.getWork();
+ MapWork mapJoinWork = mapJoinTask.getWork().getMapWork();
MapredWork childWork = childTask.getWork();
- Operator childReducer = childWork.getReducer();
- if (childReducer == null) {
+ if (childWork.getReduceWork() == null) {
// Not a MR job, nothing to merge.
return;
}
@@ -316,7 +330,7 @@ public class CommonJoinTaskDispatcher ex
if (aliasToWork.size() > 1) {
return;
}
- Map<String, ArrayList<String>> childPathToAliases = childWork.getPathToAliases();
+ Map<String, ArrayList<String>> childPathToAliases = childWork.getMapWork().getPathToAliases();
if (childPathToAliases.size() > 1) {
return;
}
@@ -347,7 +361,7 @@ public class CommonJoinTaskDispatcher ex
}
MapredLocalWork mapJoinLocalWork = mapJoinWork.getMapLocalWork();
- MapredLocalWork childLocalWork = childWork.getMapLocalWork();
+ MapredLocalWork childLocalWork = childWork.getMapWork().getMapLocalWork();
// Either of them should not be bucketed
if ((mapJoinLocalWork != null && mapJoinLocalWork.getBucketMapjoinContext() != null) ||
@@ -355,12 +369,12 @@ public class CommonJoinTaskDispatcher ex
return;
}
- if (childWork.getAliasToWork().size() > 1) {
+ if (childWork.getMapWork().getAliasToWork().size() > 1) {
return;
}
Operator<? extends Serializable> childAliasOp =
- childWork.getAliasToWork().values().iterator().next();
+ childWork.getMapWork().getAliasToWork().values().iterator().next();
if (mapJoinTaskFileSinkOperator.getParentOperators().size() > 1) {
return;
}
@@ -387,10 +401,10 @@ public class CommonJoinTaskDispatcher ex
parentOps.add(parentFOp);
childAliasOp.setParentOperators(parentOps);
- mapJoinWork.getAliasToPartnInfo().putAll(childWork.getAliasToPartnInfo());
- for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getPathToPartitionInfo()
+ mapJoinWork.getAliasToPartnInfo().putAll(childWork.getMapWork().getAliasToPartnInfo());
+ for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getMapWork().getPathToPartitionInfo()
.entrySet()) {
- if (childWork.getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
+ if (childWork.getMapWork().getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
mapJoinWork.getPathToPartitionInfo()
.put(childWorkEntry.getKey(), childWorkEntry.getValue());
}
@@ -416,6 +430,22 @@ public class CommonJoinTaskDispatcher ex
copyReducerConf(mapJoinTask, childTask);
}
+ public static boolean cannotConvert(String bigTableAlias,
+ Map<String, Long> aliasToSize, long aliasTotalKnownInputSize,
+ long ThresholdOfSmallTblSizeSum) {
+ boolean ret = false;
+ Long aliasKnownSize = aliasToSize.get(bigTableAlias);
+ if (aliasKnownSize != null && aliasKnownSize.longValue() > 0) {
+ long smallTblTotalKnownSize = aliasTotalKnownInputSize
+ - aliasKnownSize.longValue();
+ if (smallTblTotalKnownSize > ThresholdOfSmallTblSizeSum) {
+ //this table is not good to be a big table.
+ ret = true;
+ }
+ }
+ return ret;
+ }
+
@Override
public Task<? extends Serializable> processCurrentTask(MapRedTask currTask,
ConditionalTask conditionalTask, Context context)
@@ -428,7 +458,7 @@ public class CommonJoinTaskDispatcher ex
}
currTask.setTaskTag(Task.COMMON_JOIN);
- MapredWork currWork = currTask.getWork();
+ MapWork currWork = currTask.getWork().getMapWork();
// create conditional work list and task list
List<Serializable> listWorks = new ArrayList<Serializable>();
@@ -519,7 +549,7 @@ public class CommonJoinTaskDispatcher ex
if (convertJoinMapJoin) {
// create map join task and set big table as bigTablePosition
- MapRedTask newTask = convertTaskToMapJoinTask(currWork, bigTablePosition).getFirst();
+ MapRedTask newTask = convertTaskToMapJoinTask(currTask.getWork(), bigTablePosition).getFirst();
newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
replaceTask(currTask, newTask, physicalContext);
@@ -555,23 +585,18 @@ public class CommonJoinTaskDispatcher ex
}
// deep copy a new mapred work from xml
// Once HIVE-4396 is in, it would be faster to use a cheaper method to clone the plan
- String xml = currWork.toXML();
+ String xml = currTask.getWork().toXML();
InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
- MapredWork newWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
+ MapredWork newWork = Utilities.deserializeObject(in);
// create map join task and set big table as i
ObjectPair<MapRedTask, String> newTaskAlias = convertTaskToMapJoinTask(newWork, i);
MapRedTask newTask = newTaskAlias.getFirst();
bigTableAlias = newTaskAlias.getSecond();
- Long aliasKnownSize = aliasToSize.get(bigTableAlias);
- if (aliasKnownSize != null && aliasKnownSize.longValue() > 0) {
- long smallTblTotalKnownSize = aliasTotalKnownInputSize
- - aliasKnownSize.longValue();
- if (smallTblTotalKnownSize > ThresholdOfSmallTblSizeSum) {
- // this table is not good to be a big table.
- continue;
- }
+ if (cannotConvert(bigTableAlias, aliasToSize,
+ aliasTotalKnownInputSize, ThresholdOfSmallTblSizeSum)) {
+ continue;
}
// add into conditional task
@@ -642,14 +667,15 @@ public class CommonJoinTaskDispatcher ex
}
private JoinOperator getJoinOp(MapRedTask task) throws SemanticException {
- MapredWork work = task.getWork();
- if (work == null) {
+ MapWork mWork = task.getWork().getMapWork();
+ ReduceWork rWork = task.getWork().getReduceWork();
+ if (rWork == null) {
return null;
}
- Operator<? extends OperatorDesc> reducerOp = work.getReducer();
+ Operator<? extends OperatorDesc> reducerOp = rWork.getReducer();
if (reducerOp instanceof JoinOperator) {
/* Is any operator present, which prevents the conversion */
- Map<String, Operator<? extends OperatorDesc>> aliasToWork = work.getAliasToWork();
+ Map<String, Operator<? extends OperatorDesc>> aliasToWork = mWork.getAliasToWork();
for (Operator<? extends OperatorDesc> op : aliasToWork.values()) {
if (!checkOperatorOKMapJoinConversion(op)) {
return null;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java Mon Jul 29 21:08:03 2013
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.optimizer.physical;
import java.io.ByteArrayInputStream;
-import java.io.File;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
@@ -50,6 +49,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -104,6 +104,7 @@ public final class GenMRSkewJoinProcesso
* https://issues.apache.org/jira/browse/HIVE-964.
*
*/
+ @SuppressWarnings("unchecked")
public static void processSkewJoin(JoinOperator joinOp,
Task<? extends Serializable> currTask, ParseContext parseCtx)
throws SemanticException {
@@ -151,7 +152,7 @@ public final class GenMRSkewJoinProcesso
List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
MapredWork currPlan = (MapredWork) currTask.getWork();
- TableDesc keyTblDesc = (TableDesc) currPlan.getKeyDesc().clone();
+ TableDesc keyTblDesc = (TableDesc) currPlan.getReduceWork().getKeyDesc().clone();
List<String> joinKeys = Utilities
.getColumnNames(keyTblDesc.getProperties());
List<String> joinKeyTypes = Utilities.getColumnTypes(keyTblDesc
@@ -232,7 +233,7 @@ public final class GenMRSkewJoinProcesso
for (int i = 0; i < numAliases - 1; i++) {
Byte src = tags[i];
- MapredWork newPlan = PlanUtils.getMapRedWork();
+ MapWork newPlan = PlanUtils.getMapRedWork().getMapWork();
// This code has been only added for testing
boolean mapperCannotSpanPartns =
@@ -246,7 +247,7 @@ public final class GenMRSkewJoinProcesso
StringBuilder sb = new StringBuilder(xmlPlan);
ByteArrayInputStream bis;
bis = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));
- clonePlan = Utilities.deserializeMapRedWork(bis, parseCtx.getConf());
+ clonePlan = Utilities.deserializeObject(bis);
} catch (UnsupportedEncodingException e) {
throw new SemanticException(e);
}
@@ -276,7 +277,7 @@ public final class GenMRSkewJoinProcesso
newPlan.getPathToPartitionInfo().put(bigKeyDirPath, part);
newPlan.getAliasToPartnInfo().put(alias, part);
- Operator<? extends OperatorDesc> reducer = clonePlan.getReducer();
+ Operator<? extends OperatorDesc> reducer = clonePlan.getReduceWork().getReducer();
assert reducer instanceof JoinOperator;
JoinOperator cloneJoinOp = (JoinOperator) reducer;
@@ -328,16 +329,18 @@ public final class GenMRSkewJoinProcesso
newPlan
.setMinSplitSize(HiveConf.getLongVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINMINSPLIT));
newPlan.setInputformat(HiveInputFormat.class.getName());
- Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(
- newPlan, jc);
+
+ MapredWork w = new MapredWork();
+ w.setMapWork(newPlan);
+
+ Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(w, jc);
bigKeysDirToTaskMap.put(bigKeyDirPath, skewJoinMapJoinTask);
listWorks.add(skewJoinMapJoinTask.getWork());
listTasks.add(skewJoinMapJoinTask);
}
ConditionalWork cndWork = new ConditionalWork(listWorks);
- ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork,
- parseCtx.getConf());
+ ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, parseCtx.getConf());
cndTsk.setListTasks(listTasks);
cndTsk.setResolver(new ConditionalResolverSkewJoin());
cndTsk
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java Mon Jul 29 21:08:03 2013
@@ -34,7 +34,7 @@ public class IndexWhereResolver implemen
Dispatcher dispatcher = new IndexWhereTaskDispatcher(physicalContext);
GraphWalker opGraphWalker = new DefaultGraphWalker(dispatcher);
ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(physicalContext.rootTasks);
+ topNodes.addAll(physicalContext.getRootTasks());
opGraphWalker.startWalking(topNodes, null);
return physicalContext;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java Mon Jul 29 21:08:03 2013
@@ -28,12 +28,12 @@ import java.util.Stack;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.MapredLocalTask;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -48,8 +48,7 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin;
-import
- org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
@@ -73,7 +72,7 @@ public class MapJoinResolver implements
// get all the tasks nodes from root task
ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(pctx.rootTasks);
+ topNodes.addAll(pctx.getRootTasks());
// begin to walk through the task tree.
ogw.startWalking(topNodes, null);
@@ -98,14 +97,14 @@ public class MapJoinResolver implements
ConditionalTask conditionalTask) throws SemanticException {
// get current mapred work and its local work
MapredWork mapredWork = (MapredWork) currTask.getWork();
- MapredLocalWork localwork = mapredWork.getMapLocalWork();
+ MapredLocalWork localwork = mapredWork.getMapWork().getMapLocalWork();
if (localwork != null) {
// get the context info and set up the shared tmp URI
Context ctx = physicalContext.getContext();
String tmpFileURI = Utilities.generateTmpURI(ctx.getLocalTmpFileURI(), currTask.getId());
localwork.setTmpFileURI(tmpFileURI);
String hdfsTmpURI = Utilities.generateTmpURI(ctx.getMRTmpFileURI(), currTask.getId());
- mapredWork.setTmpHDFSFileURI(hdfsTmpURI);
+ mapredWork.getMapWork().setTmpHDFSFileURI(hdfsTmpURI);
// create a task for this local work; right now, this local work is shared
// by the original MapredTask and this new generated MapredLocalTask.
MapredLocalTask localTask = (MapredLocalTask) TaskFactory.get(localwork, physicalContext
@@ -134,7 +133,7 @@ public class MapJoinResolver implements
newLocalWork.setTmpFileURI(tmpFileURI);
newLocalWork.setInputFileChangeSensitive(localwork.getInputFileChangeSensitive());
newLocalWork.setBucketMapjoinContext(localwork.copyPartSpecMappingOnly());
- mapredWork.setMapLocalWork(newLocalWork);
+ mapredWork.getMapWork().setMapLocalWork(newLocalWork);
// get all parent tasks
List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
currTask.setParentTasks(null);
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java Mon Jul 29 21:08:03 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.lib.Rul
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -171,7 +172,7 @@ public class MetadataOnlyOptimizer imple
Dispatcher disp = new MetadataOnlyTaskDispatcher(pctx);
GraphWalker ogw = new DefaultGraphWalker(disp);
ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(pctx.rootTasks);
+ topNodes.addAll(pctx.getRootTasks());
ogw.startWalking(topNodes, null);
return pctx;
}
@@ -188,7 +189,7 @@ public class MetadataOnlyOptimizer imple
physicalContext = context;
}
- private String getAliasForTableScanOperator(MapredWork work,
+ private String getAliasForTableScanOperator(MapWork work,
TableScanOperator tso) {
for (Map.Entry<String, Operator<? extends OperatorDesc>> entry :
@@ -211,7 +212,7 @@ public class MetadataOnlyOptimizer imple
return desc;
}
- private List<String> getPathsForAlias(MapredWork work, String alias) {
+ private List<String> getPathsForAlias(MapWork work, String alias) {
List<String> paths = new ArrayList<String>();
for (Map.Entry<String, ArrayList<String>> entry : work.getPathToAliases().entrySet()) {
@@ -223,7 +224,7 @@ public class MetadataOnlyOptimizer imple
return paths;
}
- private void processAlias(MapredWork work, String alias) {
+ private void processAlias(MapWork work, String alias) {
// Change the alias partition desc
PartitionDesc aliasPartn = work.getAliasToPartnInfo().get(alias);
changePartitionToMetadataOnly(aliasPartn);
@@ -247,12 +248,6 @@ public class MetadataOnlyOptimizer imple
return partSpec.toString().replaceAll("[:/#\\?]", "_");
}
- private void convertToMetadataOnlyQuery(MapredWork work,
- TableScanOperator tso) {
- String alias = getAliasForTableScanOperator(work, tso);
- processAlias(work, alias);
- }
-
@Override
public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
throws SemanticException {
@@ -305,8 +300,10 @@ public class MetadataOnlyOptimizer imple
while (iterator.hasNext()) {
TableScanOperator tso = iterator.next();
- LOG.info("Metadata only table scan for " + tso.getConf().getAlias());
- convertToMetadataOnlyQuery((MapredWork) task.getWork(), tso);
+ MapWork work = ((MapredWork) task.getWork()).getMapWork();
+ String alias = getAliasForTableScanOperator(work, tso);
+ LOG.info("Metadata only table scan for " + alias);
+ processAlias(work, alias);
}
return null;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java Mon Jul 29 21:08:03 2013
@@ -72,11 +72,27 @@ public class PhysicalContext {
this.context = context;
}
+ public List<Task<? extends Serializable>> getRootTasks() {
+ return rootTasks;
+ }
+
+ public void setRootTasks(List<Task<? extends Serializable>> rootTasks) {
+ this.rootTasks = rootTasks;
+ }
+
+ public Task<? extends Serializable> getFetchTask() {
+ return fetchTask;
+ }
+
+ public void setFetchTask(Task<? extends Serializable> fetchTask) {
+ this.fetchTask = fetchTask;
+ }
+
public void addToRootTask(Task<? extends Serializable> tsk){
rootTasks.add(tsk);
}
+
public void removeFromRootTask(Task<? extends Serializable> tsk){
rootTasks.remove(tsk);
}
-
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java Mon Jul 29 21:08:03 2013
@@ -67,6 +67,9 @@ public class PhysicalOptimizer {
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVEMETADATAONLYQUERIES)) {
resolvers.add(new MetadataOnlyOptimizer());
}
+ if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVESAMPLINGFORORDERBY)) {
+ resolvers.add(new SamplingOptimizer());
+ }
// Physical optimizers which follow this need to be careful not to invalidate the inferences
// made by this optimizer. Only optimizers which depend on the results of this one should
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java Mon Jul 29 21:08:03 2013
@@ -51,7 +51,7 @@ public class SkewJoinResolver implements
Dispatcher disp = new SkewJoinTaskDispatcher(pctx);
GraphWalker ogw = new DefaultGraphWalker(disp);
ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(pctx.rootTasks);
+ topNodes.addAll(pctx.getRootTasks());
ogw.startWalking(topNodes, null);
return pctx;
}
@@ -74,7 +74,7 @@ public class SkewJoinResolver implements
Task<? extends Serializable> task = (Task<? extends Serializable>) nd;
if (!task.isMapRedTask() || task instanceof ConditionalTask
- || ((MapredWork) task.getWork()).getReducer() == null) {
+ || ((MapredWork) task.getWork()).getReduceWork() == null) {
return null;
}
@@ -94,7 +94,9 @@ public class SkewJoinResolver implements
// iterator the reducer operator tree
ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.add(((MapredWork) task.getWork()).getReducer());
+ if (((MapredWork)task.getWork()).getReduceWork() != null) {
+ topNodes.add(((MapredWork) task.getWork()).getReduceWork().getReducer());
+ }
ogw.startWalking(topNodes, null);
return null;
}