You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2012/08/29 19:44:02 UTC
svn commit: r1378659 [1/4] - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/ java/org/apache/hadoop/hive/ql/exec/
java/org/apache/hadoop/hive/ql/index/compact/
java/org/apache/hadoop/hive/ql/io/ java/org/apache/hadoop/hive/ql/lib/
java/org/apach...
Author: namit
Date: Wed Aug 29 17:43:59 2012
New Revision: 1378659
URL: http://svn.apache.org/viewvc?rev=1378659&view=rev
Log:
HIVE-3410 All operators's conf should inherit from a common class
(Namit via Carl)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrOpProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrOpWalkerCtx.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrintOpTreeProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CollectDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExtractDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ForwardDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableDummyDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewForwardDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ListSinkDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerInfo.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Wed Aug 29 17:43:59 2012
@@ -94,6 +94,7 @@ import org.apache.hadoop.hive.ql.parse.S
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -509,7 +510,7 @@ public class Driver implements CommandPr
}
private void doAuthorization(BaseSemanticAnalyzer sem)
- throws HiveException, AuthorizationException {
+ throws HiveException, AuthorizationException {
HashSet<ReadEntity> inputs = sem.getInputs();
HashSet<WriteEntity> outputs = sem.getOutputs();
SessionState ss = SessionState.get();
@@ -583,9 +584,9 @@ public class Driver implements CommandPr
ParseContext parseCtx = querySem.getParseContext();
Map<TableScanOperator, Table> tsoTopMap = parseCtx.getTopToTable();
- for (Map.Entry<String, Operator<? extends Serializable>> topOpMap : querySem
+ for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpMap : querySem
.getParseContext().getTopOps().entrySet()) {
- Operator<? extends Serializable> topOp = topOpMap.getValue();
+ Operator<? extends OperatorDesc> topOp = topOpMap.getValue();
if (topOp instanceof TableScanOperator
&& tsoTopMap.containsKey(topOp)) {
TableScanOperator tableScanOp = (TableScanOperator) topOp;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java Wed Aug 29 17:43:59 2012
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.hooks.L
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.api.AdjacencyType;
import org.apache.hadoop.hive.ql.plan.api.NodeType;
import org.apache.hadoop.hive.ql.plan.api.TaskType;
@@ -152,18 +153,18 @@ public class QueryPlan implements Serial
*/
private void populateOperatorGraph(
org.apache.hadoop.hive.ql.plan.api.Task task,
- Collection<Operator<? extends Serializable>> topOps) {
+ Collection<Operator<? extends OperatorDesc>> topOps) {
task.setOperatorGraph(new org.apache.hadoop.hive.ql.plan.api.Graph());
task.getOperatorGraph().setNodeType(NodeType.OPERATOR);
- Queue<Operator<? extends Serializable>> opsToVisit =
- new LinkedList<Operator<? extends Serializable>>();
- Set<Operator<? extends Serializable>> opsVisited =
- new HashSet<Operator<? extends Serializable>>();
+ Queue<Operator<? extends OperatorDesc>> opsToVisit =
+ new LinkedList<Operator<? extends OperatorDesc>>();
+ Set<Operator<? extends OperatorDesc>> opsVisited =
+ new HashSet<Operator<? extends OperatorDesc>>();
opsToVisit.addAll(topOps);
while (opsToVisit.peek() != null) {
- Operator<? extends Serializable> op = opsToVisit.remove();
+ Operator<? extends OperatorDesc> op = opsToVisit.remove();
opsVisited.add(op);
// populate the operator
org.apache.hadoop.hive.ql.plan.api.Operator operator =
@@ -177,7 +178,7 @@ public class QueryPlan implements Serial
new org.apache.hadoop.hive.ql.plan.api.Adjacency();
entry.setAdjacencyType(AdjacencyType.CONJUNCTIVE);
entry.setNode(op.getOperatorId());
- for (Operator<? extends Serializable> childOp : op.getChildOperators()) {
+ for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
entry.addToChildren(childOp.getOperatorId());
if (!opsVisited.contains(childOp)) {
opsToVisit.add(childOp);
@@ -230,8 +231,8 @@ public class QueryPlan implements Serial
reduceTask.setTaskId(stage.getStageId() + "_REDUCE");
reduceTask.setTaskType(TaskType.REDUCE);
stage.addToTaskList(reduceTask);
- Collection<Operator<? extends Serializable>> reducerTopOps =
- new ArrayList<Operator<? extends Serializable>>();
+ Collection<Operator<? extends OperatorDesc>> reducerTopOps =
+ new ArrayList<Operator<? extends OperatorDesc>>();
reducerTopOps.add(mrTask.getWork().getReducer());
populateOperatorGraph(reduceTask, reducerTopOps);
}
@@ -309,8 +310,11 @@ public class QueryPlan implements Serial
} else {
task.setStarted(started.contains(task.getTaskId()));
task.setDone(done.contains(task.getTaskId()));
- for (org.apache.hadoop.hive.ql.plan.api.Operator op : task
- .getOperatorList()) {
+ if (task.getOperatorList() == null) {
+ return;
+ }
+ for (org.apache.hadoop.hive.ql.plan.api.Operator op :
+ task.getOperatorList()) {
// if the task has started, all operators within the task have
// started
op.setStarted(started.contains(task.getTaskId()));
@@ -370,8 +374,8 @@ public class QueryPlan implements Serial
done.add(task.getId() + "_MAP");
}
if (mrTask.hasReduce()) {
- Collection<Operator<? extends Serializable>> reducerTopOps =
- new ArrayList<Operator<? extends Serializable>>();
+ Collection<Operator<? extends OperatorDesc>> reducerTopOps =
+ new ArrayList<Operator<? extends OperatorDesc>>();
reducerTopOps.add(mrTask.getWork().getReducer());
extractOperatorCounters(reducerTopOps, task.getId() + "_REDUCE");
if (mrTask.reduceStarted()) {
@@ -393,21 +397,21 @@ public class QueryPlan implements Serial
}
private void extractOperatorCounters(
- Collection<Operator<? extends Serializable>> topOps, String taskId) {
- Queue<Operator<? extends Serializable>> opsToVisit =
- new LinkedList<Operator<? extends Serializable>>();
- Set<Operator<? extends Serializable>> opsVisited =
- new HashSet<Operator<? extends Serializable>>();
+ Collection<Operator<? extends OperatorDesc>> topOps, String taskId) {
+ Queue<Operator<? extends OperatorDesc>> opsToVisit =
+ new LinkedList<Operator<? extends OperatorDesc>>();
+ Set<Operator<? extends OperatorDesc>> opsVisited =
+ new HashSet<Operator<? extends OperatorDesc>>();
opsToVisit.addAll(topOps);
while (opsToVisit.size() != 0) {
- Operator<? extends Serializable> op = opsToVisit.remove();
+ Operator<? extends OperatorDesc> op = opsToVisit.remove();
opsVisited.add(op);
counters.put(op.getOperatorId(), op.getCounters());
if (op.getDone()) {
done.add(op.getOperatorId());
}
if (op.getChildOperators() != null) {
- for (Operator<? extends Serializable> childOp : op.getChildOperators()) {
+ for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
if (!opsVisited.contains(childOp)) {
opsToVisit.add(childOp);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Wed Aug 29 17:43:59 2012
@@ -50,9 +50,9 @@ import org.apache.hadoop.hive.common.Com
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
@@ -66,6 +66,7 @@ import org.apache.hadoop.hive.ql.plan.Fe
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -178,7 +179,7 @@ public class ExecDriver extends Task<Map
* @return true if fatal errors happened during job execution, false otherwise.
*/
public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
- for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
+ for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
if (op.checkFatalErrors(ctrs, errMsg)) {
return true;
}
@@ -195,7 +196,8 @@ public class ExecDriver extends Task<Map
// fix up outputs
Map<String, ArrayList<String>> pa = work.getPathToAliases();
if (pa != null) {
- ArrayList<Operator<? extends Serializable>> opList = new ArrayList<Operator<? extends Serializable>>();
+ List<Operator<? extends OperatorDesc>> opList =
+ new ArrayList<Operator<? extends OperatorDesc>>();
if (work.getReducer() != null) {
opList.add(work.getReducer());
@@ -206,7 +208,7 @@ public class ExecDriver extends Task<Map
opList.add(work.getAliasToWork().get(a));
while (!opList.isEmpty()) {
- Operator<? extends Serializable> op = opList.remove(0);
+ Operator<? extends OperatorDesc> op = opList.remove(0);
if (op instanceof FileSinkOperator) {
FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
@@ -489,7 +491,7 @@ public class ExecDriver extends Task<Map
if (rj != null) {
JobCloseFeedBack feedBack = new JobCloseFeedBack();
if (work.getAliasToWork() != null) {
- for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
+ for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
op.jobClose(job, success, feedBack);
}
}
@@ -743,7 +745,7 @@ public class ExecDriver extends Task<Map
}
@Override
- public Collection<Operator<? extends Serializable>> getTopOperators() {
+ public Collection<Operator<? extends OperatorDesc>> getTopOperators() {
return getWork().getAliasToWork().values();
}
@@ -947,11 +949,12 @@ public class ExecDriver extends Task<Map
if (pa != null) {
for (List<String> ls : pa.values()) {
for (String a : ls) {
- ArrayList<Operator<? extends Serializable>> opList = new ArrayList<Operator<? extends Serializable>>();
+ ArrayList<Operator<? extends OperatorDesc>> opList =
+ new ArrayList<Operator<? extends OperatorDesc>>();
opList.add(work.getAliasToWork().get(a));
while (!opList.isEmpty()) {
- Operator<? extends Serializable> op = opList.remove(0);
+ Operator<? extends OperatorDesc> op = opList.remove(0);
if (op instanceof FileSinkOperator) {
FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
@@ -973,7 +976,7 @@ public class ExecDriver extends Task<Map
@Override
public void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
- for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
+ for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
op.updateCounters(ctrs);
}
if (work.getReducer() != null) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java Wed Aug 29 17:43:59 2012
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.exec;
import java.io.IOException;
-import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.net.URLClassLoader;
@@ -31,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
@@ -104,8 +104,8 @@ public class ExecMapper extends MapReduc
//The following code is for mapjoin
//initialize all the dummy ops
l4j.info("Initializing dummy operator");
- List<Operator<? extends Serializable>> dummyOps = localWork.getDummyParentOp();
- for(Operator<? extends Serializable> dummyOp : dummyOps){
+ List<Operator<? extends OperatorDesc>> dummyOps = localWork.getDummyParentOp();
+ for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
dummyOp.setExecContext(execContext);
dummyOp.initialize(jc,null);
}
@@ -194,9 +194,9 @@ public class ExecMapper extends MapReduc
//for close the local work
if(localWork != null){
- List<Operator<? extends Serializable>> dummyOps = localWork.getDummyParentOp();
+ List<Operator<? extends OperatorDesc>> dummyOps = localWork.getDummyParentOp();
- for(Operator<? extends Serializable> dummyOp : dummyOps){
+ for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
dummyOp.close(abort);
}
}
@@ -204,7 +204,7 @@ public class ExecMapper extends MapReduc
if (fetchOperators != null) {
MapredLocalWork localWork = mo.getConf().getMapLocalWork();
for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
- Operator<? extends Serializable> forwardOp = localWork
+ Operator<? extends OperatorDesc> forwardOp = localWork
.getAliasToWork().get(entry.getKey());
forwardOp.close(abort);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Wed Aug 29 17:43:59 2012
@@ -24,6 +24,7 @@ import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
@@ -41,12 +42,12 @@ import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.ExplainWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.StringUtils;
import org.json.JSONException;
import org.json.JSONObject;
-import java.lang.reflect.InvocationTargetException;
/**
* ExplainTask implementation.
@@ -281,7 +282,8 @@ public class ExplainTask extends Task<Ex
// If this is an operator then we need to call the plan generation on the
// conf and then the children
if (work instanceof Operator) {
- Operator<? extends Serializable> operator = (Operator<? extends Serializable>) work;
+ Operator<? extends OperatorDesc> operator =
+ (Operator<? extends OperatorDesc>) work;
if (operator.getConf() != null) {
JSONObject jsonOut = outputPlan(operator.getConf(), out, extended,
jsonOutput, jsonOutput ? 0 : indent);
@@ -291,7 +293,7 @@ public class ExplainTask extends Task<Ex
}
if (operator.getChildOperators() != null) {
- for (Operator<? extends Serializable> op : operator.getChildOperators()) {
+ for (Operator<? extends OperatorDesc> op : operator.getChildOperators()) {
JSONObject jsonOut = outputPlan(op, out, extended, jsonOutput, jsonOutput ? 0 : indent + 2);
if (jsonOutput) {
json.put(operator.getOperatorId(), jsonOut);
@@ -651,6 +653,7 @@ public class ExplainTask extends Task<Ex
throw new RuntimeException("Unexpected call");
}
+ @Override
public List<FieldSchema> getResultSchema() {
FieldSchema tmpFieldSchema = new FieldSchema();
List<FieldSchema> colList = new ArrayList<FieldSchema>();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Wed Aug 29 17:43:59 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.plan.Ag
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
@@ -1057,7 +1058,7 @@ public class GroupByOperator extends Ope
// Group by contains the columns needed - no need to aggregate from children
public List<String> genColLists(
- HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx) {
+ HashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx) {
List<String> colLists = new ArrayList<String>();
ArrayList<ExprNodeDesc> keys = conf.getKeys();
for (ExprNodeDesc key : keys) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Wed Aug 29 17:43:59 2012
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -58,7 +59,7 @@ import org.apache.hadoop.util.StringUtil
* different from regular operators in that it starts off by processing a
* Writable data structure from a Table (instead of a Hive Object).
**/
-public class MapOperator extends Operator<MapredWork> implements Serializable {
+public class MapOperator extends Operator<MapredWork> implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
@@ -83,17 +84,17 @@ public class MapOperator extends Operato
private Map<MapInputPath, MapOpCtx> opCtxMap;
private final Set<MapInputPath> listInputPaths = new HashSet<MapInputPath>();
- private Map<Operator<? extends Serializable>, java.util.ArrayList<String>> operatorToPaths;
+ private Map<Operator<? extends OperatorDesc>, ArrayList<String>> operatorToPaths;
- private final Map<Operator<? extends Serializable>, MapOpCtx> childrenOpToOpCtxMap =
- new HashMap<Operator<? extends Serializable>, MapOpCtx>();
+ private final Map<Operator<? extends OperatorDesc>, MapOpCtx> childrenOpToOpCtxMap =
+ new HashMap<Operator<? extends OperatorDesc>, MapOpCtx>();
- private ArrayList<Operator<? extends Serializable>> extraChildrenToClose = null;
+ private ArrayList<Operator<? extends OperatorDesc>> extraChildrenToClose = null;
private static class MapInputPath {
String path;
String alias;
- Operator<? extends Serializable> op;
+ Operator<? extends OperatorDesc> op;
/**
* @param path
@@ -101,7 +102,7 @@ public class MapOperator extends Operato
* @param op
*/
public MapInputPath(String path, String alias,
- Operator<? extends Serializable> op) {
+ Operator<? extends OperatorDesc> op) {
this.path = path;
this.alias = alias;
this.op = op;
@@ -129,11 +130,11 @@ public class MapOperator extends Operato
return ret;
}
- public Operator<? extends Serializable> getOp() {
+ public Operator<? extends OperatorDesc> getOp() {
return op;
}
- public void setOp(Operator<? extends Serializable> op) {
+ public void setOp(Operator<? extends OperatorDesc> op) {
this.op = op;
}
@@ -304,7 +305,7 @@ public class MapOperator extends Operato
* need to be changed if the input changes
**/
private void setInspectorInput(MapInputPath inp) {
- Operator<? extends Serializable> op = inp.getOp();
+ Operator<? extends OperatorDesc> op = inp.getOp();
deserializer = opCtxMap.get(inp).getDeserializer();
isPartitioned = opCtxMap.get(inp).isPartitioned();
@@ -367,9 +368,10 @@ public class MapOperator extends Operato
Path fpath = new Path((new Path(HiveConf.getVar(hconf,
HiveConf.ConfVars.HADOOPMAPFILENAME))).toUri().getPath());
- ArrayList<Operator<? extends Serializable>> children = new ArrayList<Operator<? extends Serializable>>();
+ ArrayList<Operator<? extends OperatorDesc>> children =
+ new ArrayList<Operator<? extends OperatorDesc>>();
opCtxMap = new HashMap<MapInputPath, MapOpCtx>();
- operatorToPaths = new HashMap<Operator<? extends Serializable>, java.util.ArrayList<String>>();
+ operatorToPaths = new HashMap<Operator<? extends OperatorDesc>, ArrayList<String>>();
statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
@@ -380,17 +382,17 @@ public class MapOperator extends Operato
List<String> aliases = conf.getPathToAliases().get(onefile);
for (String onealias : aliases) {
- Operator<? extends Serializable> op = conf.getAliasToWork().get(
+ Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(
onealias);
LOG.info("Adding alias " + onealias + " to work list for file "
+ onefile);
MapInputPath inp = new MapInputPath(onefile, onealias, op);
opCtxMap.put(inp, opCtx);
if (operatorToPaths.get(op) == null) {
- operatorToPaths.put(op, new java.util.ArrayList<String>());
+ operatorToPaths.put(op, new ArrayList<String>());
}
operatorToPaths.get(op).add(onefile);
- op.setParentOperators(new ArrayList<Operator<? extends Serializable>>());
+ op.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
op.getParentOperators().add(this);
// check for the operators who will process rows coming to this Map
// Operator
@@ -423,11 +425,11 @@ public class MapOperator extends Operato
public void initializeOp(Configuration hconf) throws HiveException {
// set that parent initialization is done and call initialize on children
state = State.INIT;
- List<Operator<? extends Serializable>> children = getChildOperators();
+ List<Operator<? extends OperatorDesc>> children = getChildOperators();
- for (Entry<Operator<? extends Serializable>, MapOpCtx> entry : childrenOpToOpCtxMap
+ for (Entry<Operator<? extends OperatorDesc>, MapOpCtx> entry : childrenOpToOpCtxMap
.entrySet()) {
- Operator<? extends Serializable> child = entry.getKey();
+ Operator<? extends OperatorDesc> child = entry.getKey();
MapOpCtx mapOpCtx = entry.getValue();
// Add alias, table name, and partitions to hadoop conf so that their
// children will
@@ -448,12 +450,12 @@ public class MapOperator extends Operato
HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, entry
.getValue().partName);
MapInputPath input = entry.getKey();
- Operator<? extends Serializable> op = input.op;
+ Operator<? extends OperatorDesc> op = input.op;
// op is not in the children list, so need to remember it and close it
// afterwards
if (children.indexOf(op) == -1) {
if (extraChildrenToClose == null) {
- extraChildrenToClose = new ArrayList<Operator<? extends Serializable>>();
+ extraChildrenToClose = new ArrayList<Operator<? extends OperatorDesc>>();
}
extraChildrenToClose.add(op);
op.initialize(hconf, new ObjectInspector[] {entry.getValue().getRowObjectInspector()});
@@ -467,7 +469,7 @@ public class MapOperator extends Operato
@Override
public void closeOp(boolean abort) throws HiveException {
if (extraChildrenToClose != null) {
- for (Operator<? extends Serializable> op : extraChildrenToClose) {
+ for (Operator<? extends OperatorDesc> op : extraChildrenToClose) {
op.close(abort);
}
}
@@ -486,7 +488,7 @@ public class MapOperator extends Operato
// Operator
if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
String onealias = conf.getPathToAliases().get(onefile).get(0);
- Operator<? extends Serializable> op =
+ Operator<? extends OperatorDesc> op =
conf.getAliasToWork().get(onealias);
LOG.info("Processing alias " + onealias + " for file " + onefile);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Wed Aug 29 17:43:59 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.DriverC
import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobConf;
@@ -548,7 +549,7 @@ public class MapRedTask extends ExecDriv
}
@Override
- public Operator<? extends Serializable> getReducer() {
+ public Operator<? extends OperatorDesc> getReducer() {
return getWork().getReducer();
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java Wed Aug 29 17:43:59 2012
@@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -312,7 +313,7 @@ public class MapredLocalTask extends Tas
}
// get the root operator
- Operator<? extends Serializable> forwardOp = work.getAliasToWork().get(alias);
+ Operator<? extends OperatorDesc> forwardOp = work.getAliasToWork().get(alias);
// walk through the operator tree
while (true) {
InspectableObject row = fetchOp.getNextRow();
@@ -342,7 +343,8 @@ public class MapredLocalTask extends Tas
for (Map.Entry<String, FetchWork> entry : work.getAliasToFetchWork().entrySet()) {
JobConf jobClone = new JobConf(job);
- Operator<? extends Serializable> tableScan = work.getAliasToWork().get(entry.getKey());
+ Operator<? extends OperatorDesc> tableScan =
+ work.getAliasToWork().get(entry.getKey());
boolean setColumnsNeeded = false;
if (tableScan instanceof TableScanOperator) {
ArrayList<Integer> list = ((TableScanOperator) tableScan).getNeededColumnIDs();
@@ -366,7 +368,7 @@ public class MapredLocalTask extends Tas
for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
// get the forward op
String alias = entry.getKey();
- Operator<? extends Serializable> forwardOp = work.getAliasToWork().get(alias);
+ Operator<? extends OperatorDesc> forwardOp = work.getAliasToWork().get(alias);
// put the exe context into all the operators
forwardOp.setExecContext(execContext);
@@ -386,8 +388,8 @@ public class MapredLocalTask extends Tas
private void generateDummyHashTable(String alias, String bigBucketFileName) throws HiveException,IOException {
// find the (byte)tag for the map join(HashTableSinkOperator)
- Operator<? extends Serializable> parentOp = work.getAliasToWork().get(alias);
- Operator<? extends Serializable> childOp = parentOp.getChildOperators().get(0);
+ Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
+ Operator<? extends OperatorDesc> childOp = parentOp.getChildOperators().get(0);
while ((childOp != null) && (!(childOp instanceof HashTableSinkOperator))) {
parentOp = childOp;
assert parentOp.getChildOperators().size() == 1;
@@ -447,7 +449,7 @@ public class MapredLocalTask extends Tas
}
@Override
- public Collection<Operator<? extends Serializable>> getTopOperators() {
+ public Collection<Operator<? extends OperatorDesc>> getTopOperators() {
return getWork().getAliasToWork().values();
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Wed Aug 29 17:43:59 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -46,15 +47,15 @@ import org.apache.hadoop.mapred.Reporter
/**
* Base operator implementation.
**/
-public abstract class Operator<T extends Serializable> implements Serializable,
- Node {
+public abstract class Operator<T extends OperatorDesc> implements Serializable,Cloneable,
+ Node {
// Bean methods
private static final long serialVersionUID = 1L;
- protected List<Operator<? extends Serializable>> childOperators;
- protected List<Operator<? extends Serializable>> parentOperators;
+ protected List<Operator<? extends OperatorDesc>> childOperators;
+ protected List<Operator<? extends OperatorDesc>> parentOperators;
protected String operatorId;
/**
* List of counter names associated with the operator. It contains the
@@ -122,11 +123,11 @@ public abstract class Operator<T extends
}
public void setChildOperators(
- List<Operator<? extends Serializable>> childOperators) {
+ List<Operator<? extends OperatorDesc>> childOperators) {
this.childOperators = childOperators;
}
- public List<Operator<? extends Serializable>> getChildOperators() {
+ public List<Operator<? extends OperatorDesc>> getChildOperators() {
return childOperators;
}
@@ -140,7 +141,7 @@ public abstract class Operator<T extends
}
ArrayList<Node> ret_vec = new ArrayList<Node>();
- for (Operator<? extends Serializable> op : getChildOperators()) {
+ for (Operator<? extends OperatorDesc> op : getChildOperators()) {
ret_vec.add(op);
}
@@ -148,11 +149,11 @@ public abstract class Operator<T extends
}
public void setParentOperators(
- List<Operator<? extends Serializable>> parentOperators) {
+ List<Operator<? extends OperatorDesc>> parentOperators) {
this.parentOperators = parentOperators;
}
- public List<Operator<? extends Serializable>> getParentOperators() {
+ public List<Operator<? extends OperatorDesc>> getParentOperators() {
return parentOperators;
}
@@ -231,7 +232,7 @@ public abstract class Operator<T extends
return;
}
- for (Operator<? extends Serializable> op : childOperators) {
+ for (Operator<? extends OperatorDesc> op : childOperators) {
op.setReporter(rep);
}
}
@@ -244,7 +245,7 @@ public abstract class Operator<T extends
return;
}
- for (Operator<? extends Serializable> op : childOperators) {
+ for (Operator<? extends OperatorDesc> op : childOperators) {
op.setOutputCollector(out);
}
}
@@ -259,7 +260,7 @@ public abstract class Operator<T extends
return;
}
- for (Operator<? extends Serializable> op : childOperators) {
+ for (Operator<? extends OperatorDesc> op : childOperators) {
op.setAlias(alias);
}
}
@@ -282,7 +283,7 @@ public abstract class Operator<T extends
if (parentOperators == null) {
return true;
}
- for (Operator<? extends Serializable> parent : parentOperators) {
+ for (Operator<? extends OperatorDesc> parent : parentOperators) {
if (parent == null) {
//return true;
continue;
@@ -331,7 +332,7 @@ public abstract class Operator<T extends
}
childOperatorsTag = new int[childOperatorsArray.length];
for (int i = 0; i < childOperatorsArray.length; i++) {
- List<Operator<? extends Serializable>> parentOperators = childOperatorsArray[i]
+ List<Operator<? extends OperatorDesc>> parentOperators = childOperatorsArray[i]
.getParentOperators();
if (parentOperators == null) {
throw new HiveException("Hive internal error: parent is null in "
@@ -361,7 +362,7 @@ public abstract class Operator<T extends
public void initializeLocalWork(Configuration hconf) throws HiveException {
if (childOperators != null) {
for (int i =0; i<childOperators.size();i++) {
- Operator<? extends Serializable> childOp = this.childOperators.get(i);
+ Operator<? extends OperatorDesc> childOp = this.childOperators.get(i);
childOp.initializeLocalWork(hconf);
}
}
@@ -485,7 +486,7 @@ public abstract class Operator<T extends
}
LOG.debug("Starting group for children:");
- for (Operator<? extends Serializable> op : childOperators) {
+ for (Operator<? extends OperatorDesc> op : childOperators) {
op.startGroup();
}
@@ -505,7 +506,7 @@ public abstract class Operator<T extends
}
LOG.debug("Ending group for children:");
- for (Operator<? extends Serializable> op : childOperators) {
+ for (Operator<? extends OperatorDesc> op : childOperators) {
op.endGroup();
}
@@ -514,7 +515,7 @@ public abstract class Operator<T extends
protected boolean allInitializedParentsAreClosed() {
if (parentOperators != null) {
- for (Operator<? extends Serializable> parent : parentOperators) {
+ for (Operator<? extends OperatorDesc> parent : parentOperators) {
if(parent==null){
continue;
}
@@ -562,7 +563,7 @@ public abstract class Operator<T extends
return;
}
- for (Operator<? extends Serializable> op : childOperators) {
+ for (Operator<? extends OperatorDesc> op : childOperators) {
op.close(abort);
}
@@ -595,7 +596,7 @@ public abstract class Operator<T extends
return;
}
- for (Operator<? extends Serializable> op : childOperators) {
+ for (Operator<? extends OperatorDesc> op : childOperators) {
op.jobClose(conf, success, feedBack);
}
}
@@ -604,7 +605,7 @@ public abstract class Operator<T extends
* Cache childOperators in an array for faster access. childOperatorsArray is
* accessed per row, so it's important to make the access efficient.
*/
- protected transient Operator<? extends Serializable>[] childOperatorsArray = null;
+ protected transient Operator<? extends OperatorDesc>[] childOperatorsArray = null;
protected transient int[] childOperatorsTag;
// counters for debugging
@@ -620,14 +621,14 @@ public abstract class Operator<T extends
* @param newChild
* the new child
*/
- public void replaceChild(Operator<? extends Serializable> child,
- Operator<? extends Serializable> newChild) {
+ public void replaceChild(Operator<? extends OperatorDesc> child,
+ Operator<? extends OperatorDesc> newChild) {
int childIndex = childOperators.indexOf(child);
assert childIndex != -1;
childOperators.set(childIndex, newChild);
}
- public void removeChild(Operator<? extends Serializable> child) {
+ public void removeChild(Operator<? extends OperatorDesc> child) {
int childIndex = childOperators.indexOf(child);
assert childIndex != -1;
if (childOperators.size() == 1) {
@@ -651,7 +652,8 @@ public abstract class Operator<T extends
* @param child If this operator is not the only parent of the child. There can be unpredictable result.
* @throws SemanticException
*/
- public void removeChildAndAdoptItsChildren(Operator<? extends Serializable> child) throws SemanticException {
+ public void removeChildAndAdoptItsChildren(
+ Operator<? extends OperatorDesc> child) throws SemanticException {
int childIndex = childOperators.indexOf(child);
if (childIndex == -1) {
throw new SemanticException(
@@ -664,18 +666,18 @@ public abstract class Operator<T extends
childOperators.addAll(childIndex, child.getChildOperators());
}
- for (Operator<? extends Serializable> gc : child.getChildOperators()) {
- List<Operator<? extends Serializable>> parents = gc.getParentOperators();
+ for (Operator<? extends OperatorDesc> gc : child.getChildOperators()) {
+ List<Operator<? extends OperatorDesc>> parents = gc.getParentOperators();
int index = parents.indexOf(child);
if (index == -1) {
throw new SemanticException(
- "Exception when trying to remove partition predicates: fail to find parent from child");
+ "Exception when trying to remove partition predicates: fail to find parent from child");
}
parents.set(index, this);
}
}
- public void removeParent(Operator<? extends Serializable> parent) {
+ public void removeParent(Operator<? extends OperatorDesc> parent) {
int parentIndex = parentOperators.indexOf(parent);
assert parentIndex != -1;
if (parentOperators.size() == 1) {
@@ -702,8 +704,8 @@ public abstract class Operator<T extends
* @param newParent
* the new parent
*/
- public void replaceParent(Operator<? extends Serializable> parent,
- Operator<? extends Serializable> newParent) {
+ public void replaceParent(Operator<? extends OperatorDesc> parent,
+ Operator<? extends OperatorDesc> newParent) {
int parentIndex = parentOperators.indexOf(parent);
assert parentIndex != -1;
parentOperators.set(parentIndex, newParent);
@@ -755,7 +757,7 @@ public abstract class Operator<T extends
int childrenDone = 0;
for (int i = 0; i < childOperatorsArray.length; i++) {
- Operator<? extends Serializable> o = childOperatorsArray[i];
+ Operator<? extends OperatorDesc> o = childOperatorsArray[i];
if (o.getDone()) {
childrenDone++;
} else {
@@ -778,7 +780,7 @@ public abstract class Operator<T extends
public void reset(){
this.state=State.INIT;
if (childOperators != null) {
- for (Operator<? extends Serializable> o : childOperators) {
+ for (Operator<? extends OperatorDesc> o : childOperators) {
o.reset();
}
}
@@ -790,13 +792,13 @@ public abstract class Operator<T extends
*
*/
public static interface OperatorFunc {
- void func(Operator<? extends Serializable> op);
+ void func(Operator<? extends OperatorDesc> op);
}
public void preorderMap(OperatorFunc opFunc) {
opFunc.func(this);
if (childOperators != null) {
- for (Operator<? extends Serializable> o : childOperators) {
+ for (Operator<? extends OperatorDesc> o : childOperators) {
o.preorderMap(opFunc);
}
}
@@ -863,7 +865,7 @@ public abstract class Operator<T extends
if (childOperators != null) {
s.append(ls);
s.append(" <Children>");
- for (Operator<? extends Serializable> o : childOperators) {
+ for (Operator<? extends OperatorDesc> o : childOperators) {
s.append(o.dump(level + 2, seenOpts));
}
s.append(ls);
@@ -873,7 +875,7 @@ public abstract class Operator<T extends
if (parentOperators != null) {
s.append(ls);
s.append(" <Parent>");
- for (Operator<? extends Serializable> o : parentOperators) {
+ for (Operator<? extends OperatorDesc> o : parentOperators) {
s.append("Id = " + o.id + " ");
s.append(o.dump(level, seenOpts));
}
@@ -1154,7 +1156,7 @@ public abstract class Operator<T extends
// but, some operators may be updated more than once and that's ok
if (getChildren() != null) {
for (Node op : getChildren()) {
- ((Operator<? extends Serializable>) op).updateCounters(ctrs);
+ ((Operator<? extends OperatorDesc>) op).updateCounters(ctrs);
}
}
}
@@ -1189,7 +1191,7 @@ public abstract class Operator<T extends
if (getChildren() != null) {
for (Node op : getChildren()) {
- if (((Operator<? extends Serializable>) op).checkFatalErrors(ctrs,
+ if (((Operator<? extends OperatorDesc>) op).checkFatalErrors(ctrs,
errMsg)) {
return true;
}
@@ -1309,7 +1311,7 @@ public abstract class Operator<T extends
this.execContext = execContext;
if(this.childOperators != null) {
for (int i = 0; i<this.childOperators.size();i++) {
- Operator<? extends Serializable> op = this.childOperators.get(i);
+ Operator<? extends OperatorDesc> op = this.childOperators.get(i);
op.setExecContext(execContext);
}
}
@@ -1321,7 +1323,7 @@ public abstract class Operator<T extends
this.cleanUpInputFileChangedOp();
if(this.childOperators != null) {
for (int i = 0; i<this.childOperators.size();i++) {
- Operator<? extends Serializable> op = this.childOperators.get(i);
+ Operator<? extends OperatorDesc> op = this.childOperators.get(i);
op.cleanUpInputFileChanged();
}
}
@@ -1332,4 +1334,25 @@ public abstract class Operator<T extends
public void cleanUpInputFileChangedOp() throws HiveException {
}
+ @Override
+ public Operator<? extends OperatorDesc> clone()
+ throws CloneNotSupportedException {
+
+ List<Operator<? extends OperatorDesc>> parents = getParentOperators();
+ List<Operator<? extends OperatorDesc>> parentClones =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+
+ if (parents != null) {
+ for (Operator<? extends OperatorDesc> parent : parents) {
+ parentClones.add((Operator<? extends OperatorDesc>)(parent.clone()));
+ }
+ }
+
+ T descClone = (T)conf.clone();
+ Operator<? extends OperatorDesc> ret =
+ (Operator<? extends OperatorDesc>) OperatorFactory.getAndMakeChild(
+ descClone, getSchema(), parentClones);
+
+ return ret;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.exec;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -35,6 +34,7 @@ import org.apache.hadoop.hive.ql.plan.La
import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc;
import org.apache.hadoop.hive.ql.plan.LimitDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
import org.apache.hadoop.hive.ql.plan.ScriptDesc;
@@ -54,7 +54,7 @@ public final class OperatorFactory {
*
* @param <T>
*/
- public static final class OpTuple<T extends Serializable> {
+ public static final class OpTuple<T extends OperatorDesc> {
public Class<T> descClass;
public Class<? extends Operator<T>> opClass;
@@ -93,7 +93,7 @@ public final class OperatorFactory {
HashTableSinkOperator.class));
}
- public static <T extends Serializable> Operator<T> get(Class<T> opClass) {
+ public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass) {
for (OpTuple o : opvec) {
if (o.descClass == opClass) {
@@ -111,7 +111,7 @@ public final class OperatorFactory {
+ opClass.getName());
}
- public static <T extends Serializable> Operator<T> get(Class<T> opClass,
+ public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass,
RowSchema rwsch) {
Operator<T> ret = get(opClass);
@@ -122,36 +122,46 @@ public final class OperatorFactory {
/**
* Returns an operator given the conf and a list of children operators.
*/
- public static <T extends Serializable> Operator<T> get(T conf,
- Operator<? extends Serializable>... oplist) {
+ public static <T extends OperatorDesc> Operator<T> get(T conf,
+ Operator<? extends OperatorDesc>... oplist) {
Operator<T> ret = get((Class<T>) conf.getClass());
ret.setConf(conf);
+ makeChild(ret, oplist);
+ return (ret);
+ }
+
+ /**
+ * Returns an operator given the conf and a list of children operators.
+ */
+ public static void makeChild(
+ Operator<? extends OperatorDesc> ret,
+ Operator<? extends OperatorDesc>... oplist) {
if (oplist.length == 0) {
- return (ret);
+ return;
}
- ArrayList<Operator<? extends Serializable>> clist = new ArrayList<Operator<? extends Serializable>>();
- for (Operator op : oplist) {
+ ArrayList<Operator<? extends OperatorDesc>> clist =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ for (Operator<? extends OperatorDesc> op : oplist) {
clist.add(op);
}
ret.setChildOperators(clist);
// Add this parent to the children
- for (Operator op : oplist) {
- List<Operator<? extends Serializable>> parents = op.getParentOperators();
+ for (Operator<? extends OperatorDesc> op : oplist) {
+ List<Operator<? extends OperatorDesc>> parents = op.getParentOperators();
if (parents == null) {
- parents = new ArrayList<Operator<? extends Serializable>>();
+ parents = new ArrayList<Operator<? extends OperatorDesc>>();
}
parents.add(ret);
op.setParentOperators(parents);
}
- return (ret);
}
/**
* Returns an operator given the conf and a list of children operators.
*/
- public static <T extends Serializable> Operator<T> get(T conf,
+ public static <T extends OperatorDesc> Operator<T> get(T conf,
RowSchema rwsch, Operator... oplist) {
Operator<T> ret = get(conf, oplist);
ret.setSchema(rwsch);
@@ -161,7 +171,7 @@ public final class OperatorFactory {
/**
* Returns an operator given the conf and a list of parent operators.
*/
- public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
+ public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
Operator... oplist) {
Operator<T> ret = get((Class<T>) conf.getClass());
ret.setConf(conf);
@@ -180,7 +190,8 @@ public final class OperatorFactory {
}
// add parents for the newly created operator
- List<Operator<? extends Serializable>> parent = new ArrayList<Operator<? extends Serializable>>();
+ List<Operator<? extends OperatorDesc>> parent =
+ new ArrayList<Operator<? extends OperatorDesc>>();
for (Operator op : oplist) {
parent.add(op);
}
@@ -193,8 +204,8 @@ public final class OperatorFactory {
/**
* Returns an operator given the conf and a list of parent operators.
*/
- public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
- List<Operator<? extends Serializable>> oplist) {
+ public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
+ List<Operator<? extends OperatorDesc>> oplist) {
Operator<T> ret = get((Class<T>) conf.getClass());
ret.setConf(conf);
if (oplist.size() == 0) {
@@ -212,7 +223,8 @@ public final class OperatorFactory {
}
// add parents for the newly created operator
- List<Operator<? extends Serializable>> parent = new ArrayList<Operator<? extends Serializable>>();
+ List<Operator<? extends OperatorDesc>> parent =
+ new ArrayList<Operator<? extends OperatorDesc>>();
for (Operator op : oplist) {
parent.add(op);
}
@@ -225,7 +237,7 @@ public final class OperatorFactory {
/**
* Returns an operator given the conf and a list of parent operators.
*/
- public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
+ public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
RowSchema rwsch, Operator... oplist) {
Operator<T> ret = getAndMakeChild(conf, oplist);
ret.setSchema(rwsch);
@@ -235,8 +247,8 @@ public final class OperatorFactory {
/**
* Returns an operator given the conf and a list of parent operators.
*/
- public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
- RowSchema rwsch, List<Operator<? extends Serializable>> oplist) {
+ public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
+ RowSchema rwsch, List<Operator<? extends OperatorDesc>> oplist) {
Operator<T> ret = getAndMakeChild(conf, oplist);
ret.setSchema(rwsch);
return (ret);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Wed Aug 29 17:43:59 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.plan.Bu
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -148,9 +149,9 @@ public class SMBMapJoinOperator extends
for (Map.Entry<String, FetchWork> entry : localWork.getAliasToFetchWork()
.entrySet()) {
JobConf jobClone = new JobConf(hconf);
- Operator<? extends Serializable> tableScan = localWork.getAliasToWork()
- .get(entry.getKey());
- if(tableScan instanceof TableScanOperator) {
+ Operator<? extends OperatorDesc> tableScan = localWork.getAliasToWork()
+ .get(entry.getKey());
+ if (tableScan instanceof TableScanOperator) {
ArrayList<Integer> list = ((TableScanOperator)tableScan).getNeededColumnIDs();
if (list != null) {
ColumnProjectionUtils.appendReadColumnIDs(jobClone, list);
@@ -165,8 +166,8 @@ public class SMBMapJoinOperator extends
}
for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
- Operator<? extends Serializable> forwardOp = localWork.getAliasToWork()
- .get(entry.getKey());
+ Operator<? extends OperatorDesc> forwardOp = localWork.getAliasToWork()
+ .get(entry.getKey());
// All the operators need to be initialized before process
forwardOp.setExecContext(this.getExecContext());
FetchOperator fetchOp = entry.getValue();
@@ -500,7 +501,7 @@ public class SMBMapJoinOperator extends
String tble = this.tagToAlias.get(tag);
FetchOperator fetchOp = fetchOperators.get(tble);
- Operator<? extends Serializable> forwardOp = localWork.getAliasToWork()
+ Operator<? extends OperatorDesc> forwardOp = localWork.getAliasToWork()
.get(tble);
try {
InspectableObject row = fetchOp.getNextRow();
@@ -565,7 +566,7 @@ public class SMBMapJoinOperator extends
super.closeOp(abort);
if (fetchOperators != null) {
for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
- Operator<? extends Serializable> forwardOp = localWork
+ Operator<? extends OperatorDesc> forwardOp = localWork
.getAliasToWork().get(entry.getKey());
forwardOp.close(abort);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java Wed Aug 29 17:43:59 2012
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -35,6 +34,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -94,11 +94,11 @@ public class SkewJoinHandler {
List<Object> dummyKey = null;
String taskId;
- private final CommonJoinOperator<? extends Serializable> joinOp;
+ private final CommonJoinOperator<? extends OperatorDesc> joinOp;
private final int numAliases;
private final JoinDesc conf;
- public SkewJoinHandler(CommonJoinOperator<? extends Serializable> joinOp) {
+ public SkewJoinHandler(CommonJoinOperator<? extends OperatorDesc> joinOp) {
this.joinOp = joinOp;
numAliases = joinOp.numAliases;
conf = joinOp.getConf();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Wed Aug 29 17:43:59 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.QueryPl
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -350,18 +351,18 @@ public abstract class Task<T extends Ser
return false;
}
- public Collection<Operator<? extends Serializable>> getTopOperators() {
- return new LinkedList<Operator<? extends Serializable>>();
+ public Collection<Operator<? extends OperatorDesc>> getTopOperators() {
+ return new LinkedList<Operator<? extends OperatorDesc>>();
}
-
+
public boolean hasReduce() {
return false;
}
- public Operator<? extends Serializable> getReducer() {
+ public Operator<? extends OperatorDesc> getReducer() {
return null;
}
-
+
public HashMap<String, Long> getCounters() {
return taskCounters;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java Wed Aug 29 17:43:59 2012
@@ -20,10 +20,12 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
/**
* Terminal Operator Base Class.
**/
-public abstract class TerminalOperator<T extends Serializable> extends
+public abstract class TerminalOperator<T extends OperatorDesc> extends
Operator<T> implements Serializable {
private static final long serialVersionUID = 1L;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Aug 29 17:43:59 2012
@@ -118,8 +118,8 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
@@ -135,8 +135,8 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.index.compact;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -57,14 +56,13 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
public class CompactIndexHandler extends TableBasedIndexHandler {
@@ -252,9 +250,11 @@ public class CompactIndexHandler extends
* @param operators
* @return whether or not it has found its target
*/
- private boolean findIndexColumnFilter(Collection<Operator<? extends Serializable>> operators) {
- for (Operator<? extends Serializable> op : operators) {
- if (op instanceof FilterOperator && ((FilterOperator)op).getConf().getPredicate().getChildren() != null) {
+ private boolean findIndexColumnFilter(
+ Collection<Operator<? extends OperatorDesc>> operators) {
+ for (Operator<? extends OperatorDesc> op : operators) {
+ if (op instanceof FilterOperator &&
+ ((FilterOperator)op).getConf().getPredicate().getChildren() != null) {
// Is this the target
if (findIndexColumnExprNodeDesc(((FilterOperator)op).getConf().getPredicate())) {
((FilterOperator)op).getConf().setSortedFilter(true);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Wed Aug 29 17:43:59 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -41,6 +40,7 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.parse.SplitSample;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.shims.HadoopShims.CombineFileInputFormatShim;
@@ -224,10 +224,10 @@ public class CombineHiveInputFormat<K ex
// Splits are not shared across different partitions with different input formats.
// For example, 2 partitions (1 sequencefile and 1 rcfile) will have 2 different splits
private static class CombinePathInputFormat {
- private final List<Operator<? extends Serializable>> opList;
+ private final List<Operator<? extends OperatorDesc>> opList;
private final String inputFormatClassName;
- public CombinePathInputFormat(List<Operator<? extends Serializable>> opList,
+ public CombinePathInputFormat(List<Operator<? extends OperatorDesc>> opList,
String inputFormatClassName) {
this.opList = opList;
this.inputFormatClassName = inputFormatClassName;
@@ -259,7 +259,7 @@ public class CombineHiveInputFormat<K ex
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
init(job);
Map<String, ArrayList<String>> pathToAliases = mrwork.getPathToAliases();
- Map<String, Operator<? extends Serializable>> aliasToWork =
+ Map<String, Operator<? extends OperatorDesc>> aliasToWork =
mrwork.getAliasToWork();
CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
.getCombineFileInputFormat();
@@ -341,7 +341,7 @@ public class CombineHiveInputFormat<K ex
// Does a pool exist for this path already
CombineFilter f = null;
- List<Operator<? extends Serializable>> opList = null;
+ List<Operator<? extends OperatorDesc>> opList = null;
boolean done = false;
if (!mrwork.isMapperCannotSpanPartns()) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Wed Aug 29 17:43:59 2012
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.io;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -31,15 +30,16 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputFormat;
@@ -385,11 +385,11 @@ public final class HiveFileFormatUtils {
* @param aliasToWork The operator tree to be invoked for a given alias
* @param dir The path to look for
**/
- public static List<Operator<? extends Serializable>> doGetWorksFromPath(
+ public static List<Operator<? extends OperatorDesc>> doGetWorksFromPath(
Map<String, ArrayList<String>> pathToAliases,
- Map<String, Operator<? extends Serializable>> aliasToWork, Path dir) {
- List<Operator<? extends Serializable>> opList =
- new ArrayList<Operator<? extends Serializable>>();
+ Map<String, Operator<? extends OperatorDesc>> aliasToWork, Path dir) {
+ List<Operator<? extends OperatorDesc>> opList =
+ new ArrayList<Operator<? extends OperatorDesc>>();
List<String> aliases = doGetAliasesFromPath(pathToAliases, dir);
for (String alias : aliases) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Wed Aug 29 17:43:59 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -416,8 +416,8 @@ public class HiveInputFormat<K extends W
}
for (String alias : aliases) {
- Operator<? extends Serializable> op = this.mrwork.getAliasToWork().get(
- alias);
+ Operator<? extends OperatorDesc> op = this.mrwork.getAliasToWork().get(
+ alias);
if (op != null && op instanceof TableScanOperator) {
TableScanOperator tableScan = (TableScanOperator) op;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java Wed Aug 29 17:43:59 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.lib;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Stack;
@@ -43,7 +42,7 @@ public class DefaultGraphWalker implemen
/**
* Constructor.
- *
+ *
* @param disp
* dispatcher to call for each op encountered
*/
@@ -68,7 +67,7 @@ public class DefaultGraphWalker implemen
/**
* Dispatch the current operator.
- *
+ *
* @param nd
* node being walked
* @param ndStack
@@ -91,7 +90,7 @@ public class DefaultGraphWalker implemen
/**
* starting point for walking.
- *
+ *
* @throws SemanticException
*/
public void startWalking(Collection<Node> startNodes,
@@ -108,7 +107,7 @@ public class DefaultGraphWalker implemen
/**
* walk the current operator and its descendants.
- *
+ *
* @param nd
* current operator in the graph
* @throws SemanticException
@@ -122,7 +121,7 @@ public class DefaultGraphWalker implemen
|| getDispatchedList().containsAll(nd.getChildren())) {
// all children are done or no need to walk the children
if (!getDispatchedList().contains(nd)) {
- dispatch(nd, opStack);
+ dispatch(nd, opStack);
}
opStack.pop();
return;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Wed Aug 29 17:43:59 2012
@@ -543,7 +543,7 @@ public class Table implements Serializab
tTable.getSd().getSkewedInfo().setSkewedColNames(skewedColNames);
}
- public List<String> getSkewedColName() {
+ public List<String> getSkewedColNames() {
return tTable.getSd().getSkewedInfo().getSkewedColNames();
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.optimizer;
import java.io.IOException;
-import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
@@ -65,6 +64,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
/**
@@ -188,7 +188,8 @@ public class BucketMapJoinOptimizer impl
LinkedHashMap<String, List<List<String>>> aliasToPartitionBucketFileNamesMapping =
new LinkedHashMap<String, List<List<String>>>();
- Map<String, Operator<? extends Serializable>> topOps = this.pGraphContext.getTopOps();
+ Map<String, Operator<? extends OperatorDesc>> topOps =
+ this.pGraphContext.getTopOps();
Map<TableScanOperator, Table> topToTable = this.pGraphContext.getTopToTable();
// (partition to bucket file names) and (partition to bucket number) for
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java?rev=1378659&r1=1378658&r2=1378659&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java Wed Aug 29 17:43:59 2012
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hive.ql.lib.Rul
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
/**
* Implementation of one of the rule-based optimization steps. ColumnPruner gets
@@ -50,7 +50,7 @@ import org.apache.hadoop.hive.ql.parse.S
*/
public class ColumnPruner implements Transform {
protected ParseContext pGraphContext;
- private HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap;
+ private HashMap<Operator<? extends OperatorDesc>, OpParseContext> opToParseCtxMap;
/**
* empty constructor.