Posted to commits@hive.apache.org by na...@apache.org on 2012/08/29 19:44:02 UTC
svn commit: r1378659 [2/4] - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/ java/org/apache/hadoop/hive/ql/exec/
java/org/apache/hadoop/hive/ql/index/compact/
java/org/apache/hadoop/hive/ql/io/ java/org/apache/hadoop/hive/ql/lib/
java/org/apach...
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -33,6 +32,7 @@ import org.apache.hadoop.hive.ql.parse.O
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
/**
@@ -40,15 +40,15 @@ import org.apache.hadoop.hive.ql.plan.Se
*/
public class ColumnPrunerProcCtx implements NodeProcessorCtx {
- private final Map<Operator<? extends Serializable>, List<String>> prunedColLists;
+ private final Map<Operator<? extends OperatorDesc>, List<String>> prunedColLists;
- private final HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap;
+ private final HashMap<Operator<? extends OperatorDesc>, OpParseContext> opToParseCtxMap;
private final Map<CommonJoinOperator, Map<Byte, List<String>>> joinPrunedColLists;
public ColumnPrunerProcCtx(
- HashMap<Operator<? extends Serializable>, OpParseContext> opToParseContextMap) {
- prunedColLists = new HashMap<Operator<? extends Serializable>, List<String>>();
+ HashMap<Operator<? extends OperatorDesc>, OpParseContext> opToParseContextMap) {
+ prunedColLists = new HashMap<Operator<? extends OperatorDesc>, List<String>>();
opToParseCtxMap = opToParseContextMap;
joinPrunedColLists = new HashMap<CommonJoinOperator, Map<Byte, List<String>>>();
}
@@ -60,15 +60,15 @@ public class ColumnPrunerProcCtx impleme
/**
* @return the prunedColLists
*/
- public List<String> getPrunedColList(Operator<? extends Serializable> op) {
+ public List<String> getPrunedColList(Operator<? extends OperatorDesc> op) {
return prunedColLists.get(op);
}
- public HashMap<Operator<? extends Serializable>, OpParseContext> getOpToParseCtxMap() {
+ public HashMap<Operator<? extends OperatorDesc>, OpParseContext> getOpToParseCtxMap() {
return opToParseCtxMap;
}
- public Map<Operator<? extends Serializable>, List<String>> getPrunedColLists() {
+ public Map<Operator<? extends OperatorDesc>, List<String>> getPrunedColLists() {
return prunedColLists;
}
@@ -77,17 +77,17 @@ public class ColumnPrunerProcCtx impleme
* RowResolver and are different from the external column names) that are
* needed in the subtree. These columns eventually have to be selected from
* the table scan.
- *
+ *
* @param curOp
* The root of the operator subtree.
* @return List<String> of the internal column names.
* @throws SemanticException
*/
- public List<String> genColLists(Operator<? extends Serializable> curOp)
+ public List<String> genColLists(Operator<? extends OperatorDesc> curOp)
throws SemanticException {
List<String> colList = new ArrayList<String>();
if (curOp.getChildOperators() != null) {
- for (Operator<? extends Serializable> child : curOp.getChildOperators()) {
+ for (Operator<? extends OperatorDesc> child : curOp.getChildOperators()) {
if (child instanceof CommonJoinOperator) {
int tag = child.getParentOperators().indexOf(curOp);
List<String> prunList = joinPrunedColLists.get(child).get((byte) tag);
@@ -105,7 +105,7 @@ public class ColumnPrunerProcCtx impleme
* Creates the list of internal column names from select expressions in a
* select operator. This function is used for the select operator instead of
* the genColLists function (which is used by the rest of the operators).
- *
+ *
* @param op
* The select operator.
* @return List<String> of the internal column names.
@@ -122,7 +122,7 @@ public class ColumnPrunerProcCtx impleme
/**
* Creates the list of internal column names for select * expressions.
- *
+ *
* @param op
* The select operator.
* @param colList
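
The change in this file is purely a narrowing of the generic bound: the column-pruning context is now keyed by Operator<? extends OperatorDesc> instead of Operator<? extends Serializable>. A minimal, self-contained sketch of that pattern, using simplified stand-in classes rather than the real Hive types:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Stand-ins for the Hive types touched above; names and shapes are illustrative only.
    interface OperatorDesc extends java.io.Serializable {}   // marker interface for operator descriptors
    class SelectDesc implements OperatorDesc {}

    class Operator<T extends OperatorDesc> {                  // bound narrowed from Serializable
      private final T conf;
      Operator(T conf) { this.conf = conf; }
      T getConf() { return conf; }
    }

    // A pruning context keyed by the narrowed operator type, mirroring ColumnPrunerProcCtx.
    class PrunerCtxSketch {
      private final Map<Operator<? extends OperatorDesc>, List<String>> prunedColLists =
          new HashMap<Operator<? extends OperatorDesc>, List<String>>();

      void put(Operator<? extends OperatorDesc> op, List<String> cols) {
        prunedColLists.put(op, cols);
      }

      List<String> getPrunedColList(Operator<? extends OperatorDesc> op) {
        return prunedColLists.get(op);
      }
    }

    class RefactorSketch {
      public static void main(String[] args) {
        PrunerCtxSketch ctx = new PrunerCtxSketch();
        Operator<SelectDesc> sel = new Operator<SelectDesc>(new SelectDesc());
        ctx.put(sel, Arrays.asList("_col0", "_col1"));
        System.out.println(ctx.getPrunedColList(sel));        // prints [_col0, _col1]
      }
    }

With the narrowed bound, the compiler rejects at the call site any attempt to key these maps with an operator whose descriptor is merely Serializable, which is the point of the sweep in this commit.
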
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -62,6 +61,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
@@ -154,8 +154,8 @@ public final class ColumnPrunerProcFacto
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
Object... nodeOutputs) throws SemanticException {
ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
- cppCtx.getPrunedColLists().put((Operator<? extends Serializable>) nd,
- cppCtx.genColLists((Operator<? extends Serializable>) nd));
+ cppCtx.getPrunedColLists().put((Operator<? extends OperatorDesc>) nd,
+ cppCtx.genColLists((Operator<? extends OperatorDesc>) nd));
return null;
}
@@ -180,8 +180,8 @@ public final class ColumnPrunerProcFacto
TableScanOperator scanOp = (TableScanOperator) nd;
ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
List<String> cols = cppCtx
- .genColLists((Operator<? extends Serializable>) nd);
- cppCtx.getPrunedColLists().put((Operator<? extends Serializable>) nd,
+ .genColLists((Operator<? extends OperatorDesc>) nd);
+ cppCtx.getPrunedColLists().put((Operator<? extends OperatorDesc>) nd,
cols);
ArrayList<Integer> needed_columns = new ArrayList<Integer>();
RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRowResolver();
@@ -241,13 +241,13 @@ public final class ColumnPrunerProcFacto
Object... nodeOutputs) throws SemanticException {
ReduceSinkOperator op = (ReduceSinkOperator) nd;
ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
- HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap = cppCtx
+ HashMap<Operator<? extends OperatorDesc>, OpParseContext> opToParseCtxMap = cppCtx
.getOpToParseCtxMap();
RowResolver redSinkRR = opToParseCtxMap.get(op).getRowResolver();
ReduceSinkDesc conf = op.getConf();
- List<Operator<? extends Serializable>> childOperators = op
+ List<Operator<? extends OperatorDesc>> childOperators = op
.getChildOperators();
- List<Operator<? extends Serializable>> parentOperators = op
+ List<Operator<? extends OperatorDesc>> parentOperators = op
.getParentOperators();
List<String> colLists = new ArrayList<String>();
@@ -259,7 +259,7 @@ public final class ColumnPrunerProcFacto
if ((childOperators.size() == 1)
&& (childOperators.get(0) instanceof JoinOperator)) {
assert parentOperators.size() == 1;
- Operator<? extends Serializable> par = parentOperators.get(0);
+ Operator<? extends OperatorDesc> par = parentOperators.get(0);
JoinOperator childJoin = (JoinOperator) childOperators.get(0);
RowResolver parRR = opToParseCtxMap.get(par).getRowResolver();
List<String> childJoinCols = cppCtx.getJoinPrunedColLists().get(
@@ -405,7 +405,7 @@ public final class ColumnPrunerProcFacto
LateralViewJoinOperator lvJoin = null;
if (op.getChildOperators() != null) {
- for (Operator<? extends Serializable> child : op.getChildOperators()) {
+ for (Operator<? extends OperatorDesc> child : op.getChildOperators()) {
// If one of my children is a FileSink or Script, return all columns.
// Without this break, a bug in ReduceSink to Extract edge column
// pruning will manifest
@@ -490,14 +490,14 @@ public final class ColumnPrunerProcFacto
*/
private void handleChildren(SelectOperator op,
List<String> retainedSelOutputCols, ColumnPrunerProcCtx cppCtx) throws SemanticException {
- for (Operator<? extends Serializable> child : op.getChildOperators()) {
+ for (Operator<? extends OperatorDesc> child : op.getChildOperators()) {
if (child instanceof ReduceSinkOperator) {
boolean[] flags = getPruneReduceSinkOpRetainFlags(
retainedSelOutputCols, (ReduceSinkOperator) child);
pruneReduceSinkOperator(flags, (ReduceSinkOperator) child, cppCtx);
} else if (child instanceof FilterOperator) {
// filter operator has the same output columns as its parent
- for (Operator<? extends Serializable> filterChild : child
+ for (Operator<? extends OperatorDesc> filterChild : child
.getChildOperators()) {
if (filterChild instanceof ReduceSinkOperator) {
boolean[] flags = getPruneReduceSinkOpRetainFlags(
@@ -647,7 +647,7 @@ public final class ColumnPrunerProcFacto
}
private static void pruneOperator(NodeProcessorCtx ctx,
- Operator<? extends Serializable> op,
+ Operator<? extends OperatorDesc> op,
List<String> cols)
throws SemanticException {
// the pruning needs to preserve the order of columns in the input schema
@@ -671,7 +671,7 @@ public final class ColumnPrunerProcFacto
* @return
* @throws SemanticException
*/
- private static List<String> preserveColumnOrder(Operator<? extends Serializable> op,
+ private static List<String> preserveColumnOrder(Operator<? extends OperatorDesc> op,
List<String> cols)
throws SemanticException {
RowSchema inputSchema = op.getSchema();
@@ -696,10 +696,10 @@ public final class ColumnPrunerProcFacto
Map<Byte, List<Integer>> retainMap, boolean mapJoin) throws SemanticException {
ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
Map<Byte, List<String>> prunedColLists = new HashMap<Byte, List<String>>();
- List<Operator<? extends Serializable>> childOperators = op
+ List<Operator<? extends OperatorDesc>> childOperators = op
.getChildOperators();
- for (Operator<? extends Serializable> child : childOperators) {
+ for (Operator<? extends OperatorDesc> child : childOperators) {
if (child instanceof FileSinkOperator) {
return;
}
@@ -787,7 +787,7 @@ public final class ColumnPrunerProcFacto
}
- for (Operator<? extends Serializable> child : childOperators) {
+ for (Operator<? extends OperatorDesc> child : childOperators) {
if (child instanceof ReduceSinkOperator) {
boolean[] flags = getPruneReduceSinkOpRetainFlags(childColLists,
(ReduceSinkOperator) child);
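
The factory methods above all follow the same NodeProcessor shape: the graph walker hands back an untyped Node, and the rule casts it to the narrowed Operator<? extends OperatorDesc> bound before recording pruned columns. A rough, self-contained sketch of that shape, again with simplified stand-ins rather than the real org.apache.hadoop.hive.ql classes:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    interface Node {}                                          // what the graph walker dispatches on
    interface OperatorDesc extends java.io.Serializable {}
    class Operator<T extends OperatorDesc> implements Node {}

    class ColumnPrunerDefaultProcSketch {
      private final Map<Operator<? extends OperatorDesc>, List<String>> prunedColLists =
          new HashMap<Operator<? extends OperatorDesc>, List<String>>();

      @SuppressWarnings("unchecked")
      public Object process(Node nd, Object... nodeOutputs) {
        // Unchecked cast: the walker only routes operator nodes to this rule,
        // mirroring the casts in the hunks above.
        Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
        prunedColLists.put(op, genColLists(op));
        return null;
      }

      private List<String> genColLists(Operator<? extends OperatorDesc> op) {
        return Arrays.asList("_col0");                         // placeholder; the real logic walks children
      }
    }

    class ProcSketchMain {
      public static void main(String[] args) {
        ColumnPrunerDefaultProcSketch proc = new ColumnPrunerDefaultProcSketch();
        Node nd = new Operator<OperatorDesc>();
        System.out.println(proc.process(nd));                  // prints null; the side effect is the map entry
      }
    }
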
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Wed Aug 29 17:43:59 2012
@@ -69,6 +69,7 @@ import org.apache.hadoop.hive.ql.plan.Lo
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -213,7 +214,7 @@ public class GenMRFileSink1 implements N
}
// create a dummy tableScan operator
- Operator<? extends Serializable> tsMerge = OperatorFactory.get(
+ Operator<? extends OperatorDesc> tsMerge = OperatorFactory.get(
TableScanDesc.class, inputRS);
ArrayList<String> outputColumns = new ArrayList<String>();
@@ -335,7 +336,8 @@ public class GenMRFileSink1 implements N
// Create a TableScan operator
RowSchema inputRS = fsInput.getSchema();
- Operator<? extends Serializable> tsMerge = OperatorFactory.get(TableScanDesc.class, inputRS);
+ Operator<? extends OperatorDesc> tsMerge =
+ OperatorFactory.get(TableScanDesc.class, inputRS);
// Create a FileSink operator
TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
@@ -510,7 +512,7 @@ public class GenMRFileSink1 implements N
* @param parentFS the last FileSinkOperator in the parent MapReduce work
* @return the MapredWork
*/
- private MapredWork createMergeTask(HiveConf conf, Operator<? extends Serializable> topOp,
+ private MapredWork createMergeTask(HiveConf conf, Operator<? extends OperatorDesc> topOp,
FileSinkDesc fsDesc) {
ArrayList<String> aliases = new ArrayList<String>();
@@ -556,7 +558,7 @@ public class GenMRFileSink1 implements N
work.setMapperCannotSpanPartns(true);
work.setPathToAliases(pathToAliases);
work.setAliasToWork(
- new LinkedHashMap<String, Operator<? extends Serializable>>());
+ new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
if (hasDynamicPartitions) {
work.getPathToPartitionInfo().put(inputDir,
new PartitionDesc(tblDesc, null));
@@ -696,11 +698,11 @@ public class GenMRFileSink1 implements N
mvTask = findMoveTask(ctx.getMvTask(), fsOp);
}
- Operator<? extends Serializable> currTopOp = ctx.getCurrTopOp();
+ Operator<? extends OperatorDesc> currTopOp = ctx.getCurrTopOp();
String currAliasId = ctx.getCurrAliasId();
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap =
+ HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
ctx.getOpTaskMap();
- List<Operator<? extends Serializable>> seenOps = ctx.getSeenOps();
+ List<Operator<? extends OperatorDesc>> seenOps = ctx.getSeenOps();
List<Task<? extends Serializable>> rootTasks = ctx.getRootTasks();
// Set the move task to be dependent on the current task
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.io.Serializable;
import java.util.Map;
import java.util.Stack;
@@ -28,6 +27,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
/**
* Processor for the rule - no specific rule fired.
@@ -39,7 +39,7 @@ public class GenMROperator implements No
/**
* Reduce Scan encountered.
- *
+ *
* @param nd
* the reduce sink operator encountered
* @param procCtx
@@ -49,10 +49,10 @@ public class GenMROperator implements No
Object... nodeOutputs) throws SemanticException {
GenMRProcContext ctx = (GenMRProcContext) procCtx;
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
.getMapCurrCtx();
GenMapRedCtx mapredCtx = mapCurrCtx.get(stack.get(stack.size() - 2));
- mapCurrCtx.put((Operator<? extends Serializable>) nd, new GenMapRedCtx(
+ mapCurrCtx.put((Operator<? extends OperatorDesc>) nd, new GenMapRedCtx(
mapredCtx.getCurrTask(), mapredCtx.getCurrTopOp(), mapredCtx
.getCurrAliasId()));
return null;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java Wed Aug 29 17:43:59 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
/**
@@ -54,7 +55,7 @@ public class GenMRProcContext implements
*/
public static class GenMapRedCtx {
Task<? extends Serializable> currTask;
- Operator<? extends Serializable> currTopOp;
+ Operator<? extends OperatorDesc> currTopOp;
String currAliasId;
public GenMapRedCtx() {
@@ -69,7 +70,7 @@ public class GenMRProcContext implements
* the current alias for the to operator
*/
public GenMapRedCtx(Task<? extends Serializable> currTask,
- Operator<? extends Serializable> currTopOp, String currAliasId) {
+ Operator<? extends OperatorDesc> currTopOp, String currAliasId) {
this.currTask = currTask;
this.currTopOp = currTopOp;
this.currAliasId = currAliasId;
@@ -85,7 +86,7 @@ public class GenMRProcContext implements
/**
* @return current top operator
*/
- public Operator<? extends Serializable> getCurrTopOp() {
+ public Operator<? extends OperatorDesc> getCurrTopOp() {
return currTopOp;
}
@@ -105,13 +106,13 @@ public class GenMRProcContext implements
Task<? extends Serializable> uTask;
List<String> taskTmpDir;
List<TableDesc> tt_desc;
- List<Operator<? extends Serializable>> listTopOperators;
+ List<Operator<? extends OperatorDesc>> listTopOperators;
public GenMRUnionCtx() {
uTask = null;
taskTmpDir = new ArrayList<String>();
tt_desc = new ArrayList<TableDesc>();
- listTopOperators = new ArrayList<Operator<? extends Serializable>>();
+ listTopOperators = new ArrayList<Operator<? extends OperatorDesc>>();
}
public Task<? extends Serializable> getUTask() {
@@ -138,16 +139,16 @@ public class GenMRProcContext implements
return tt_desc;
}
- public List<Operator<? extends Serializable>> getListTopOperators() {
+ public List<Operator<? extends OperatorDesc>> getListTopOperators() {
return listTopOperators;
}
public void setListTopOperators(
- List<Operator<? extends Serializable>> listTopOperators) {
+ List<Operator<? extends OperatorDesc>> listTopOperators) {
this.listTopOperators = listTopOperators;
}
- public void addListTopOperators(Operator<? extends Serializable> topOperator) {
+ public void addListTopOperators(Operator<? extends OperatorDesc> topOperator) {
listTopOperators.add(topOperator);
}
}
@@ -159,7 +160,7 @@ public class GenMRProcContext implements
public static class GenMRMapJoinCtx {
String taskTmpDir;
TableDesc tt_desc;
- Operator<? extends Serializable> rootMapJoinOp;
+ Operator<? extends OperatorDesc> rootMapJoinOp;
AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin;
public GenMRMapJoinCtx() {
@@ -176,7 +177,7 @@ public class GenMRProcContext implements
* @param oldMapJoin
*/
public GenMRMapJoinCtx(String taskTmpDir, TableDesc tt_desc,
- Operator<? extends Serializable> rootMapJoinOp,
+ Operator<? extends OperatorDesc> rootMapJoinOp,
AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin) {
this.taskTmpDir = taskTmpDir;
this.tt_desc = tt_desc;
@@ -203,7 +204,7 @@ public class GenMRProcContext implements
/**
* @return the childSelect
*/
- public Operator<? extends Serializable> getRootMapJoinOp() {
+ public Operator<? extends OperatorDesc> getRootMapJoinOp() {
return rootMapJoinOp;
}
@@ -211,7 +212,7 @@ public class GenMRProcContext implements
* @param rootMapJoinOp
* the rootMapJoinOp to set
*/
- public void setRootMapJoinOp(Operator<? extends Serializable> rootMapJoinOp) {
+ public void setRootMapJoinOp(Operator<? extends OperatorDesc> rootMapJoinOp) {
this.rootMapJoinOp = rootMapJoinOp;
}
@@ -232,23 +233,24 @@ public class GenMRProcContext implements
}
private HiveConf conf;
- private HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap;
+ private
+ HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap;
private HashMap<UnionOperator, GenMRUnionCtx> unionTaskMap;
private HashMap<AbstractMapJoinOperator<? extends MapJoinDesc>, GenMRMapJoinCtx> mapJoinTaskMap;
- private List<Operator<? extends Serializable>> seenOps;
+ private List<Operator<? extends OperatorDesc>> seenOps;
private List<FileSinkOperator> seenFileSinkOps;
private ParseContext parseCtx;
private List<Task<MoveWork>> mvTask;
private List<Task<? extends Serializable>> rootTasks;
- private LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx;
+ private LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx;
private Task<? extends Serializable> currTask;
- private Operator<? extends Serializable> currTopOp;
+ private Operator<? extends OperatorDesc> currTopOp;
private UnionOperator currUnionOp;
private AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp;
private String currAliasId;
- private List<Operator<? extends Serializable>> rootOps;
+ private List<Operator<? extends OperatorDesc>> rootOps;
private DependencyCollectionTask dependencyTaskForMultiInsert;
/**
@@ -287,11 +289,11 @@ public class GenMRProcContext implements
*/
public GenMRProcContext(
HiveConf conf,
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap,
- List<Operator<? extends Serializable>> seenOps, ParseContext parseCtx,
+ HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap,
+ List<Operator<? extends OperatorDesc>> seenOps, ParseContext parseCtx,
List<Task<MoveWork>> mvTask,
List<Task<? extends Serializable>> rootTasks,
- LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx,
+ LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx,
Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
this.conf = conf;
this.opTaskMap = opTaskMap;
@@ -307,7 +309,7 @@ public class GenMRProcContext implements
currUnionOp = null;
currMapJoinOp = null;
currAliasId = null;
- rootOps = new ArrayList<Operator<? extends Serializable>>();
+ rootOps = new ArrayList<Operator<? extends OperatorDesc>>();
rootOps.addAll(parseCtx.getTopOps().values());
unionTaskMap = new HashMap<UnionOperator, GenMRUnionCtx>();
mapJoinTaskMap = new HashMap<AbstractMapJoinOperator<? extends MapJoinDesc>, GenMRMapJoinCtx>();
@@ -317,7 +319,8 @@ public class GenMRProcContext implements
/**
* @return reducer to task mapping
*/
- public HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> getOpTaskMap() {
+ public HashMap<Operator<? extends OperatorDesc>,
+ Task<? extends Serializable>> getOpTaskMap() {
return opTaskMap;
}
@@ -326,14 +329,14 @@ public class GenMRProcContext implements
* reducer to task mapping
*/
public void setOpTaskMap(
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap) {
+ HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap) {
this.opTaskMap = opTaskMap;
}
/**
* @return operators already visited
*/
- public List<Operator<? extends Serializable>> getSeenOps() {
+ public List<Operator<? extends OperatorDesc>> getSeenOps() {
return seenOps;
}
@@ -348,7 +351,7 @@ public class GenMRProcContext implements
* @param seenOps
* operators already visited
*/
- public void setSeenOps(List<Operator<? extends Serializable>> seenOps) {
+ public void setSeenOps(List<Operator<? extends OperatorDesc>> seenOps) {
this.seenOps = seenOps;
}
@@ -363,7 +366,7 @@ public class GenMRProcContext implements
/**
* @return top operators for tasks
*/
- public List<Operator<? extends Serializable>> getRootOps() {
+ public List<Operator<? extends OperatorDesc>> getRootOps() {
return rootOps;
}
@@ -371,7 +374,7 @@ public class GenMRProcContext implements
* @param rootOps
* top operators for tasks
*/
- public void setRootOps(List<Operator<? extends Serializable>> rootOps) {
+ public void setRootOps(List<Operator<? extends OperatorDesc>> rootOps) {
this.rootOps = rootOps;
}
@@ -423,7 +426,7 @@ public class GenMRProcContext implements
/**
* @return operator to task mappings
*/
- public LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> getMapCurrCtx() {
+ public LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx> getMapCurrCtx() {
return mapCurrCtx;
}
@@ -432,7 +435,7 @@ public class GenMRProcContext implements
* operator to task mappings
*/
public void setMapCurrCtx(
- LinkedHashMap<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx) {
+ LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx) {
this.mapCurrCtx = mapCurrCtx;
}
@@ -454,7 +457,7 @@ public class GenMRProcContext implements
/**
* @return current top operator
*/
- public Operator<? extends Serializable> getCurrTopOp() {
+ public Operator<? extends OperatorDesc> getCurrTopOp() {
return currTopOp;
}
@@ -462,7 +465,7 @@ public class GenMRProcContext implements
* @param currTopOp
* current top operator
*/
- public void setCurrTopOp(Operator<? extends Serializable> currTopOp) {
+ public void setCurrTopOp(Operator<? extends OperatorDesc> currTopOp) {
this.currTopOp = currTopOp;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java Wed Aug 29 17:43:59 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
/**
* Processor for the rule - table scan followed by reduce sink.
@@ -43,7 +44,7 @@ public class GenMRRedSink1 implements No
/**
* Reduce Scan encountered.
- *
+ *
* @param nd
* the reduce sink operator encountered
* @param opProcCtx
@@ -54,15 +55,15 @@ public class GenMRRedSink1 implements No
ReduceSinkOperator op = (ReduceSinkOperator) nd;
GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
.getMapCurrCtx();
GenMapRedCtx mapredCtx = mapCurrCtx.get(stack.get(stack.size() - 2));
Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
MapredWork currPlan = (MapredWork) currTask.getWork();
- Operator<? extends Serializable> currTopOp = mapredCtx.getCurrTopOp();
+ Operator<? extends OperatorDesc> currTopOp = mapredCtx.getCurrTopOp();
String currAliasId = mapredCtx.getCurrAliasId();
- Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+ Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
+ HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap = ctx
.getOpTaskMap();
Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java Wed Aug 29 17:43:59 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
/**
* Processor for the rule - reduce sink followed by reduce sink.
@@ -41,7 +42,7 @@ public class GenMRRedSink2 implements No
/**
* Reduce Scan encountered.
- *
+ *
* @param nd
* the reduce sink operator encountered
* @param opProcCtx
@@ -52,14 +53,14 @@ public class GenMRRedSink2 implements No
ReduceSinkOperator op = (ReduceSinkOperator) nd;
GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
.getMapCurrCtx();
GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
- Operator<? extends Serializable> currTopOp = mapredCtx.getCurrTopOp();
+ Operator<? extends OperatorDesc> currTopOp = mapredCtx.getCurrTopOp();
String currAliasId = mapredCtx.getCurrAliasId();
- Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
- Map<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+ Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
+ Map<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap = ctx
.getOpTaskMap();
Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java Wed Aug 29 17:43:59 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
/**
* Processor for the rule - union followed by reduce sink.
@@ -56,8 +57,8 @@ public class GenMRRedSink3 implements No
// union consisted on a bunch of map-reduce jobs, and it has been split at
// the union
- Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
+ Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
.getMapCurrCtx();
GenMapRedCtx mapredCtx = mapCurrCtx.get(ctx.getCurrUnionOp());
@@ -70,7 +71,7 @@ public class GenMRRedSink3 implements No
MapredWork plan = (MapredWork) unionTask.getWork();
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+ HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap = ctx
.getOpTaskMap();
Task<? extends Serializable> reducerTask = opTaskMap.get(reducer);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java Wed Aug 29 17:43:59 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
/**
* Processor for the rule - map join followed by reduce sink.
@@ -43,7 +44,7 @@ public class GenMRRedSink4 implements No
/**
* Reduce Scan encountered.
- *
+ *
* @param nd
* the reduce sink operator encountered
* @param opProcCtx
@@ -58,13 +59,13 @@ public class GenMRRedSink4 implements No
// map-join consisted on a bunch of map-only jobs, and it has been split
// after the mapjoin
- Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
+ Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
.getMapCurrCtx();
GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
MapredWork plan = (MapredWork) currTask.getWork();
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
+ HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap = ctx
.getOpTaskMap();
Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java Wed Aug 29 17:43:59 2012
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.parse.QBParseInfo;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.StatsWork;
/**
* Processor for the rule - table scan.
@@ -62,17 +63,17 @@ public class GenMRTableScan1 implements
TableScanOperator op = (TableScanOperator) nd;
GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
ParseContext parseCtx = ctx.getParseCtx();
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+ Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
// create a dummy MapReduce task
MapredWork currWork = GenMapRedUtils.getMapRedWork(parseCtx);
Task<? extends Serializable> currTask = TaskFactory.get(currWork, parseCtx.getConf());
- Operator<? extends Serializable> currTopOp = op;
+ Operator<? extends OperatorDesc> currTopOp = op;
ctx.setCurrTask(currTask);
ctx.setCurrTopOp(currTopOp);
for (String alias : parseCtx.getTopOps().keySet()) {
- Operator<? extends Serializable> currOp = parseCtx.getTopOps().get(alias);
+ Operator<? extends OperatorDesc> currOp = parseCtx.getTopOps().get(alias);
if (currOp == op) {
String currAliasId = alias;
ctx.setCurrAliasId(currAliasId);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Wed Aug 29 17:43:59 2012
@@ -39,13 +39,14 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
-import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcFactory;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext.UnionParseContext;
+import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcFactory;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -89,7 +90,7 @@ public class GenMRUnion1 implements Node
}
else {
ctx.getMapCurrCtx().put(
- (Operator<? extends Serializable>) union,
+ (Operator<? extends OperatorDesc>) union,
new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
ctx.getCurrAliasId()));
}
@@ -127,8 +128,8 @@ public class GenMRUnion1 implements Node
* @param uCtxTask
*/
private void processSubQueryUnionCreateIntermediate(
- Operator<? extends Serializable> parent,
- Operator<? extends Serializable> child,
+ Operator<? extends OperatorDesc> parent,
+ Operator<? extends OperatorDesc> child,
Task<? extends Serializable> uTask, GenMRProcContext ctx,
GenMRUnionCtx uCtxTask) {
ParseContext parseCtx = ctx.getParseCtx();
@@ -141,21 +142,23 @@ public class GenMRUnion1 implements Node
String taskTmpDir = baseCtx.getMRTmpFileURI();
// Create a file sink operator for this file name
- Operator<? extends Serializable> fs_op = OperatorFactory.get(
+ Operator<? extends OperatorDesc> fs_op = OperatorFactory.get(
new FileSinkDesc(taskTmpDir, tt_desc, parseCtx.getConf().getBoolVar(
HiveConf.ConfVars.COMPRESSINTERMEDIATE)), parent.getSchema());
assert parent.getChildOperators().size() == 1;
parent.getChildOperators().set(0, fs_op);
- List<Operator<? extends Serializable>> parentOpList = new ArrayList<Operator<? extends Serializable>>();
+ List<Operator<? extends OperatorDesc>> parentOpList =
+ new ArrayList<Operator<? extends OperatorDesc>>();
parentOpList.add(parent);
fs_op.setParentOperators(parentOpList);
// Create a dummy table scan operator
- Operator<? extends Serializable> ts_op = OperatorFactory.get(
+ Operator<? extends OperatorDesc> ts_op = OperatorFactory.get(
new TableScanDesc(), parent.getSchema());
- List<Operator<? extends Serializable>> childOpList = new ArrayList<Operator<? extends Serializable>>();
+ List<Operator<? extends OperatorDesc>> childOpList =
+ new ArrayList<Operator<? extends OperatorDesc>>();
childOpList.add(child);
ts_op.setChildOperators(childOpList);
child.replaceParent(parent, ts_op);
@@ -199,8 +202,8 @@ public class GenMRUnion1 implements Node
Task<? extends Serializable> uTask = uCtxTask.getUTask();
MapredWork plan = (MapredWork) uTask.getWork();
ctx.setCurrTask(uTask);
- List<Operator<? extends Serializable>> seenOps = ctx.getSeenOps();
- Operator<? extends Serializable> topOp = ctx.getCurrTopOp();
+ List<Operator<? extends OperatorDesc>> seenOps = ctx.getSeenOps();
+ Operator<? extends OperatorDesc> topOp = ctx.getCurrTopOp();
if (!seenOps.contains(topOp) && topOp != null) {
seenOps.add(topOp);
GenMapRedUtils.setTaskPlan(ctx.getCurrAliasId(), ctx
@@ -247,7 +250,7 @@ public class GenMRUnion1 implements Node
// Map-only subqueries can be optimized in future to not write to a file in
// future
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+ Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
UnionParseContext uPrsCtx = uCtx.getUnionParseContext(union);
@@ -305,7 +308,7 @@ public class GenMRUnion1 implements Node
ctx.setCurrTask(uTask);
- mapCurrCtx.put((Operator<? extends Serializable>) nd,
+ mapCurrCtx.put((Operator<? extends OperatorDesc>) nd,
new GenMapRedCtx(ctx.getCurrTask(), null, null));
return null;
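
Beyond the type renames, processSubQueryUnionCreateIntermediate shows the recurring splice used to cut a plan at an edge: a FileSink is hung off the parent to write an intermediate file, and a dummy TableScan replaces the parent as the child's input, so each side can run as its own map-reduce task (GenMapRedUtils.splitTasks below does the same). A rough, self-contained sketch of that splice with stand-in types, not Hive's operator classes:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative operator node with parent/child links; not the real Hive Operator class.
    class Op {
      final String name;
      final List<Op> parents = new ArrayList<Op>();
      final List<Op> children = new ArrayList<Op>();
      Op(String name) { this.name = name; }
      void replaceParent(Op oldParent, Op newParent) {
        parents.set(parents.indexOf(oldParent), newParent);
      }
    }

    class SpliceSketch {
      static void splitEdge(Op parent, Op child) {
        Op fileSink = new Op("FS");      // writes the intermediate to a tmp dir
        Op tableScan = new Op("TS");     // re-reads that intermediate in the next task

        // parent now feeds the file sink instead of the child
        parent.children.set(parent.children.indexOf(child), fileSink);
        fileSink.parents.add(parent);

        // the dummy scan becomes the child's new parent
        tableScan.children.add(child);
        child.replaceParent(parent, tableScan);
      }

      public static void main(String[] args) {
        Op parent = new Op("subquery parent");
        Op child = new Op("union");
        parent.children.add(child);
        child.parents.add(parent);
        splitEdge(parent, child);
        System.out.println(parent.children.get(0).name);   // FS
        System.out.println(child.parents.get(0).name);     // TS
      }
    }
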
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Wed Aug 29 17:43:59 2012
@@ -61,15 +61,16 @@ import org.apache.hadoop.hive.ql.parse.S
import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
/**
* General utility common functions for the Processor to convert operator into
@@ -92,14 +93,15 @@ public final class GenMapRedUtils {
*/
public static void initPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx)
throws SemanticException {
- Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+ Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
+ Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx =
+ opProcCtx.getMapCurrCtx();
GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
MapredWork plan = (MapredWork) currTask.getWork();
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap =
+ HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
opProcCtx.getOpTaskMap();
- Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
+ Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
opTaskMap.put(reducer, currTask);
plan.setReducer(reducer);
@@ -117,7 +119,7 @@ public final class GenMapRedUtils {
}
assert currTopOp != null;
- List<Operator<? extends Serializable>> seenOps = opProcCtx.getSeenOps();
+ List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
String currAliasId = opProcCtx.getCurrAliasId();
if (!seenOps.contains(currTopOp)) {
@@ -134,8 +136,9 @@ public final class GenMapRedUtils {
}
public static void initMapJoinPlan(
- Operator<? extends Serializable> op, GenMRProcContext ctx,
- boolean readInputMapJoin, boolean readInputUnion, boolean setReducer, int pos) throws SemanticException {
+ Operator<? extends OperatorDesc> op, GenMRProcContext ctx,
+ boolean readInputMapJoin, boolean readInputUnion, boolean setReducer, int pos)
+ throws SemanticException {
initMapJoinPlan(op, ctx, readInputMapJoin, readInputUnion, setReducer, pos, false);
}
@@ -149,20 +152,21 @@ public final class GenMapRedUtils {
* @param pos
* position of the parent
*/
- public static void initMapJoinPlan(Operator<? extends Serializable> op,
+ public static void initMapJoinPlan(Operator<? extends OperatorDesc> op,
GenMRProcContext opProcCtx, boolean readInputMapJoin,
boolean readInputUnion, boolean setReducer, int pos, boolean createLocalPlan)
throws SemanticException {
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+ Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx =
+ opProcCtx.getMapCurrCtx();
assert (((pos == -1) && (readInputMapJoin)) || (pos != -1));
int parentPos = (pos == -1) ? 0 : pos;
GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(
parentPos));
Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
MapredWork plan = (MapredWork) currTask.getWork();
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap =
+ HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
opProcCtx.getOpTaskMap();
- Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
+ Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
// The mapjoin has already been encountered. Some context must be stored
// about that
@@ -173,7 +177,7 @@ public final class GenMapRedUtils {
false : true;
if (setReducer) {
- Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
+ Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
plan.setReducer(reducer);
opTaskMap.put(reducer, currTask);
if (reducer.getClass() == JoinOperator.class) {
@@ -189,7 +193,7 @@ public final class GenMapRedUtils {
GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(currMapJoinOp);
String taskTmpDir;
TableDesc tt_desc;
- Operator<? extends Serializable> rootOp;
+ Operator<? extends OperatorDesc> rootOp;
if (mjCtx.getOldMapJoin() == null || setReducer) {
taskTmpDir = mjCtx.getTaskTmpDir();
@@ -222,7 +226,7 @@ public final class GenMapRedUtils {
}
assert currTopOp != null;
- List<Operator<? extends Serializable>> seenOps = opProcCtx.getSeenOps();
+ List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
String currAliasId = opProcCtx.getCurrAliasId();
seenOps.add(currTopOp);
@@ -249,7 +253,7 @@ public final class GenMapRedUtils {
}
if (localPlan == null && createLocalPlan) {
localPlan = new MapredLocalWork(
- new LinkedHashMap<String, Operator<? extends Serializable>>(),
+ new LinkedHashMap<String, Operator<? extends OperatorDesc>>(),
new LinkedHashMap<String, FetchWork>());
}
} else {
@@ -298,10 +302,10 @@ public final class GenMapRedUtils {
public static void initUnionPlan(ReduceSinkOperator op,
GenMRProcContext opProcCtx,
Task<? extends Serializable> unionTask) throws SemanticException {
- Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
+ Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
MapredWork plan = (MapredWork) unionTask.getWork();
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap =
+ HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
opProcCtx.getOpTaskMap();
opTaskMap.put(reducer, unionTask);
@@ -320,10 +324,10 @@ public final class GenMapRedUtils {
private static void setUnionPlan(GenMRProcContext opProcCtx,
boolean local, MapredWork plan, GenMRUnionCtx uCtx,
boolean mergeTask) throws SemanticException {
- Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
+ Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
if (currTopOp != null) {
- List<Operator<? extends Serializable>> seenOps = opProcCtx.getSeenOps();
+ List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
String currAliasId = opProcCtx.getCurrAliasId();
if (!seenOps.contains(currTopOp) || mergeTask) {
seenOps.add(currTopOp);
@@ -340,7 +344,7 @@ public final class GenMapRedUtils {
int size = taskTmpDirLst.size();
assert local == false;
- List<Operator<? extends Serializable>> topOperators =
+ List<Operator<? extends OperatorDesc>> topOperators =
uCtx.getListTopOperators();
for (int pos = 0; pos < size; pos++) {
@@ -422,7 +426,7 @@ public final class GenMapRedUtils {
opProcCtx.setCurrTask(existingTask);
}
- public static void joinPlan(Operator<? extends Serializable> op,
+ public static void joinPlan(Operator<? extends OperatorDesc> op,
Task<? extends Serializable> oldTask, Task<? extends Serializable> task,
GenMRProcContext opProcCtx, int pos, boolean split,
boolean readMapJoinData, boolean readUnionData) throws SemanticException {
@@ -443,14 +447,14 @@ public final class GenMapRedUtils {
* @param pos
* position of the parent in the stack
*/
- public static void joinPlan(Operator<? extends Serializable> op,
+ public static void joinPlan(Operator<? extends OperatorDesc> op,
Task<? extends Serializable> oldTask, Task<? extends Serializable> task,
GenMRProcContext opProcCtx, int pos, boolean split,
boolean readMapJoinData, boolean readUnionData, boolean createLocalWork)
throws SemanticException {
Task<? extends Serializable> currTask = task;
MapredWork plan = (MapredWork) currTask.getWork();
- Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
+ Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
List<Task<? extends Serializable>> parTasks = null;
// terminate the old task and make current task dependent on it
@@ -471,7 +475,7 @@ public final class GenMapRedUtils {
}
if (currTopOp != null) {
- List<Operator<? extends Serializable>> seenOps = opProcCtx.getSeenOps();
+ List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
String currAliasId = opProcCtx.getCurrAliasId();
if (!seenOps.contains(currTopOp)) {
@@ -500,7 +504,7 @@ public final class GenMapRedUtils {
AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin = mjCtx.getOldMapJoin();
String taskTmpDir = null;
TableDesc tt_desc = null;
- Operator<? extends Serializable> rootOp = null;
+ Operator<? extends OperatorDesc> rootOp = null;
boolean local = ((pos == -1) || (pos == (mjOp.getConf())
.getPosBigTable())) ? false : true;
@@ -552,7 +556,7 @@ public final class GenMapRedUtils {
MapredWork cplan = getMapRedWork(parseCtx);
Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx
.getConf());
- Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
+ Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
// Add the reducer
cplan.setReducer(reducer);
@@ -560,7 +564,7 @@ public final class GenMapRedUtils {
cplan.setNumReduceTasks(new Integer(desc.getNumReducers()));
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap =
+ HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
opProcCtx.getOpTaskMap();
opTaskMap.put(reducer, redTask);
Task<? extends Serializable> currTask = opProcCtx.getCurrTask();
@@ -584,7 +588,7 @@ public final class GenMapRedUtils {
* processing context
*/
public static void setTaskPlan(String alias_id,
- Operator<? extends Serializable> topOp, MapredWork plan, boolean local,
+ Operator<? extends OperatorDesc> topOp, MapredWork plan, boolean local,
GenMRProcContext opProcCtx) throws SemanticException {
setTaskPlan(alias_id, topOp, plan, local, opProcCtx, null);
}
@@ -606,7 +610,7 @@ public final class GenMapRedUtils {
* pruned partition list. If it is null it will be computed on-the-fly.
*/
public static void setTaskPlan(String alias_id,
- Operator<? extends Serializable> topOp, MapredWork plan, boolean local,
+ Operator<? extends OperatorDesc> topOp, MapredWork plan, boolean local,
GenMRProcContext opProcCtx, PrunedPartitionList pList) throws SemanticException {
ParseContext parseCtx = opProcCtx.getParseCtx();
Set<ReadEntity> inputs = opProcCtx.getInputs();
@@ -810,7 +814,7 @@ public final class GenMapRedUtils {
MapredLocalWork localPlan = plan.getMapLocalWork();
if (localPlan == null) {
localPlan = new MapredLocalWork(
- new LinkedHashMap<String, Operator<? extends Serializable>>(),
+ new LinkedHashMap<String, Operator<? extends OperatorDesc>>(),
new LinkedHashMap<String, FetchWork>());
}
@@ -845,7 +849,7 @@ public final class GenMapRedUtils {
* table descriptor
*/
public static void setTaskPlan(String path, String alias,
- Operator<? extends Serializable> topOp, MapredWork plan, boolean local,
+ Operator<? extends OperatorDesc> topOp, MapredWork plan, boolean local,
TableDesc tt_desc) throws SemanticException {
if(path == null || alias == null) {
@@ -864,7 +868,7 @@ public final class GenMapRedUtils {
MapredLocalWork localPlan = plan.getMapLocalWork();
if (localPlan == null) {
localPlan = new MapredLocalWork(
- new LinkedHashMap<String, Operator<? extends Serializable>>(),
+ new LinkedHashMap<String, Operator<? extends OperatorDesc>>(),
new LinkedHashMap<String, FetchWork>());
}
@@ -885,7 +889,7 @@ public final class GenMapRedUtils {
* current top operator in the path
*/
public static void setKeyAndValueDesc(MapredWork plan,
- Operator<? extends Serializable> topOp) {
+ Operator<? extends OperatorDesc> topOp) {
if (topOp == null) {
return;
}
@@ -900,9 +904,9 @@ public final class GenMapRedUtils {
}
tagToSchema.set(tag, rs.getConf().getValueSerializeInfo());
} else {
- List<Operator<? extends Serializable>> children = topOp.getChildOperators();
+ List<Operator<? extends OperatorDesc>> children = topOp.getChildOperators();
if (children != null) {
- for (Operator<? extends Serializable> op : children) {
+ for (Operator<? extends OperatorDesc> op : children) {
setKeyAndValueDesc(plan, op);
}
}
@@ -935,7 +939,7 @@ public final class GenMapRedUtils {
work.setMapperCannotSpanPartns(mapperCannotSpanPartns);
work.setPathToAliases(new LinkedHashMap<String, ArrayList<String>>());
work.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
- work.setAliasToWork(new LinkedHashMap<String, Operator<? extends Serializable>>());
+ work.setAliasToWork(new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
work.setTagToValueDesc(new ArrayList<TableDesc>());
work.setReducer(null);
work.setHadoopSupportsSplittable(
@@ -954,8 +958,8 @@ public final class GenMapRedUtils {
* parse context
*/
@SuppressWarnings("nls")
- private static Operator<? extends Serializable> putOpInsertMap(
- Operator<? extends Serializable> op, RowResolver rr, ParseContext parseCtx) {
+ public static Operator<? extends OperatorDesc> putOpInsertMap(
+ Operator<? extends OperatorDesc> op, RowResolver rr, ParseContext parseCtx) {
OpParseContext ctx = new OpParseContext(rr);
parseCtx.getOpParseCtx().put(op, ctx);
return op;
@@ -971,12 +975,12 @@ public final class GenMapRedUtils {
* @param setReducer does the reducer needs to be set
* @param pos position of the parent
**/
- public static void splitTasks(Operator<? extends Serializable> op,
+ public static void splitTasks(Operator<? extends OperatorDesc> op,
Task<? extends Serializable> parentTask,
Task<? extends Serializable> childTask, GenMRProcContext opProcCtx,
boolean setReducer, boolean local, int posn) throws SemanticException {
childTask.getWork();
- Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
+ Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
ParseContext parseCtx = opProcCtx.getParseCtx();
parentTask.addDependentTask(childTask);
@@ -992,7 +996,7 @@ public final class GenMapRedUtils {
Context baseCtx = parseCtx.getContext();
String taskTmpDir = baseCtx.getMRTmpFileURI();
- Operator<? extends Serializable> parent = op.getParentOperators().get(posn);
+ Operator<? extends OperatorDesc> parent = op.getParentOperators().get(posn);
TableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
.getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
@@ -1007,11 +1011,11 @@ public final class GenMapRedUtils {
desc.setCompressType(parseCtx.getConf().getVar(
HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE));
}
- Operator<? extends Serializable> fs_op = putOpInsertMap(OperatorFactory
+ Operator<? extends OperatorDesc> fs_op = putOpInsertMap(OperatorFactory
.get(desc, parent.getSchema()), null, parseCtx);
// replace the reduce child with this operator
- List<Operator<? extends Serializable>> childOpList = parent
+ List<Operator<? extends OperatorDesc>> childOpList = parent
.getChildOperators();
for (int pos = 0; pos < childOpList.size(); pos++) {
if (childOpList.get(pos) == op) {
@@ -1020,30 +1024,31 @@ public final class GenMapRedUtils {
}
}
- List<Operator<? extends Serializable>> parentOpList =
- new ArrayList<Operator<? extends Serializable>>();
+ List<Operator<? extends OperatorDesc>> parentOpList =
+ new ArrayList<Operator<? extends OperatorDesc>>();
parentOpList.add(parent);
fs_op.setParentOperators(parentOpList);
// create a dummy tableScan operator on top of op
// TableScanOperator is implicitly created here for each MapOperator
RowResolver rowResolver = opProcCtx.getParseCtx().getOpParseCtx().get(parent).getRowResolver();
- Operator<? extends Serializable> ts_op = putOpInsertMap(OperatorFactory
+ Operator<? extends OperatorDesc> ts_op = putOpInsertMap(OperatorFactory
.get(TableScanDesc.class, parent.getSchema()), rowResolver, parseCtx);
- childOpList = new ArrayList<Operator<? extends Serializable>>();
+ childOpList = new ArrayList<Operator<? extends OperatorDesc>>();
childOpList.add(op);
ts_op.setChildOperators(childOpList);
op.getParentOperators().set(posn, ts_op);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+ Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx =
+ opProcCtx.getMapCurrCtx();
mapCurrCtx.put(ts_op, new GenMapRedCtx(childTask, null, null));
String streamDesc = taskTmpDir;
MapredWork cplan = (MapredWork) childTask.getWork();
if (setReducer) {
- Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
+ Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
if (reducer.getClass() == JoinOperator.class) {
String origStreamDesc;
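
The splitTasks hunks above perform the operator-DAG surgery for a task split: the parent's child slot that pointed at op is redirected to a new FileSinkOperator that writes the intermediate file, and a dummy TableScanOperator is created whose only child is op and which becomes op's parent at position posn in the child task. A toy model of just the rewiring (a hypothetical Node class, not Hive's Operator) looks like this:

    import java.util.ArrayList;
    import java.util.List;

    // Toy operator node, only for illustrating the rewiring.
    class Node {
      final String name;
      List<Node> parents = new ArrayList<Node>();
      List<Node> children = new ArrayList<Node>();
      Node(String name) { this.name = name; }
    }

    class SplitSketch {
      // Splits the edge parent -> op at parent slot 'posn' of op:
      //   parent -> fileSink   (the intermediate file is written here)
      //   tableScan -> op      (the child task re-reads the intermediate file)
      static void split(Node op, Node parent, int posn) {
        Node fileSink = new Node("FS");
        Node tableScan = new Node("TS");

        // replace op with the file sink in the parent's child list
        for (int pos = 0; pos < parent.children.size(); pos++) {
          if (parent.children.get(pos) == op) {
            parent.children.set(pos, fileSink);
          }
        }
        fileSink.parents.add(parent);

        // dummy table scan feeding op in the child task
        tableScan.children.add(op);
        op.parents.set(posn, tableScan);
      }

      public static void main(String[] args) {
        Node parent = new Node("RS"), op = new Node("JOIN");
        parent.children.add(op);
        op.parents.add(parent);
        split(op, parent, 0);
        System.out.println(parent.children.get(0).name + " / "
            + op.parents.get(0).name);   // prints: FS / TS
      }
    }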
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java Wed Aug 29 17:43:59 2012
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.optimizer;
+import java.util.Map;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -35,9 +37,7 @@ import org.apache.hadoop.hive.ql.parse.Q
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.SplitSample;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-
-import java.io.Serializable;
-import java.util.Map;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
/**
* This optimizer is used to reduce the input size for the query for queries which are
@@ -58,7 +58,7 @@ public class GlobalLimitOptimizer implem
public ParseContext transform(ParseContext pctx) throws SemanticException {
Context ctx = pctx.getContext();
- Map<String, Operator<? extends Serializable>> topOps = pctx.getTopOps();
+ Map<String, Operator<? extends OperatorDesc>> topOps = pctx.getTopOps();
GlobalLimitCtx globalLimitCtx = pctx.getGlobalLimitCtx();
Map<TableScanOperator, ExprNodeDesc> opToPartPruner = pctx.getOpToPartPruner();
Map<TableScanOperator, PrunedPartitionList> opToPartList = pctx.getOpToPartList();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -57,6 +56,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
/**
@@ -175,7 +175,7 @@ public class GroupByOptimizer implements
}
for (String table : tblNames) {
- Operator<? extends Serializable> topOp = pGraphContext.getTopOps().get(
+ Operator<? extends OperatorDesc> topOp = pGraphContext.getTopOps().get(
table);
if (topOp == null || (!(topOp instanceof TableScanOperator))) {
// this is in a sub-query.
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
@@ -29,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
/**
* Implementation of rule-based join table reordering optimization. User passes
@@ -42,7 +42,7 @@ public class JoinReorder implements Tran
* the whole tree is traversed. Possible sizes: 0: the operator and its
* subtree don't contain any big tables 1: the subtree of the operator
* contains a big table 2: the operator is a big table
- *
+ *
* @param operator
* The operator which output size is to be estimated
* @param bigTables
@@ -50,12 +50,12 @@ public class JoinReorder implements Tran
* @return The estimated size - 0 (no streamed tables), 1 (streamed tables in
* subtree) or 2 (a streamed table)
*/
- private int getOutputSize(Operator<? extends Serializable> operator,
+ private int getOutputSize(Operator<? extends OperatorDesc> operator,
Set<String> bigTables) {
// If a join operator contains a big subtree, there is a chance that its
// output is also big, so the output size is 1 (medium)
if (operator instanceof JoinOperator) {
- for (Operator<? extends Serializable> o : operator.getParentOperators()) {
+ for (Operator<? extends OperatorDesc> o : operator.getParentOperators()) {
if (getOutputSize(o, bigTables) != 0) {
return 1;
}
@@ -74,7 +74,7 @@ public class JoinReorder implements Tran
// the biggest output from a parent
int maxSize = 0;
if (operator.getParentOperators() != null) {
- for (Operator<? extends Serializable> o : operator.getParentOperators()) {
+ for (Operator<? extends OperatorDesc> o : operator.getParentOperators()) {
int current = getOutputSize(o, bigTables);
if (current > maxSize) {
maxSize = current;
@@ -87,7 +87,7 @@ public class JoinReorder implements Tran
/**
* Find all big tables from STREAMTABLE hints.
- *
+ *
* @param joinCtx
* The join context
* @return Set of all big tables
@@ -107,7 +107,7 @@ public class JoinReorder implements Tran
/**
* Reorder the tables in a join operator appropriately (by reordering the tags
* of the reduces sinks).
- *
+ *
* @param joinOp
* The join operator to be processed
* @param bigTables
@@ -148,7 +148,7 @@ public class JoinReorder implements Tran
* Transform the query tree. For each join, check which reduce sink will
* output the biggest result (based on STREAMTABLE hints) and give it the
* biggest tag so that it gets streamed.
- *
+ *
* @param pactx
* current parse context
*/
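
The getOutputSize javadoc and hunks above describe a three-valued estimate: 0 when no hinted big table appears below an operator, 1 when a big table is somewhere in the subtree, and 2 when the operator is itself a big-table scan. A self-contained sketch of that recursion follows; the big-table-scan base case is not visible in these hunks, so it is filled in here as an assumption.

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class SizeEstimator {
      static class Op {
        boolean isJoin;
        String alias;                      // table alias when this op is a scan
        List<Op> parents = new ArrayList<Op>();
      }

      // 0 = no big tables below, 1 = big table in the subtree, 2 = a big table itself.
      static int outputSize(Op operator, Set<String> bigTables) {
        // A join above a big subtree is assumed to produce medium output (1).
        if (operator.isJoin) {
          for (Op parent : operator.parents) {
            if (outputSize(parent, bigTables) != 0) {
              return 1;
            }
          }
        }
        // Assumed base case (not visible above): a scan of a hinted big table is 2.
        if (operator.alias != null && bigTables.contains(operator.alias)) {
          return 2;
        }
        // Otherwise propagate the biggest estimate among the parents.
        int maxSize = 0;
        for (Op parent : operator.parents) {
          maxSize = Math.max(maxSize, outputSize(parent, bigTables));
        }
        return maxSize;
      }

      public static void main(String[] args) {
        Set<String> big = new HashSet<String>();
        big.add("b");
        Op scanA = new Op(); scanA.alias = "a";
        Op scanB = new Op(); scanB.alias = "b";
        Op join = new Op(); join.isJoin = true;
        join.parents.add(scanA); join.parents.add(scanB);
        System.out.println(outputSize(join, big));   // prints 1
      }
    }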
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Wed Aug 29 17:43:59 2012
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.parse.S
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -57,9 +58,9 @@ public final class MapJoinFactory {
int pos = 0;
int size = stack.size();
assert size >= 2 && stack.get(size - 1) == op;
- Operator<? extends Serializable> parent = (Operator<? extends Serializable>) stack
- .get(size - 2);
- List<Operator<? extends Serializable>> parOp = op.getParentOperators();
+ Operator<? extends OperatorDesc> parent =
+ (Operator<? extends OperatorDesc>) stack.get(size - 2);
+ List<Operator<? extends OperatorDesc>> parOp = op.getParentOperators();
pos = parOp.indexOf(parent);
assert pos < parOp.size();
return pos;
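
getPositionParent above relies on the walker's stack: the element just below the top is the parent branch the walk arrived from, and its index in the operator's parent list identifies which join input is being processed. A generic toy version:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Stack;

    class PositionSketch {
      // The walker's stack ends with the current operator; the element just
      // below it is the parent we arrived from.
      static <T> int positionOfParent(T op, List<T> parentsOfOp, Stack<T> stack) {
        int size = stack.size();
        assert size >= 2 && stack.get(size - 1) == op;
        T parent = stack.get(size - 2);
        int pos = parentsOfOp.indexOf(parent);
        assert pos >= 0 && pos < parentsOfOp.size();
        return pos;
      }

      public static void main(String[] args) {
        String left = "RS-left", right = "RS-right", join = "MAPJOIN";
        List<String> parents = new ArrayList<String>();
        parents.add(left);
        parents.add(right);
        Stack<String> stack = new Stack<String>();
        stack.push(right);
        stack.push(join);
        System.out.println(positionOfParent(join, parents, stack));   // prints 1
      }
    }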
@@ -72,24 +73,24 @@ public final class MapJoinFactory {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
+ Object... nodeOutputs) throws SemanticException {
AbstractMapJoinOperator<MapJoinDesc> mapJoin = (AbstractMapJoinOperator<MapJoinDesc>) nd;
GenMRProcContext ctx = (GenMRProcContext) procCtx;
// find the branch on which this processor was invoked
int pos = getPositionParent(mapJoin, stack);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
.getMapCurrCtx();
GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(
pos));
Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
MapredWork currPlan = (MapredWork) currTask.getWork();
- Operator<? extends Serializable> currTopOp = mapredCtx.getCurrTopOp();
+ Operator<? extends OperatorDesc> currTopOp = mapredCtx.getCurrTopOp();
String currAliasId = mapredCtx.getCurrAliasId();
- Operator<? extends Serializable> reducer = mapJoin;
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
- .getOpTaskMap();
+ Operator<? extends OperatorDesc> reducer = mapJoin;
+ HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
+ ctx.getOpTaskMap();
Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
ctx.setCurrTopOp(currTopOp);
@@ -138,11 +139,11 @@ public final class MapJoinFactory {
: true;
GenMapRedUtils.splitTasks(mapJoin, currTask, redTask, opProcCtx, false,
- local, pos);
+ local, pos);
currTask = opProcCtx.getCurrTask();
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx
- .getOpTaskMap();
+ HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
+ opProcCtx.getOpTaskMap();
Task<? extends Serializable> opMapTask = opTaskMap.get(mapJoin);
// If the plan for this reducer does not exist, initialize the plan
@@ -195,9 +196,9 @@ public final class MapJoinFactory {
if (listMapJoinOps.contains(mapJoin)) {
ctx.setCurrAliasId(null);
ctx.setCurrTopOp(null);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
.getMapCurrCtx();
- mapCurrCtx.put((Operator<? extends Serializable>) nd, new GenMapRedCtx(
+ mapCurrCtx.put((Operator<? extends OperatorDesc>) nd, new GenMapRedCtx(
ctx.getCurrTask(), null, null));
return null;
}
@@ -230,14 +231,15 @@ public final class MapJoinFactory {
sel.setParentOperators(null);
// Create a file sink operator for this file name
- Operator<? extends Serializable> fs_op = OperatorFactory.get(
+ Operator<? extends OperatorDesc> fs_op = OperatorFactory.get(
new FileSinkDesc(taskTmpDir, tt_desc, parseCtx.getConf().getBoolVar(
HiveConf.ConfVars.COMPRESSINTERMEDIATE)), mapJoin.getSchema());
assert mapJoin.getChildOperators().size() == 1;
mapJoin.getChildOperators().set(0, fs_op);
- List<Operator<? extends Serializable>> parentOpList = new ArrayList<Operator<? extends Serializable>>();
+ List<Operator<? extends OperatorDesc>> parentOpList =
+ new ArrayList<Operator<? extends OperatorDesc>>();
parentOpList.add(mapJoin);
fs_op.setParentOperators(parentOpList);
@@ -247,9 +249,9 @@ public final class MapJoinFactory {
ctx.setCurrAliasId(null);
ctx.setCurrTopOp(null);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
.getMapCurrCtx();
- mapCurrCtx.put((Operator<? extends Serializable>) nd, new GenMapRedCtx(
+ mapCurrCtx.put((Operator<? extends OperatorDesc>) nd, new GenMapRedCtx(
ctx.getCurrTask(), null, null));
return null;
@@ -263,8 +265,9 @@ public final class MapJoinFactory {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
- AbstractMapJoinOperator<? extends MapJoinDesc> mapJoin = (AbstractMapJoinOperator<? extends MapJoinDesc>) nd;
+ Object... nodeOutputs) throws SemanticException {
+ AbstractMapJoinOperator<? extends MapJoinDesc> mapJoin =
+ (AbstractMapJoinOperator<? extends MapJoinDesc>) nd;
GenMRProcContext ctx = (GenMRProcContext) procCtx;
ctx.getParseCtx();
@@ -282,16 +285,16 @@ public final class MapJoinFactory {
// find the branch on which this processor was invoked
int pos = getPositionParent(mapJoin, stack);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
.getMapCurrCtx();
GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(
pos));
Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
MapredWork currPlan = (MapredWork) currTask.getWork();
mapredCtx.getCurrAliasId();
- Operator<? extends Serializable> reducer = mapJoin;
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
- .getOpTaskMap();
+ Operator<? extends OperatorDesc> reducer = mapJoin;
+ HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
+ ctx.getOpTaskMap();
Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
ctx.setCurrTask(currTask);
@@ -321,7 +324,7 @@ public final class MapJoinFactory {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
+ Object... nodeOutputs) throws SemanticException {
GenMRProcContext ctx = (GenMRProcContext) procCtx;
ParseContext parseCtx = ctx.getParseCtx();
@@ -341,15 +344,15 @@ public final class MapJoinFactory {
// find the branch on which this processor was invoked
int pos = getPositionParent(mapJoin, stack);
- Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
+ Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
.getMapCurrCtx();
GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(
pos));
Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
MapredWork currPlan = (MapredWork) currTask.getWork();
- Operator<? extends Serializable> reducer = mapJoin;
- HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx
- .getOpTaskMap();
+ Operator<? extends OperatorDesc> reducer = mapJoin;
+ HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
+ ctx.getOpTaskMap();
Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
// union result cannot be a map table
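
All three MapJoinFactory processors share the opTaskMap pattern visible above: the first branch to reach a given map join creates the plan/task for it, and later branches look it up and merge into it instead of creating a second plan. A hypothetical miniature of that lookup-or-initialize step:

    import java.util.HashMap;
    import java.util.Map;

    class OpTaskSketch {
      static final class Task {
        final String name;
        Task(String n) { name = n; }
      }

      private final Map<String, Task> opTaskMap = new HashMap<String, Task>();

      // First caller for a reducer creates its task; later callers reuse it.
      Task taskFor(String reducerOp) {
        Task t = opTaskMap.get(reducerOp);
        if (t == null) {
          // no plan yet for this reducer: initialize one and remember it
          t = new Task("plan-for-" + reducerOp);
          opTaskMap.put(reducerOp, t);
        }
        return t;
      }

      public static void main(String[] args) {
        OpTaskSketch s = new OpTaskSketch();
        System.out.println(s.taskFor("MAPJOIN_1") == s.taskFor("MAPJOIN_1"));  // true
      }
    }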
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -69,6 +68,7 @@ import org.apache.hadoop.hive.ql.plan.Jo
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -97,8 +97,8 @@ public class MapJoinProcessor implements
}
@SuppressWarnings("nls")
- private Operator<? extends Serializable> putOpInsertMap(Operator<? extends Serializable> op,
- RowResolver rr) {
+ private Operator<? extends OperatorDesc>
+ putOpInsertMap(Operator<? extends OperatorDesc> op, RowResolver rr) {
OpParseContext ctx = new OpParseContext(rr);
pGraphContext.getOpParseCtx().put(op, ctx);
return op;
@@ -120,18 +120,18 @@ public class MapJoinProcessor implements
// create a new MapredLocalWork
MapredLocalWork newLocalWork = new MapredLocalWork(
- new LinkedHashMap<String, Operator<? extends Serializable>>(),
+ new LinkedHashMap<String, Operator<? extends OperatorDesc>>(),
new LinkedHashMap<String, FetchWork>());
- for (Map.Entry<String, Operator<? extends Serializable>> entry : newWork.getAliasToWork()
- .entrySet()) {
+ for (Map.Entry<String, Operator<? extends OperatorDesc>> entry :
+ newWork.getAliasToWork().entrySet()) {
String alias = entry.getKey();
- Operator<? extends Serializable> op = entry.getValue();
+ Operator<? extends OperatorDesc> op = entry.getValue();
// if the table scan is for big table; then skip it
// tracing down the operator tree from the table scan operator
- Operator<? extends Serializable> parentOp = op;
- Operator<? extends Serializable> childOp = op.getChildOperators().get(0);
+ Operator<? extends OperatorDesc> parentOp = op;
+ Operator<? extends OperatorDesc> childOp = op.getChildOperators().get(0);
while ((childOp != null) && (!childOp.equals(mapJoinOp))) {
parentOp = childOp;
assert parentOp.getChildOperators().size() == 1;
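
The loop above walks down from each alias's table scan through a single-child chain until it meets the map join; the operator found directly below the join identifies which join input the alias feeds, and the big-table branch is then skipped when building the local work. A toy version of the walk:

    import java.util.ArrayList;
    import java.util.List;

    class LocalWorkSketch {
      static class Op {
        final String name;
        final List<Op> children = new ArrayList<Op>();
        Op(String name) { this.name = name; }
      }

      // Follow the single-child chain from a table scan down to the map join;
      // the chain below a table scan is assumed linear, as in the hunk above.
      static Op operatorFeedingJoin(Op tableScan, Op mapJoin) {
        Op parent = tableScan;
        Op child = tableScan.children.isEmpty() ? null : tableScan.children.get(0);
        while (child != null && !child.equals(mapJoin)) {
          parent = child;
          child = child.children.isEmpty() ? null : child.children.get(0);
        }
        return parent;   // the operator just below the join for this alias
      }

      public static void main(String[] args) {
        Op ts = new Op("TS"), fil = new Op("FIL"), mj = new Op("MAPJOIN");
        ts.children.add(fil);
        fil.children.add(mj);
        System.out.println(operatorFeedingJoin(ts, mj).name);   // prints FIL
      }
    }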
@@ -218,10 +218,10 @@ public class MapJoinProcessor implements
}
public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos)
- throws SemanticException {
+ throws SemanticException {
try {
- LinkedHashMap<Operator<? extends Serializable>, OpParseContext> opParseCtxMap = newWork
- .getOpParseCtxMap();
+ LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
+ newWork.getOpParseCtxMap();
QBJoinTree newJoinTree = newWork.getJoinTree();
// generate the map join operator; already checked the map join
MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op,
@@ -256,9 +256,9 @@ public class MapJoinProcessor implements
* @param noCheckOuterJoin
*/
public static MapJoinOperator convertMapJoin(
- LinkedHashMap<Operator<? extends Serializable>, OpParseContext> opParseCtxMap,
- JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
- throws SemanticException {
+ LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
+ JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
+ throws SemanticException {
// outer join cannot be performed on a table which is being cached
JoinDesc desc = op.getConf();
JoinCondDesc[] condns = desc.getConds();
@@ -279,18 +279,22 @@ public class MapJoinProcessor implements
// The join outputs a concatenation of all the inputs.
QBJoinTree leftSrc = joinTree.getJoinSrc();
- List<Operator<? extends Serializable>> parentOps = op.getParentOperators();
- List<Operator<? extends Serializable>> newParentOps = new ArrayList<Operator<? extends Serializable>>();
- List<Operator<? extends Serializable>> oldReduceSinkParentOps = new ArrayList<Operator<? extends Serializable>>();
+ List<Operator<? extends OperatorDesc>> parentOps = op.getParentOperators();
+ List<Operator<? extends OperatorDesc>> newParentOps =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ List<Operator<? extends OperatorDesc>> oldReduceSinkParentOps =
+ new ArrayList<Operator<? extends OperatorDesc>>();
Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
- HashMap<Byte, HashMap<String, ExprNodeDesc>> columnTransfer = new HashMap<Byte, HashMap<String, ExprNodeDesc>>();
+ HashMap<Byte, HashMap<String, ExprNodeDesc>> columnTransfer =
+ new HashMap<Byte, HashMap<String, ExprNodeDesc>>();
// found a source which is not to be stored in memory
if (leftSrc != null) {
// assert mapJoinPos == 0;
- Operator<? extends Serializable> parentOp = parentOps.get(0);
+ Operator<? extends OperatorDesc> parentOp = parentOps.get(0);
assert parentOp.getParentOperators().size() == 1;
- Operator<? extends Serializable> grandParentOp = parentOp.getParentOperators().get(0);
+ Operator<? extends OperatorDesc> grandParentOp =
+ parentOp.getParentOperators().get(0);
oldReduceSinkParentOps.add(parentOp);
grandParentOp.removeChild(parentOp);
newParentOps.add(grandParentOp);
@@ -300,9 +304,10 @@ public class MapJoinProcessor implements
// Remove parent reduce-sink operators
for (String src : joinTree.getBaseSrc()) {
if (src != null) {
- Operator<? extends Serializable> parentOp = parentOps.get(pos);
+ Operator<? extends OperatorDesc> parentOp = parentOps.get(pos);
assert parentOp.getParentOperators().size() == 1;
- Operator<? extends Serializable> grandParentOp = parentOp.getParentOperators().get(0);
+ Operator<? extends OperatorDesc> grandParentOp =
+ parentOp.getParentOperators().get(0);
grandParentOp.removeChild(parentOp);
oldReduceSinkParentOps.add(parentOp);
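
convertMapJoin, as shown above, cuts each parent ReduceSinkOperator out of the tree: the reduce sink's single parent (the grandparent) drops the reduce sink as a child and becomes a direct parent of the new map join, while the old reduce sinks are collected in oldReduceSinkParentOps for later reference. A miniature of the unlinking step (hypothetical Op class):

    import java.util.ArrayList;
    import java.util.List;

    class ConvertSketch {
      static class Op {
        final String name;
        List<Op> parents = new ArrayList<Op>();
        List<Op> children = new ArrayList<Op>();
        Op(String n) { name = n; }
        void removeChild(Op c) { children.remove(c); }
      }

      // A map join reads its inputs directly, so the reduce sink between each
      // source and the join is cut out and the source becomes the new parent.
      static List<Op> dropReduceSinks(Op join) {
        List<Op> newParents = new ArrayList<Op>();
        for (Op reduceSink : join.parents) {
          Op grandParent = reduceSink.parents.get(0);  // each RS assumed to have one parent
          grandParent.removeChild(reduceSink);
          newParents.add(grandParent);
        }
        return newParents;   // these would be wired to the new MapJoinOperator
      }

      public static void main(String[] args) {
        Op tsA = new Op("TS_a"), rsA = new Op("RS_a");
        Op tsB = new Op("TS_b"), rsB = new Op("RS_b");
        Op join = new Op("JOIN");
        tsA.children.add(rsA); rsA.parents.add(tsA);
        tsB.children.add(rsB); rsB.parents.add(tsB);
        join.parents.add(rsA); join.parents.add(rsB);
        System.out.println(dropReduceSinks(join).get(0).name + ","
            + tsA.children.size());   // prints: TS_a,0
      }
    }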
@@ -389,7 +394,7 @@ public class MapJoinProcessor implements
Operator[] newPar = new Operator[newParentOps.size()];
pos = 0;
- for (Operator<? extends Serializable> o : newParentOps) {
+ for (Operator<? extends OperatorDesc> o : newParentOps) {
newPar[pos++] = o;
}
@@ -461,8 +466,8 @@ public class MapJoinProcessor implements
// change the children of the original join operator to point to the map
// join operator
- List<Operator<? extends Serializable>> childOps = op.getChildOperators();
- for (Operator<? extends Serializable> childOp : childOps) {
+ List<Operator<? extends OperatorDesc>> childOps = op.getChildOperators();
+ for (Operator<? extends OperatorDesc> childOp : childOps) {
childOp.replaceParent(op, mapJoinOp);
}
@@ -482,7 +487,7 @@ public class MapJoinProcessor implements
&& HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN);
- LinkedHashMap<Operator<? extends Serializable>, OpParseContext> opParseCtxMap = pctx
+ LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap = pctx
.getOpParseCtx();
MapJoinOperator mapJoinOp = convertMapJoin(opParseCtxMap, op, joinTree, mapJoinPos,
noCheckOuterJoin);
@@ -577,7 +582,7 @@ public class MapJoinProcessor implements
}
private void genSelectPlan(ParseContext pctx, MapJoinOperator input) throws SemanticException {
- List<Operator<? extends Serializable>> childOps = input.getChildOperators();
+ List<Operator<? extends OperatorDesc>> childOps = input.getChildOperators();
input.setChildOperators(null);
// create a dummy select - This select is needed by the walker to split the
@@ -613,7 +618,7 @@ public class MapJoinProcessor implements
// Insert the select operator in between.
sel.setChildOperators(childOps);
- for (Operator<? extends Serializable> ch : childOps) {
+ for (Operator<? extends OperatorDesc> ch : childOps) {
ch.replaceParent(input, sel);
}
}
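
genSelectPlan above splices a pass-through select between the map join and its original children: the old child list moves to the new select, and each child repoints its parent link from the map join to the select. Reduced to its essentials with a hypothetical Op class:

    import java.util.ArrayList;
    import java.util.List;

    class SpliceSketch {
      static class Op {
        List<Op> parents = new ArrayList<Op>();
        List<Op> children = new ArrayList<Op>();
        void replaceParent(Op oldP, Op newP) {
          parents.set(parents.indexOf(oldP), newP);
        }
      }

      // Insert 'select' between 'input' and input's former children.
      static void splice(Op input, Op select) {
        List<Op> childOps = input.children;
        input.children = new ArrayList<Op>();
        input.children.add(select);
        select.parents.add(input);
        select.children = childOps;
        for (Op ch : childOps) {
          ch.replaceParent(input, select);
        }
      }
    }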
@@ -764,12 +769,12 @@ public class MapJoinProcessor implements
}
private Boolean findGrandChildSubqueryMapjoin(MapJoinWalkerCtx ctx, MapJoinOperator mapJoin) {
- Operator<? extends Serializable> parent = mapJoin;
+ Operator<? extends OperatorDesc> parent = mapJoin;
while (true) {
if (parent.getChildOperators() == null || parent.getChildOperators().size() != 1) {
return null;
}
- Operator<? extends Serializable> ch = parent.getChildOperators().get(0);
+ Operator<? extends OperatorDesc> ch = parent.getChildOperators().get(0);
if (ch instanceof MapJoinOperator) {
if (!nonSubqueryMapJoin(ctx.getpGraphContext(), (MapJoinOperator) ch, mapJoin)) {
if (ch.getParentOperators().indexOf(parent) == ((MapJoinOperator) ch).getConf()