Posted to commits@hive.apache.org by ha...@apache.org on 2015/12/09 19:45:18 UTC
hive git commit: HIVE-12595 : [REFACTOR] Make physical compiler more type safe (Ashutosh Chauhan via Pengcheng Xiong)
Repository: hive
Updated Branches:
refs/heads/master 60a5ff1e2 -> 695bde3ac
HIVE-12595 : [REFACTOR] Make physical compiler more type safe (Ashutosh Chauhan via Pengcheng Xiong)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/695bde3a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/695bde3a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/695bde3a
Branch: refs/heads/master
Commit: 695bde3ac827f8d96e1db9f4410b982891440976
Parents: 60a5ff1
Author: Ashutosh Chauhan <ha...@apache.org>
Authored: Fri Dec 4 09:28:37 2015 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Wed Dec 9 10:43:45 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/hadoop/hive/ql/Driver.java | 155 ++++---------------
.../apache/hadoop/hive/ql/exec/Operator.java | 2 +-
.../ql/optimizer/AbstractBucketJoinProc.java | 4 +-
.../hive/ql/optimizer/AbstractSMBJoinProc.java | 6 +-
.../hive/ql/optimizer/GenMRFileSink1.java | 6 +-
.../hive/ql/optimizer/GenMRProcContext.java | 20 +--
.../hive/ql/optimizer/GenMRTableScan1.java | 7 +-
.../hadoop/hive/ql/optimizer/GenMRUnion1.java | 5 +-
.../hive/ql/optimizer/GenMapRedUtils.java | 40 +++--
.../hive/ql/optimizer/GlobalLimitOptimizer.java | 8 +-
.../hive/ql/optimizer/MapJoinFactory.java | 10 +-
.../hive/ql/optimizer/SimpleFetchOptimizer.java | 30 ++--
.../hive/ql/optimizer/SkewJoinOptimizer.java | 6 +-
.../calcite/translator/HiveOpConverter.java | 14 +-
.../translator/HiveOpConverterPostProc.java | 6 +-
.../ql/optimizer/index/RewriteGBUsingIndex.java | 22 ++-
.../RewriteQueryUsingAggregateIndexCtx.java | 13 +-
.../ql/optimizer/lineage/ExprProcFactory.java | 35 ++---
.../spark/SparkSortMergeJoinFactory.java | 6 -
.../hive/ql/parse/ColumnAccessAnalyzer.java | 36 ++---
.../hadoop/hive/ql/parse/GenTezUtils.java | 5 +-
.../hadoop/hive/ql/parse/ParseContext.java | 9 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 22 +--
.../ql/parse/spark/GenSparkProcContext.java | 5 +-
.../hive/ql/parse/spark/GenSparkUtils.java | 4 +-
.../hive/ql/parse/spark/SparkCompiler.java | 3 -
.../org/apache/hadoop/hive/ql/plan/MapWork.java | 2 +-
.../apache/hadoop/hive/ql/plan/PlanUtils.java | 9 +-
.../hadoop/hive/ql/parse/TestGenTezWork.java | 19 +--
29 files changed, 180 insertions(+), 329 deletions(-)
----------------------------------------------------------------------
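The cross-cutting change in this commit is narrowing Map<String, Operator<? extends OperatorDesc>> to Map<String, TableScanOperator> wherever a top operator is guaranteed to be a table scan, which lets every consumer drop its instanceof guard and downcast. A minimal sketch of the before/after pattern (class name hypothetical; the types and accessors are the ones visible in the hunks below):

    import java.util.Map;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.TableScanOperator;
    import org.apache.hadoop.hive.ql.metadata.Table;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    class TopOpsTypeSafety {
      // Before: every consumer had to guard with instanceof and downcast.
      static void before(Map<String, Operator<? extends OperatorDesc>> topOps) {
        for (Operator<? extends OperatorDesc> op : topOps.values()) {
          if (op instanceof TableScanOperator) {
            Table tbl = ((TableScanOperator) op).getConf().getTableMetadata();
          }
        }
      }

      // After: the map carries the concrete type; the guard and cast go away.
      static void after(Map<String, TableScanOperator> topOps) {
        for (TableScanOperator ts : topOps.values()) {
          Table tbl = ts.getConf().getTableMetadata();
        }
      }
    }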
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index d81e17a..f6af6ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.conf.HiveVariableSource;
import org.apache.hadoop.hive.conf.VariableSubstitution;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
@@ -70,15 +69,10 @@ import org.apache.hadoop.hive.ql.hooks.PreExecute;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
-import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
-import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
-import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
-import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
-import org.apache.hadoop.hive.ql.metadata.DummyPartition;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -100,10 +94,8 @@ import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -754,48 +746,44 @@ public class Driver implements CommandProcessor {
SemanticAnalyzer querySem = (SemanticAnalyzer) sem;
ParseContext parseCtx = querySem.getParseContext();
- for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpMap : querySem
- .getParseContext().getTopOps().entrySet()) {
- Operator<? extends OperatorDesc> topOp = topOpMap.getValue();
- if (topOp instanceof TableScanOperator) {
- TableScanOperator tableScanOp = (TableScanOperator) topOp;
- Table tbl = tableScanOp.getConf().getTableMetadata();
- List<Integer> neededColumnIds = tableScanOp.getNeededColumnIDs();
- List<FieldSchema> columns = tbl.getCols();
- List<String> cols = new ArrayList<String>();
- for (int i = 0; i < neededColumnIds.size(); i++) {
- cols.add(columns.get(neededColumnIds.get(i)).getName());
- }
- //map may not contain all sources, since input list may have been optimized out
- //or non-existent tho such sources may still be referenced by the TableScanOperator
- //if it's null then the partition probably doesn't exist so let's use table permission
- if (tbl.isPartitioned() &&
- Boolean.TRUE.equals(tableUsePartLevelAuth.get(tbl.getTableName()))) {
- String alias_id = topOpMap.getKey();
-
- PrunedPartitionList partsList = PartitionPruner.prune(tableScanOp,
- parseCtx, alias_id);
- Set<Partition> parts = partsList.getPartitions();
- for (Partition part : parts) {
- List<String> existingCols = part2Cols.get(part);
- if (existingCols == null) {
- existingCols = new ArrayList<String>();
- }
- existingCols.addAll(cols);
- part2Cols.put(part, existingCols);
- }
- } else {
- List<String> existingCols = tab2Cols.get(tbl);
+ for (Map.Entry<String, TableScanOperator> topOpMap : querySem.getParseContext().getTopOps().entrySet()) {
+ TableScanOperator topOp = topOpMap.getValue();
+ TableScanOperator tableScanOp = topOp;
+ Table tbl = tableScanOp.getConf().getTableMetadata();
+ List<Integer> neededColumnIds = tableScanOp.getNeededColumnIDs();
+ List<FieldSchema> columns = tbl.getCols();
+ List<String> cols = new ArrayList<String>();
+ for (int i = 0; i < neededColumnIds.size(); i++) {
+ cols.add(columns.get(neededColumnIds.get(i)).getName());
+ }
+ //map may not contain all sources, since input list may have been optimized out
+ //or non-existent tho such sources may still be referenced by the TableScanOperator
+ //if it's null then the partition probably doesn't exist so let's use table permission
+ if (tbl.isPartitioned() &&
+ Boolean.TRUE.equals(tableUsePartLevelAuth.get(tbl.getTableName()))) {
+ String alias_id = topOpMap.getKey();
+
+ PrunedPartitionList partsList = PartitionPruner.prune(tableScanOp,
+ parseCtx, alias_id);
+ Set<Partition> parts = partsList.getPartitions();
+ for (Partition part : parts) {
+ List<String> existingCols = part2Cols.get(part);
if (existingCols == null) {
existingCols = new ArrayList<String>();
}
existingCols.addAll(cols);
- tab2Cols.put(tbl, existingCols);
+ part2Cols.put(part, existingCols);
}
+ } else {
+ List<String> existingCols = tab2Cols.get(tbl);
+ if (existingCols == null) {
+ existingCols = new ArrayList<String>();
+ }
+ existingCols.addAll(cols);
+ tab2Cols.put(tbl, existingCols);
}
}
}
-
}
private static void doAuthorizationV2(SessionState ss, HiveOperation op, HashSet<ReadEntity> inputs,
@@ -893,85 +881,6 @@ public class Driver implements CommandProcessor {
return plan;
}
- /**
- * @param d
- * The database to be locked
- * @param t
- * The table to be locked
- * @param p
- * The partition to be locked
- * @param mode
- * The mode of the lock (SHARED/EXCLUSIVE) Get the list of objects to be locked. If a
- * partition needs to be locked (in any mode), all its parents should also be locked in
- * SHARED mode.
- */
- private List<HiveLockObj> getLockObjects(Database d, Table t, Partition p, HiveLockMode mode)
- throws SemanticException {
- List<HiveLockObj> locks = new LinkedList<HiveLockObj>();
-
- HiveLockObjectData lockData =
- new HiveLockObjectData(plan.getQueryId(),
- String.valueOf(System.currentTimeMillis()),
- "IMPLICIT",
- plan.getQueryStr());
- if (d != null) {
- locks.add(new HiveLockObj(new HiveLockObject(d.getName(), lockData), mode));
- return locks;
- }
-
- if (t != null) {
- locks.add(new HiveLockObj(new HiveLockObject(t.getDbName(), lockData), mode));
- locks.add(new HiveLockObj(new HiveLockObject(t, lockData), mode));
- mode = HiveLockMode.SHARED;
- locks.add(new HiveLockObj(new HiveLockObject(t.getDbName(), lockData), mode));
- return locks;
- }
-
- if (p != null) {
- locks.add(new HiveLockObj(new HiveLockObject(p.getTable().getDbName(), lockData), mode));
- if (!(p instanceof DummyPartition)) {
- locks.add(new HiveLockObj(new HiveLockObject(p, lockData), mode));
- }
-
- // All the parents are locked in shared mode
- mode = HiveLockMode.SHARED;
-
- // For dummy partitions, only partition name is needed
- String name = p.getName();
-
- if (p instanceof DummyPartition) {
- name = p.getName().split("@")[2];
- }
-
- String partialName = "";
- String[] partns = name.split("/");
- int len = p instanceof DummyPartition ? partns.length : partns.length - 1;
- Map<String, String> partialSpec = new LinkedHashMap<String, String>();
- for (int idx = 0; idx < len; idx++) {
- String partn = partns[idx];
- partialName += partn;
- String[] nameValue = partn.split("=");
- assert(nameValue.length == 2);
- partialSpec.put(nameValue[0], nameValue[1]);
- try {
- locks.add(new HiveLockObj(
- new HiveLockObject(new DummyPartition(p.getTable(), p.getTable().getDbName()
- + "/" + MetaStoreUtils.encodeTableName(p.getTable().getTableName())
- + "/" + partialName,
- partialSpec), lockData), mode));
- partialName += "/";
- } catch (HiveException e) {
- throw new SemanticException(e.getMessage());
- }
- }
-
- locks.add(new HiveLockObj(new HiveLockObject(p.getTable(), lockData), mode));
- locks.add(new HiveLockObj(new HiveLockObject(p.getTable().getDbName(), lockData), mode));
- }
-
- return locks;
- }
-
// Write the current set of valid transactions into the conf file so that it can be read by
// the input format.
private void recordValidTxns() throws LockException {
@@ -1304,7 +1213,7 @@ public class Driver implements CommandProcessor {
}
if(!txnManager.isTxnOpen() && plan.getOperation() == HiveOperation.QUERY && !txnManager.getAutoCommit()) {
//this effectively makes START TRANSACTION optional and supports JDBC setAutoCommit(false) semantics
- //also, indirectly allows DDL to be executed outside a txn context
+ //also, indirectly allows DDL to be executed outside a txn context
startTxnImplicitly = true;
}
if(txnManager.getAutoCommit() && plan.getOperation() == HiveOperation.START_TRANSACTION) {
@@ -1381,7 +1290,7 @@ public class Driver implements CommandProcessor {
releaseLocksAndCommitOrRollback(false, null);
}
catch (LockException e) {
- LOG.error("rollback() FAILED: " + cpr);//make sure not to loose
+ LOG.error("rollback() FAILED: " + cpr);//make sure not to loose
handleHiveException(e, 12, "Additional info in hive.log at \"rollback() FAILED\"");
}
return cpr;
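With the instanceof branch gone, the column collection in Driver.doAuthorization reduces to a direct walk over the typed map. A condensed sketch using the identifiers from the hunk above (the accumulated cols then feed part2Cols or tab2Cols exactly as shown there):

    for (Map.Entry<String, TableScanOperator> e : parseCtx.getTopOps().entrySet()) {
      TableScanOperator ts = e.getValue();
      Table tbl = ts.getConf().getTableMetadata();
      List<String> cols = new ArrayList<String>();
      for (int id : ts.getNeededColumnIDs()) {
        cols.add(tbl.getCols().get(id).getName()); // column id -> column name
      }
    }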
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 9a86a35..85ab6b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -1271,7 +1271,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
return getName() + "[" + getIdentifier() + "]";
}
- public static String toString(Collection<Operator<? extends OperatorDesc>> top) {
+ public static String toString(Collection<TableScanOperator> top) {
StringBuilder builder = new StringBuilder();
Set<String> visited = new HashSet<String>();
for (Operator<?> op : top) {
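Callers are unaffected by the narrowed parameter, since ParseContext.getTopOps() now yields TableScanOperator values. A usage sketch, assuming a ParseContext named parseCtx in scope:

    // values() is a Collection<TableScanOperator>, matching the new signature.
    String rendered = Operator.toString(parseCtx.getTopOps().values());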
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
index 7cf0357..a0bc19f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
@@ -194,7 +194,7 @@ abstract public class AbstractBucketJoinProc implements NodeProcessor {
LinkedHashMap<String, List<List<String>>> tblAliasToBucketedFilePathsInEachPartition =
new LinkedHashMap<String, List<List<String>>>();
- HashMap<String, Operator<? extends OperatorDesc>> topOps = pGraphContext.getTopOps();
+ HashMap<String, TableScanOperator> topOps = pGraphContext.getTopOps();
HashMap<String, String> aliasToNewAliasMap = new HashMap<String, String>();
@@ -228,7 +228,7 @@ abstract public class AbstractBucketJoinProc implements NodeProcessor {
// For nested sub-queries, the alias mapping is not maintained in QB currently.
if (topOps.containsValue(tso)) {
- for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpEntry : topOps.entrySet()) {
+ for (Map.Entry<String, TableScanOperator> topOpEntry : topOps.entrySet()) {
if (topOpEntry.getValue() == tso) {
String newAlias = topOpEntry.getKey();
if (!newAlias.equals(alias)) {
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
index 9509f8e..1da0dda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
@@ -291,7 +291,7 @@ abstract public class AbstractSMBJoinProc extends AbstractBucketJoinProc impleme
* The table alias should be subq2:subq1:a which needs to be fetched from topOps.
*/
if (pGraphContext.getTopOps().containsValue(tso)) {
- for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpEntry :
+ for (Map.Entry<String, TableScanOperator> topOpEntry :
this.pGraphContext.getTopOps().entrySet()) {
if (topOpEntry.getValue() == tso) {
alias = topOpEntry.getKey();
@@ -444,13 +444,13 @@ abstract public class AbstractSMBJoinProc extends AbstractBucketJoinProc impleme
String selector = HiveConf.getVar(pGraphContext.getConf(),
HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR);
bigTableMatcherClass =
- (Class<? extends BigTableSelectorForAutoSMJ>) JavaUtils.loadClass(selector);
+ JavaUtils.loadClass(selector);
} catch (ClassNotFoundException e) {
throw new SemanticException(e.getMessage());
}
BigTableSelectorForAutoSMJ bigTableMatcher =
- (BigTableSelectorForAutoSMJ) ReflectionUtils.newInstance(bigTableMatcherClass, null);
+ ReflectionUtils.newInstance(bigTableMatcherClass, null);
JoinDesc joinDesc = joinOp.getConf();
JoinCondDesc[] joinCondns = joinDesc.getConds();
Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns);
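The two removed casts lean on target typing: assuming JavaUtils.loadClass and Hadoop's ReflectionUtils.newInstance have generic return types (which the cast removal implies), the assignment target pins the type parameter:

    // Inferred as Class<? extends BigTableSelectorForAutoSMJ>; no cast needed.
    Class<? extends BigTableSelectorForAutoSMJ> matcherClass =
        JavaUtils.loadClass(selector);
    // newInstance(Class<T>, Configuration) returns T, so no cast here either.
    BigTableSelectorForAutoSMJ matcher =
        ReflectionUtils.newInstance(matcherClass, null);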
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
index dcdc9ba..a231543 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.lib.Node;
@@ -61,6 +62,7 @@ public class GenMRFileSink1 implements NodeProcessor {
* @param opProcCtx
* context
*/
+ @Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
Object... nodeOutputs) throws SemanticException {
GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
@@ -140,7 +142,7 @@ public class GenMRFileSink1 implements NodeProcessor {
private void processLinkedFileDesc(GenMRProcContext ctx,
Task<? extends Serializable> childTask) throws SemanticException {
Task<? extends Serializable> currTask = ctx.getCurrTask();
- Operator<? extends OperatorDesc> currTopOp = ctx.getCurrTopOp();
+ TableScanOperator currTopOp = ctx.getCurrTopOp();
if (currTopOp != null && !ctx.isSeenOp(currTask, currTopOp)) {
String currAliasId = ctx.getCurrAliasId();
GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx);
@@ -186,7 +188,7 @@ public class GenMRFileSink1 implements NodeProcessor {
dest = GenMapRedUtils.createMoveTask(ctx.getCurrTask(), chDir, fsOp, ctx.getParseCtx(),
ctx.getMvTask(), ctx.getConf(), ctx.getDependencyTaskForMultiInsert());
- Operator<? extends OperatorDesc> currTopOp = ctx.getCurrTopOp();
+ TableScanOperator currTopOp = ctx.getCurrTopOp();
String currAliasId = ctx.getCurrAliasId();
HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
ctx.getOpTaskMap();
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
index 0da5790..4387c42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
@@ -94,13 +95,13 @@ public class GenMRProcContext implements NodeProcessorCtx {
final Task<? extends Serializable> uTask;
List<String> taskTmpDir;
List<TableDesc> tt_desc;
- List<Operator<? extends OperatorDesc>> listTopOperators;
+ List<TableScanOperator> listTopOperators;
public GenMRUnionCtx(Task<? extends Serializable> uTask) {
this.uTask = uTask;
taskTmpDir = new ArrayList<String>();
tt_desc = new ArrayList<TableDesc>();
- listTopOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+ listTopOperators = new ArrayList<>();
}
public Task<? extends Serializable> getUTask() {
@@ -123,16 +124,11 @@ public class GenMRProcContext implements NodeProcessorCtx {
return tt_desc;
}
- public List<Operator<? extends OperatorDesc>> getListTopOperators() {
+ public List<TableScanOperator> getListTopOperators() {
return listTopOperators;
}
- public void setListTopOperators(
- List<Operator<? extends OperatorDesc>> listTopOperators) {
- this.listTopOperators = listTopOperators;
- }
-
- public void addListTopOperators(Operator<? extends OperatorDesc> topOperator) {
+ public void addListTopOperators(TableScanOperator topOperator) {
listTopOperators.add(topOperator);
}
}
@@ -152,7 +148,7 @@ public class GenMRProcContext implements NodeProcessorCtx {
private LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx;
private Task<? extends Serializable> currTask;
- private Operator<? extends OperatorDesc> currTopOp;
+ private TableScanOperator currTopOp;
private UnionOperator currUnionOp;
private String currAliasId;
private DependencyCollectionTask dependencyTaskForMultiInsert;
@@ -355,7 +351,7 @@ public class GenMRProcContext implements NodeProcessorCtx {
/**
* @return current top operator
*/
- public Operator<? extends OperatorDesc> getCurrTopOp() {
+ public TableScanOperator getCurrTopOp() {
return currTopOp;
}
@@ -363,7 +359,7 @@ public class GenMRProcContext implements NodeProcessorCtx {
* @param currTopOp
* current top operator
*/
- public void setCurrTopOp(Operator<? extends OperatorDesc> currTopOp) {
+ public void setCurrTopOp(TableScanOperator currTopOp) {
this.currTopOp = currTopOp;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
index af0ac90..2160e01 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
@@ -77,9 +77,8 @@ public class GenMRTableScan1 implements NodeProcessor {
// create a dummy MapReduce task
MapredWork currWork = GenMapRedUtils.getMapRedWork(parseCtx);
MapRedTask currTask = (MapRedTask) TaskFactory.get(currWork, parseCtx.getConf());
- Operator<? extends OperatorDesc> currTopOp = op;
ctx.setCurrTask(currTask);
- ctx.setCurrTopOp(currTopOp);
+ ctx.setCurrTopOp(op);
for (String alias : parseCtx.getTopOps().keySet()) {
Operator<? extends OperatorDesc> currOp = parseCtx.getTopOps().get(alias);
@@ -160,9 +159,9 @@ public class GenMRTableScan1 implements NodeProcessor {
Table source = op.getConf().getTableMetadata();
List<String> partCols = GenMapRedUtils.getPartitionColumns(op);
PrunedPartitionList partList = new PrunedPartitionList(source, confirmedPartns, partCols, false);
- GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx, partList);
+ GenMapRedUtils.setTaskPlan(currAliasId, op, currTask, false, ctx, partList);
} else { // non-partitioned table
- GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx);
+ GenMapRedUtils.setTaskPlan(currAliasId, op, currTask, false, ctx);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
index d3afdc8..5102d19 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
@@ -77,7 +77,7 @@ public class GenMRUnion1 implements NodeProcessor {
UnionParseContext uPrsCtx = uCtx.getUnionParseContext(union);
ctx.getMapCurrCtx().put(
- (Operator<? extends OperatorDesc>) union,
+ union,
new GenMapRedCtx(ctx.getCurrTask(),
ctx.getCurrAliasId()));
@@ -170,7 +170,7 @@ public class GenMRUnion1 implements NodeProcessor {
// plan
Task<? extends Serializable> uTask = uCtxTask.getUTask();
ctx.setCurrTask(uTask);
- Operator<? extends OperatorDesc> topOp = ctx.getCurrTopOp();
+ TableScanOperator topOp = ctx.getCurrTopOp();
if (topOp != null && !ctx.isSeenOp(uTask, topOp)) {
GenMapRedUtils.setTaskPlan(ctx.getCurrAliasId(), ctx
.getCurrTopOp(), uTask, false, ctx);
@@ -189,6 +189,7 @@ public class GenMRUnion1 implements NodeProcessor {
* @param opProcCtx
* context
*/
+ @Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
Object... nodeOutputs) throws SemanticException {
UnionOperator union = (UnionOperator) nd;
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 0cd7b62..a1c9651 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -151,7 +151,7 @@ public final class GenMapRedUtils {
MapredWork plan = (MapredWork) currTask.getWork();
HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
opProcCtx.getOpTaskMap();
- Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
+ TableScanOperator currTopOp = opProcCtx.getCurrTopOp();
opTaskMap.put(reducer, currTask);
plan.setReduceWork(new ReduceWork());
@@ -216,7 +216,7 @@ public final class GenMapRedUtils {
private static void setUnionPlan(GenMRProcContext opProcCtx,
boolean local, Task<? extends Serializable> currTask, GenMRUnionCtx uCtx,
boolean mergeTask) throws SemanticException {
- Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
+ TableScanOperator currTopOp = opProcCtx.getCurrTopOp();
if (currTopOp != null) {
String currAliasId = opProcCtx.getCurrAliasId();
@@ -234,7 +234,7 @@ public final class GenMapRedUtils {
int size = taskTmpDirLst.size();
assert local == false;
- List<Operator<? extends OperatorDesc>> topOperators =
+ List<TableScanOperator> topOperators =
uCtx.getListTopOperators();
MapredWork plan = (MapredWork) currTask.getWork();
@@ -332,7 +332,7 @@ public final class GenMapRedUtils {
throws SemanticException {
assert currTask != null && oldTask != null;
- Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
+ TableScanOperator currTopOp = opProcCtx.getCurrTopOp();
List<Task<? extends Serializable>> parTasks = null;
// terminate the old task and make current task dependent on it
if (currTask.getParentTasks() != null
@@ -368,7 +368,7 @@ public final class GenMapRedUtils {
/**
* If currTopOp is not set for input of the task, add input for to the task
*/
- static boolean mergeInput(Operator<? extends OperatorDesc> currTopOp,
+ static boolean mergeInput(TableScanOperator currTopOp,
GenMRProcContext opProcCtx, Task<? extends Serializable> task, boolean local)
throws SemanticException {
if (!opProcCtx.isSeenOp(task, currTopOp)) {
@@ -437,7 +437,7 @@ public final class GenMapRedUtils {
* processing context
*/
public static void setTaskPlan(String alias_id,
- Operator<? extends OperatorDesc> topOp, Task<?> task, boolean local,
+ TableScanOperator topOp, Task<?> task, boolean local,
GenMRProcContext opProcCtx) throws SemanticException {
setTaskPlan(alias_id, topOp, task, local, opProcCtx, null);
}
@@ -459,7 +459,7 @@ public final class GenMapRedUtils {
* pruned partition list. If it is null it will be computed on-the-fly.
*/
public static void setTaskPlan(String alias_id,
- Operator<? extends OperatorDesc> topOp, Task<?> task, boolean local,
+ TableScanOperator topOp, Task<?> task, boolean local,
GenMRProcContext opProcCtx, PrunedPartitionList pList) throws SemanticException {
setMapWork(((MapredWork) task.getWork()).getMapWork(), opProcCtx.getParseCtx(),
opProcCtx.getInputs(), pList, topOp, alias_id, opProcCtx.getConf(), local);
@@ -485,7 +485,7 @@ public final class GenMapRedUtils {
* current instance of hive conf
*/
public static void setMapWork(MapWork plan, ParseContext parseCtx, Set<ReadEntity> inputs,
- PrunedPartitionList partsList, Operator<? extends OperatorDesc> topOp, String alias_id,
+ PrunedPartitionList partsList, TableScanOperator tsOp, String alias_id,
HiveConf conf, boolean local) throws SemanticException {
ArrayList<Path> partDir = new ArrayList<Path>();
ArrayList<PartitionDesc> partDesc = new ArrayList<PartitionDesc>();
@@ -496,9 +496,8 @@ public final class GenMapRedUtils {
if (partsList == null) {
try {
- TableScanOperator tsOp = (TableScanOperator) topOp;
partsList = PartitionPruner.prune(tsOp, parseCtx, alias_id);
- isAcidTable = ((TableScanOperator) topOp).getConf().isAcidTable();
+ isAcidTable = tsOp.getConf().isAcidTable();
} catch (SemanticException e) {
throw e;
}
@@ -520,11 +519,11 @@ public final class GenMapRedUtils {
// The table does not have any partitions
if (aliasPartnDesc == null) {
- aliasPartnDesc = new PartitionDesc(Utilities.getTableDesc(((TableScanOperator) topOp)
+ aliasPartnDesc = new PartitionDesc(Utilities.getTableDesc(tsOp
.getConf().getTableMetadata()), null);
}
- Map<String, String> props = topOp.getConf().getOpProps();
+ Map<String, String> props = tsOp.getConf().getOpProps();
if (props != null) {
Properties target = aliasPartnDesc.getProperties();
if (target == null) {
@@ -590,10 +589,10 @@ public final class GenMapRedUtils {
// Later the properties have to come from the partition as opposed
// to from the table in order to support versioning.
Path[] paths = null;
- SampleDesc sampleDescr = parseCtx.getOpToSamplePruner().get(topOp);
+ SampleDesc sampleDescr = parseCtx.getOpToSamplePruner().get(tsOp);
// Lookup list bucketing pruner
- Map<String, ExprNodeDesc> partToPruner = parseCtx.getOpToPartToSkewedPruner().get(topOp);
+ Map<String, ExprNodeDesc> partToPruner = parseCtx.getOpToPartToSkewedPruner().get(tsOp);
ExprNodeDesc listBucketingPruner = (partToPruner != null) ? partToPruner.get(part.getName())
: null;
@@ -701,10 +700,7 @@ public final class GenMapRedUtils {
parseCtx.getGlobalLimitCtx().disableOpt();
}
- if (topOp instanceof TableScanOperator) {
- Utilities.addSchemaEvolutionToTableScanOperator(partsList.getSourceTable(),
- (TableScanOperator) topOp);
- }
+ Utilities.addSchemaEvolutionToTableScanOperator(partsList.getSourceTable(),tsOp);
Iterator<Path> iterPath = partDir.iterator();
Iterator<PartitionDesc> iterPartnDesc = partDesc.iterator();
@@ -728,7 +724,7 @@ public final class GenMapRedUtils {
}
assert plan.getAliasToWork().get(alias_id) == null;
- plan.getAliasToWork().put(alias_id, topOp);
+ plan.getAliasToWork().put(alias_id, tsOp);
} else {
// populate local work if needed
MapredLocalWork localPlan = plan.getMapRedLocalWork();
@@ -740,7 +736,7 @@ public final class GenMapRedUtils {
assert localPlan.getAliasToWork().get(alias_id) == null;
assert localPlan.getAliasToFetchWork().get(alias_id) == null;
- localPlan.getAliasToWork().put(alias_id, topOp);
+ localPlan.getAliasToWork().put(alias_id, tsOp);
if (tblDir == null) {
tblDesc = Utilities.getTableDesc(partsList.getSourceTable());
localPlan.getAliasToFetchWork().put(
@@ -1275,7 +1271,7 @@ public final class GenMapRedUtils {
// Create a TableScan operator
RowSchema inputRS = fsInput.getSchema();
- Operator<? extends OperatorDesc> tsMerge =
+ TableScanOperator tsMerge =
GenMapRedUtils.createTemporaryTableScanOperator(inputRS);
// Create a FileSink operator
@@ -1539,7 +1535,7 @@ public final class GenMapRedUtils {
* @return the MapredWork
*/
private static MapWork createMRWorkForMergingFiles (HiveConf conf,
- Operator<? extends OperatorDesc> topOp, FileSinkDesc fsDesc) {
+ TableScanOperator topOp, FileSinkDesc fsDesc) {
ArrayList<String> aliases = new ArrayList<String>();
String inputDir = fsDesc.getFinalDirName().toString();
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
index 6b04d92..30976af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
@@ -39,10 +39,8 @@ import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.SplitSample;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import com.google.common.collect.ImmutableSet;
@@ -65,11 +63,11 @@ public class GlobalLimitOptimizer implements Transform {
private final Logger LOG = LoggerFactory.getLogger(GlobalLimitOptimizer.class.getName());
+ @Override
public ParseContext transform(ParseContext pctx) throws SemanticException {
Context ctx = pctx.getContext();
- Map<String, Operator<? extends OperatorDesc>> topOps = pctx.getTopOps();
+ Map<String, TableScanOperator> topOps = pctx.getTopOps();
GlobalLimitCtx globalLimitCtx = pctx.getGlobalLimitCtx();
- Map<TableScanOperator, ExprNodeDesc> opToPartPruner = pctx.getOpToPartPruner();
Map<String, SplitSample> nameToSplitSample = pctx.getNameToSplitSample();
// determine the query qualifies reduce input size for LIMIT
@@ -92,7 +90,7 @@ public class GlobalLimitOptimizer implements Transform {
// FROM ... LIMIT...
// SELECT * FROM (SELECT col1 as col2 (SELECT * FROM ...) t1 LIMIT ...) t2);
//
- TableScanOperator ts = (TableScanOperator) topOps.values().toArray()[0];
+ TableScanOperator ts = topOps.values().iterator().next();
Integer tempGlobalLimit = checkQbpForGlobalLimit(ts);
// query qualify for the optimization
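A small but typical cleanup in this hunk: fetching the first value through the iterator keeps the static type, while toArray()[0] produced an Object that needed a downcast (and allocated a temporary array). Side by side:

    // Before: Object[] round-trip plus a cast.
    TableScanOperator tsBefore = (TableScanOperator) topOps.values().toArray()[0];
    // After: type-safe, no temporary array.
    TableScanOperator tsAfter = topOps.values().iterator().next();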
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
index 647f863..867a1f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
@@ -26,6 +26,7 @@ import java.util.Stack;
import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -152,7 +153,7 @@ public final class MapJoinFactory {
// The map is overloaded to keep track of mapjoins also
opProcCtx.getOpTaskMap().put(op, currTask);
- Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
+ TableScanOperator currTopOp = opProcCtx.getCurrTopOp();
String currAliasId = opProcCtx.getCurrAliasId();
GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, local, opProcCtx);
}
@@ -170,11 +171,10 @@ public final class MapJoinFactory {
* @param pos
* position of the parent in the stack
*/
- private static void joinMapJoinPlan(AbstractMapJoinOperator<? extends MapJoinDesc> op,
- Task<? extends Serializable> oldTask,
+ private static void joinMapJoinPlan(Task<? extends Serializable> oldTask,
GenMRProcContext opProcCtx, boolean local)
throws SemanticException {
- Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
+ TableScanOperator currTopOp = opProcCtx.getCurrTopOp();
GenMapRedUtils.mergeInput(currTopOp, opProcCtx, oldTask, local);
}
@@ -220,7 +220,7 @@ public final class MapJoinFactory {
} else {
// The current plan can be thrown away after being merged with the
// original plan
- joinMapJoinPlan(mapJoin, oldTask, ctx, local);
+ joinMapJoinPlan(oldTask, ctx, local);
ctx.setCurrTask(currTask = oldTask);
}
MapredWork plan = (MapredWork) currTask.getWork();
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index 9b9a5ca..632a622 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -93,28 +93,26 @@ public class SimpleFetchOptimizer implements Transform {
@Override
public ParseContext transform(ParseContext pctx) throws SemanticException {
- Map<String, Operator<? extends OperatorDesc>> topOps = pctx.getTopOps();
+ Map<String, TableScanOperator> topOps = pctx.getTopOps();
if (pctx.getQueryProperties().isQuery() && !pctx.getQueryProperties().isAnalyzeCommand()
&& topOps.size() == 1) {
// no join, no groupby, no distinct, no lateral view, no subq,
// no CTAS or insert, not analyze command, and single sourced.
String alias = (String) pctx.getTopOps().keySet().toArray()[0];
- Operator<?> topOp = (Operator<?>) pctx.getTopOps().values().toArray()[0];
- if (topOp instanceof TableScanOperator) {
- try {
- FetchTask fetchTask = optimize(pctx, alias, (TableScanOperator) topOp);
- if (fetchTask != null) {
- pctx.setFetchTask(fetchTask);
- }
- } catch (Exception e) {
- // Has to use full name to make sure it does not conflict with
- // org.apache.commons.lang.StringUtils
- LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
- if (e instanceof SemanticException) {
- throw (SemanticException) e;
- }
- throw new SemanticException(e.getMessage(), e);
+ TableScanOperator topOp = pctx.getTopOps().values().iterator().next();
+ try {
+ FetchTask fetchTask = optimize(pctx, alias, topOp);
+ if (fetchTask != null) {
+ pctx.setFetchTask(fetchTask);
+ }
+ } catch (Exception e) {
+ // Has to use full name to make sure it does not conflict with
+ // org.apache.commons.lang.StringUtils
+ LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ if (e instanceof SemanticException) {
+ throw (SemanticException) e;
}
+ throw new SemanticException(e.getMessage(), e);
}
}
return pctx;
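The catch block this hunk keeps follows a common Hive idiom: let SemanticException propagate untouched and wrap everything else. Reduced to a skeleton (doOptimize is a hypothetical stand-in for the inlined optimize call):

    static void runOptimize(ParseContext pctx) throws SemanticException {
      try {
        doOptimize(pctx); // hypothetical helper standing in for optimize(...)
      } catch (Exception e) {
        LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
        if (e instanceof SemanticException) {
          throw (SemanticException) e; // preserve the original semantic error
        }
        throw new SemanticException(e.getMessage(), e); // wrap anything else
      }
    }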
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
index 64dc48c..81c1939 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
@@ -190,7 +190,7 @@ public class SkewJoinOptimizer implements Transform {
// Update the topOps appropriately
Map<String, Operator<? extends OperatorDesc>> topOps = getTopOps(joinOpClone);
- Map<String, Operator<? extends OperatorDesc>> origTopOps = parseContext.getTopOps();
+ Map<String, TableScanOperator> origTopOps = parseContext.getTopOps();
for (Entry<String, Operator<? extends OperatorDesc>> topOp : topOps.entrySet()) {
TableScanOperator tso = (TableScanOperator) topOp.getValue();
@@ -283,7 +283,7 @@ public class SkewJoinOptimizer implements Transform {
* @param op The join operator being optimized
* @param tableScanOpsForJoin table scan operators which are parents of the join operator
* @return map<join keys intersection skewedkeys, list of skewed values>.
- * @throws SemanticException
+ * @throws SemanticException
*/
private Map<List<ExprNodeDesc>, List<List<String>>>
getSkewedValues(
@@ -406,7 +406,7 @@ public class SkewJoinOptimizer implements Transform {
return tsOp;
}
}
- if ((op.getParentOperators() == null) || (op.getParentOperators().isEmpty()) ||
+ if ((op.getParentOperators() == null) || (op.getParentOperators().isEmpty()) ||
(op.getParentOperators().size() > 1)) {
return null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
index 130ee89..00f1acb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
@@ -119,12 +119,12 @@ public class HiveOpConverter {
private final SemanticAnalyzer semanticAnalyzer;
private final HiveConf hiveConf;
private final UnparseTranslator unparseTranslator;
- private final Map<String, Operator<? extends OperatorDesc>> topOps;
+ private final Map<String, TableScanOperator> topOps;
private final boolean strictMode;
private int uniqueCounter;
public HiveOpConverter(SemanticAnalyzer semanticAnalyzer, HiveConf hiveConf,
- UnparseTranslator unparseTranslator, Map<String, Operator<? extends OperatorDesc>> topOps,
+ UnparseTranslator unparseTranslator, Map<String, TableScanOperator> topOps,
boolean strictMode) {
this.semanticAnalyzer = semanticAnalyzer;
this.hiveConf = hiveConf;
@@ -600,7 +600,7 @@ public class HiveOpConverter {
}
ExprNodeDesc[] expressions = new ExprNodeDesc[exchangeRel.getJoinKeys().size()];
for (int index = 0; index < exchangeRel.getJoinKeys().size(); index++) {
- expressions[index] = convertToExprNode((RexNode) exchangeRel.getJoinKeys().get(index),
+ expressions[index] = convertToExprNode(exchangeRel.getJoinKeys().get(index),
exchangeRel.getInput(), inputOpAf.tabAlias, inputOpAf);
}
exchangeRel.setJoinExpressions(expressions);
@@ -943,7 +943,7 @@ public class HiveOpConverter {
int rightPos = joinCondns[i].getRight();
for (ExprNodeDesc expr : filterExpressions.get(i)) {
- // We need to update the exprNode, as currently
+ // We need to update the exprNode, as currently
// they refer to columns in the output of the join;
// they should refer to the columns output by the RS
int inputPos = updateExprNode(expr, reversedExprs, colExprMap);
@@ -956,9 +956,9 @@ public class HiveOpConverter {
joinCondns[i].getType() == JoinDesc.LEFT_OUTER_JOIN ||
joinCondns[i].getType() == JoinDesc.RIGHT_OUTER_JOIN) {
if (inputPos == leftPos) {
- updateFilterMap(filterMap, leftPos, rightPos);
+ updateFilterMap(filterMap, leftPos, rightPos);
} else {
- updateFilterMap(filterMap, rightPos, leftPos);
+ updateFilterMap(filterMap, rightPos, leftPos);
}
}
}
@@ -992,7 +992,7 @@ public class HiveOpConverter {
* This method updates the input expr, changing all the
* ExprNodeColumnDesc in it to refer to columns given by the
* colExprMap.
- *
+ *
* For instance, "col_0 = 1" would become "VALUE.col_0 = 1";
* the execution engine expects filters in the Join operators
* to be expressed that way.
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverterPostProc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverterPostProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverterPostProc.java
index 1d0a254..a63f167 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverterPostProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverterPostProc.java
@@ -25,8 +25,6 @@ import java.util.Map;
import java.util.Set;
import java.util.Stack;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -48,8 +46,6 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
public class HiveOpConverterPostProc implements Transform {
- private static final Logger LOG = LoggerFactory.getLogger(HiveOpConverterPostProc.class);
-
private ParseContext pctx;
private Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo;
@@ -139,7 +135,7 @@ public class HiveOpConverterPostProc implements Transform {
// 1. Get alias from topOps
String opAlias = null;
- for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpEntry : pctx.getTopOps().entrySet()) {
+ for (Map.Entry<String, TableScanOperator> topOpEntry : pctx.getTopOps().entrySet()) {
if (topOpEntry.getValue() == tableScanOp) {
opAlias = topOpEntry.getKey();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
index ea1ece6..4277be5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.index.AggregateIndexHandler;
@@ -44,7 +43,6 @@ import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
import org.apache.hadoop.hive.ql.optimizer.Transform;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
/**
@@ -160,9 +158,9 @@ public class RewriteGBUsingIndex implements Transform {
* if the optimization can be applied. If yes, we add the name of the top table to
* the tsOpToProcess to apply rewrite later on.
* */
- for (Map.Entry<String, Operator<?>> entry : parseContext.getTopOps().entrySet()) {
+ for (Map.Entry<String, TableScanOperator> entry : parseContext.getTopOps().entrySet()) {
String alias = entry.getKey();
- TableScanOperator topOp = (TableScanOperator) entry.getValue();
+ TableScanOperator topOp = entry.getValue();
Table table = topOp.getConf().getTableMetadata();
List<Index> indexes = tableToIndex.get(table);
if (indexes.isEmpty()) {
@@ -230,16 +228,14 @@ public class RewriteGBUsingIndex implements Transform {
supportedIndexes.add(AggregateIndexHandler.class.getName());
// query the metastore to know what columns we have indexed
- Collection<Operator<? extends OperatorDesc>> topTables = parseContext.getTopOps().values();
+ Collection<TableScanOperator> topTables = parseContext.getTopOps().values();
Map<Table, List<Index>> indexes = new HashMap<Table, List<Index>>();
- for (Operator<? extends OperatorDesc> op : topTables) {
- if (op instanceof TableScanOperator) {
- TableScanOperator tsOP = (TableScanOperator) op;
- List<Index> tblIndexes = IndexUtils.getIndexes(tsOP.getConf().getTableMetadata(),
- supportedIndexes);
- if (tblIndexes.size() > 0) {
- indexes.put(tsOP.getConf().getTableMetadata(), tblIndexes);
- }
+ for (TableScanOperator op : topTables) {
+ TableScanOperator tsOP = op;
+ List<Index> tblIndexes = IndexUtils.getIndexes(tsOP.getConf().getTableMetadata(),
+ supportedIndexes);
+ if (tblIndexes.size() > 0) {
+ indexes.put(tsOP.getConf().getTableMetadata(), tblIndexes);
}
}
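One bit of mechanical residue survives this hunk: "TableScanOperator tsOP = op;" is a now-redundant alias left over from the removed cast (the same pattern appears in the Driver.java hunk). The loop could read directly:

    for (TableScanOperator tsOp : parseContext.getTopOps().values()) {
      List<Index> tblIndexes = IndexUtils.getIndexes(
          tsOp.getConf().getTableMetadata(), supportedIndexes);
      if (tblIndexes.size() > 0) {
        indexes.put(tsOp.getConf().getTableMetadata(), tblIndexes);
      }
    }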
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
index d0f28d8..a8ba4d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +46,6 @@ import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -85,7 +83,7 @@ public final class RewriteQueryUsingAggregateIndexCtx implements NodeProcessorC
private final Hive hiveDb;
private final ParseContext parseContext;
- private RewriteCanApplyCtx canApplyCtx;
+ private final RewriteCanApplyCtx canApplyCtx;
//We need the GenericUDAFEvaluator for GenericUDAF function "sum"
private GenericUDAFEvaluator eval = null;
private final String indexTableName;
@@ -148,7 +146,7 @@ public final class RewriteQueryUsingAggregateIndexCtx implements NodeProcessorC
this.replaceSelectOperatorProcess(selectperator);
}
}
-
+
/**
* This method replaces the original TableScanOperator with the new
* TableScanOperator and metadata that scans over the index table rather than
@@ -161,7 +159,7 @@ public final class RewriteQueryUsingAggregateIndexCtx implements NodeProcessorC
// Need to remove the original TableScanOperators from these data structures
// and add new ones
- Map<String, Operator<? extends OperatorDesc>> topOps = rewriteQueryCtx.getParseContext()
+ HashMap<String, TableScanOperator> topOps = rewriteQueryCtx.getParseContext()
.getTopOps();
// remove original TableScanOperator
@@ -211,8 +209,7 @@ public final class RewriteQueryUsingAggregateIndexCtx implements NodeProcessorC
scanOperator.getConf().setAlias(newAlias);
scanOperator.setAlias(indexTableName);
topOps.put(newAlias, scanOperator);
- rewriteQueryCtx.getParseContext().setTopOps(
- (HashMap<String, Operator<? extends OperatorDesc>>) topOps);
+ rewriteQueryCtx.getParseContext().setTopOps(topOps);
ColumnPrunerProcFactory.setupNeededColumns(scanOperator, rs,
Arrays.asList(rewriteQueryCtx.getIndexKey()));
@@ -307,7 +304,7 @@ public final class RewriteQueryUsingAggregateIndexCtx implements NodeProcessorC
} else {
// we just need to reset the GenericUDAFEvaluator and its name for this
// GroupByOperator whose parent is the ReduceSinkOperator
- GroupByDesc childConf = (GroupByDesc) operator.getConf();
+ GroupByDesc childConf = operator.getConf();
List<AggregationDesc> childAggrList = childConf.getAggregators();
if (childAggrList != null && childAggrList.size() > 0) {
for (AggregationDesc aggregationDesc : childAggrList) {
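With ParseContext.setTopOps now declared to take HashMap<String, TableScanOperator>, the unchecked cast at the call site above disappears. A condensed view of the round-trip, names taken from the hunk:

    HashMap<String, TableScanOperator> topOps =
        rewriteQueryCtx.getParseContext().getTopOps();
    topOps.put(newAlias, scanOperator);                  // register the index-table scan
    rewriteQueryCtx.getParseContext().setTopOps(topOps); // no cast required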
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
index 38040e3..09ef490 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
@@ -171,27 +171,24 @@ public class ExprProcFactory {
private static boolean findSourceColumn(
LineageCtx lctx, Predicate cond, String tabAlias, String alias) {
- for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpMap: lctx
- .getParseCtx().getTopOps().entrySet()) {
- Operator<? extends OperatorDesc> topOp = topOpMap.getValue();
- if (topOp instanceof TableScanOperator) {
- TableScanOperator tableScanOp = (TableScanOperator) topOp;
- Table tbl = tableScanOp.getConf().getTableMetadata();
- if (tbl.getTableName().equals(tabAlias)
- || tabAlias.equals(tableScanOp.getConf().getAlias())) {
- for (FieldSchema column: tbl.getCols()) {
- if (column.getName().equals(alias)) {
- TableAliasInfo table = new TableAliasInfo();
- table.setTable(tbl.getTTable());
- table.setAlias(tabAlias);
- BaseColumnInfo colInfo = new BaseColumnInfo();
- colInfo.setColumn(column);
- colInfo.setTabAlias(table);
- cond.getBaseCols().add(colInfo);
- return true;
- }
+ for (Map.Entry<String, TableScanOperator> topOpMap: lctx.getParseCtx().getTopOps().entrySet()) {
+ TableScanOperator tableScanOp = topOpMap.getValue();
+ Table tbl = tableScanOp.getConf().getTableMetadata();
+ if (tbl.getTableName().equals(tabAlias)
+ || tabAlias.equals(tableScanOp.getConf().getAlias())) {
+ for (FieldSchema column: tbl.getCols()) {
+ if (column.getName().equals(alias)) {
+ TableAliasInfo table = new TableAliasInfo();
+ table.setTable(tbl.getTTable());
+ table.setAlias(tabAlias);
+ BaseColumnInfo colInfo = new BaseColumnInfo();
+ colInfo.setColumn(column);
+ colInfo.setTabAlias(table);
+ cond.getBaseCols().add(colInfo);
+ return true;
}
}
+
}
}
return false;
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
index aca0630..55fdedb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
@@ -19,21 +19,15 @@ package org.apache.hadoop.hive.ql.optimizer.spark;
import java.util.List;
import java.util.Map;
-import java.util.Stack;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.spark.GenSparkProcContext;
import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
/**
* Operator factory for Spark SMBJoin processing.
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java
index ea58917..dcc8daf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java
@@ -18,19 +18,12 @@
package org.apache.hadoop.hive.ql.parse;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
public class ColumnAccessAnalyzer {
- private static final Logger LOG = LoggerFactory.getLogger(ColumnAccessAnalyzer.class.getName());
private final ParseContext pGraphContext;
public ColumnAccessAnalyzer() {
@@ -43,22 +36,19 @@ public class ColumnAccessAnalyzer {
public ColumnAccessInfo analyzeColumnAccess() throws SemanticException {
ColumnAccessInfo columnAccessInfo = new ColumnAccessInfo();
- Collection<Operator<? extends OperatorDesc>> topOps = pGraphContext.getTopOps().values();
- for (Operator<? extends OperatorDesc> op : topOps) {
- if (op instanceof TableScanOperator) {
- TableScanOperator top = (TableScanOperator) op;
- Table table = top.getConf().getTableMetadata();
- String tableName = table.getCompleteName();
- List<String> referenced = top.getReferencedColumns();
- for (String column : referenced) {
- columnAccessInfo.add(tableName, column);
- }
- if (table.isPartitioned()) {
- PrunedPartitionList parts = pGraphContext.getPrunedPartitions(table.getTableName(), top);
- if (parts.getReferredPartCols() != null) {
- for (String partKey : parts.getReferredPartCols()) {
- columnAccessInfo.add(tableName, partKey);
- }
+ Collection<TableScanOperator> topOps = pGraphContext.getTopOps().values();
+ for (TableScanOperator top : topOps) {
+ Table table = top.getConf().getTableMetadata();
+ String tableName = table.getCompleteName();
+ List<String> referenced = top.getReferencedColumns();
+ for (String column : referenced) {
+ columnAccessInfo.add(tableName, column);
+ }
+ if (table.isPartitioned()) {
+ PrunedPartitionList parts = pGraphContext.getPrunedPartitions(table.getTableName(), top);
+ if (parts.getReferredPartCols() != null) {
+ for (String partKey : parts.getReferredPartCols()) {
+ columnAccessInfo.add(tableName, partKey);
}
}
}
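
With getTopOps() now declared over TableScanOperator, consumers like the loop above need neither the instanceof filter nor the cast. A small consumer sketch under that assumption, with a hypothetical printColumnAccess helper:

import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ParseContext;

public final class ColumnAccessDump {
  private ColumnAccessDump() {
  }

  // Typed iteration: every value is statically known to be a TableScanOperator,
  // so the old "if (op instanceof TableScanOperator)" guard has no work to do.
  public static void printColumnAccess(ParseContext pctx) {
    for (TableScanOperator top : pctx.getTopOps().values()) {
      Table table = top.getConf().getTableMetadata();
      for (String column : top.getReferencedColumns()) {
        System.out.println(table.getCompleteName() + "." + column);
      }
    }
  }
}
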
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index fe0e234..f656998 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
@@ -179,7 +178,7 @@ public class GenTezUtils {
String alias = ts.getConf().getAlias();
- setupMapWork(mapWork, context, partitions, root, alias);
+ setupMapWork(mapWork, context, partitions, ts, alias);
if (ts.getConf().getTableMetadata() != null && ts.getConf().getTableMetadata().isDummyTable()) {
mapWork.setDummyTableScan(true);
@@ -197,7 +196,7 @@ public class GenTezUtils {
// this method's main use is to help unit testing this class
protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
- PrunedPartitionList partitions, Operator<? extends OperatorDesc> root,
+ PrunedPartitionList partitions, TableScanOperator root,
String alias) throws SemanticException {
// All the setup is done in GenMapRedUtils
GenMapRedUtils.setMapWork(mapWork, context.parseContext,
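
Narrowing setupMapWork's root parameter from Operator<? extends OperatorDesc> to TableScanOperator ripples into every overrider, which is why the TestGenTezWork hunk at the end of this patch changes in lockstep. A toy model of that ripple, with hypothetical class names:

public class NarrowedOverride {
  static class Desc {
  }

  static class Scan extends Desc {
  }

  static class Base {
    // was: protected void setup(Desc root) before the narrowing
    protected void setup(Scan root) {
      System.out.println("base setup");
    }
  }

  static class TestDouble extends Base {
    @Override // compiles only because the parameter matches the new, narrower type
    protected void setup(Scan root) {
      System.out.println("test setup");
    }
  }

  public static void main(String[] args) {
    new TestDouble().setup(new Scan());
  }
}
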
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 5872e8e..bee0175 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hive.ql.plan.FilterDesc.SampleDesc;
import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
/**
@@ -68,7 +67,7 @@ public class ParseContext {
private HashMap<TableScanOperator, PrunedPartitionList> opToPartList;
private HashMap<TableScanOperator, SampleDesc> opToSamplePruner;
private Map<TableScanOperator, Map<String, ExprNodeDesc>> opToPartToSkewedPruner;
- private HashMap<String, Operator<? extends OperatorDesc>> topOps;
+ private HashMap<String, TableScanOperator> topOps;
private Set<JoinOperator> joinOps;
private Set<MapJoinOperator> mapJoinOps;
private Set<SMBMapJoinOperator> smbMapJoinOps;
@@ -150,7 +149,7 @@ public class ParseContext {
HiveConf conf,
HashMap<TableScanOperator, ExprNodeDesc> opToPartPruner,
HashMap<TableScanOperator, PrunedPartitionList> opToPartList,
- HashMap<String, Operator<? extends OperatorDesc>> topOps,
+ HashMap<String, TableScanOperator> topOps,
Set<JoinOperator> joinOps,
Set<SMBMapJoinOperator> smbMapJoinOps,
List<LoadTableDesc> loadTableWork, List<LoadFileDesc> loadFileWork,
@@ -257,7 +256,7 @@ public class ParseContext {
/**
* @return the topOps
*/
- public HashMap<String, Operator<? extends OperatorDesc>> getTopOps() {
+ public HashMap<String, TableScanOperator> getTopOps() {
return topOps;
}
@@ -265,7 +264,7 @@ public class ParseContext {
* @param topOps
* the topOps to set
*/
- public void setTopOps(HashMap<String, Operator<? extends OperatorDesc>> topOps) {
+ public void setTopOps(HashMap<String, TableScanOperator> topOps) {
this.topOps = topOps;
}
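
The narrowed topOps signature turns a class of runtime ClassCastExceptions into compile errors: only TableScanOperators can enter the map, and no cast is needed on the way out. A minimal sketch of both directions, with hypothetical locals:

import java.util.HashMap;

import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;

public final class TopOpsTypeSafety {
  private TopOpsTypeSafety() {
  }

  static void demo(TableScanOperator ts, FilterOperator fil) {
    HashMap<String, TableScanOperator> topOps = new HashMap<String, TableScanOperator>();
    topOps.put("t1", ts);                      // fine: value is a TableScanOperator
    // topOps.put("t2", fil);                  // rejected by javac: incompatible types
    TableScanOperator top = topOps.get("t1");  // no cast needed at the read side
  }
}
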
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 5803a9c..6ec985d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -240,8 +240,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
private HashMap<TableScanOperator, ExprNodeDesc> opToPartPruner;
private HashMap<TableScanOperator, PrunedPartitionList> opToPartList;
- protected HashMap<String, Operator<? extends OperatorDesc>> topOps;
- private final HashMap<String, Operator<? extends OperatorDesc>> topSelOps;
+ protected HashMap<String, TableScanOperator> topOps;
protected LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
private List<LoadTableDesc> loadTableWork;
private List<LoadFileDesc> loadFileWork;
@@ -318,8 +317,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
opToSamplePruner = new HashMap<TableScanOperator, SampleDesc>();
nameToSplitSample = new HashMap<String, SplitSample>();
// Must be deterministic order maps - see HIVE-8707
- topOps = new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
- topSelOps = new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
+ topOps = new LinkedHashMap<String, TableScanOperator>();
loadTableWork = new ArrayList<LoadTableDesc>();
loadFileWork = new ArrayList<LoadFileDesc>();
opParseCtx = new LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>();
@@ -357,7 +355,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
loadTableWork.clear();
loadFileWork.clear();
topOps.clear();
- topSelOps.clear();
destTableId = 1;
idToTableNameMap.clear();
qb = null;
@@ -9254,11 +9251,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
RowResolver rwsch;
// is the table already present
- Operator<? extends OperatorDesc> top = topOps.get(alias_id);
- Operator<? extends OperatorDesc> dummySel = topSelOps.get(alias_id);
- if (dummySel != null) {
- top = dummySel;
- }
+ TableScanOperator top = topOps.get(alias_id);
if (top == null) {
// Determine row schema for TSOP.
@@ -9312,7 +9305,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
nameToSplitSample.remove(alias_id);
}
- top = putOpInsertMap(OperatorFactory.get(tsDesc,
+ top = (TableScanOperator) putOpInsertMap(OperatorFactory.get(tsDesc,
new RowSchema(rwsch.getColumnInfos())), rwsch);
// Add this to the list of top operators - we always start from a table
@@ -9320,11 +9313,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
topOps.put(alias_id, top);
// Add a mapping from the table scan operator to Table
- topToTable.put((TableScanOperator) top, tab);
+ topToTable.put(top, tab);
Map<String, String> props = qb.getTabPropsForAlias(alias);
if (props != null) {
- topToTableProps.put((TableScanOperator) top, props);
+ topToTableProps.put(top, props);
tsDesc.setOpProps(props);
}
} else {
@@ -9336,7 +9329,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
Operator<? extends OperatorDesc> op = top;
TableSample ts = qb.getParseInfo().getTabSample(alias);
if (ts != null) {
- TableScanOperator tableScanOp = (TableScanOperator) top;
+ TableScanOperator tableScanOp = top;
tableScanOp.getConf().setTableSample(ts);
int num = ts.getNumerator();
int den = ts.getDenominator();
@@ -10536,6 +10529,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
// For example, in Column[a].b.c and Column[a].b, Column[a].b should be
// unparsed before Column[a].b.c
Collections.sort(fieldDescList, new Comparator<Map.Entry<ASTNode, ExprNodeDesc>>() {
+ @Override
public int compare(Entry<ASTNode, ExprNodeDesc> o1, Entry<ASTNode, ExprNodeDesc> o2) {
ExprNodeFieldDesc fieldDescO1 = (ExprNodeFieldDesc) o1.getValue();
ExprNodeFieldDesc fieldDescO2 = (ExprNodeFieldDesc) o2.getValue();
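
The "must be deterministic order maps" comment above (HIVE-8707) is why topOps stays a LinkedHashMap even as its value type narrows: plan generation iterates these maps, and the plan must come out the same on every run. A standalone illustration of the difference:

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public final class DeterministicOrderDemo {
  public static void main(String[] args) {
    Map<String, Integer> linked = new LinkedHashMap<String, Integer>();
    linked.put("b", 2);
    linked.put("a", 1);
    linked.put("c", 3);
    System.out.println(linked.keySet()); // always [b, a, c]: insertion order

    Map<String, Integer> plain = new HashMap<String, Integer>();
    plain.put("b", 2);
    plain.put("a", 1);
    plain.put("c", 3);
    System.out.println(plain.keySet()); // order unspecified by the API; free to vary
  }
}
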
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
index 62237e1..b0ab495 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
@@ -135,7 +136,7 @@ public class GenSparkProcContext implements NodeProcessorCtx {
// Alias to operator map, from the semantic analyzer.
// This is necessary as sometimes semantic analyzer's mapping is different than operator's own alias.
- public final Map<String, Operator<? extends OperatorDesc>> topOps;
+ public final Map<String, TableScanOperator> topOps;
// The set of pruning sinks
public final Set<Operator<?>> pruningSinkSet;
@@ -151,7 +152,7 @@ public class GenSparkProcContext implements NodeProcessorCtx {
List<Task<? extends Serializable>> rootTasks,
Set<ReadEntity> inputs,
Set<WriteEntity> outputs,
- Map<String, Operator<? extends OperatorDesc>> topOps) {
+ Map<String, TableScanOperator> topOps) {
this.conf = conf;
this.parseContext = parseContext;
this.moveTask = moveTask;
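
The comment on topOps above flags that the semantic analyzer's key (the alias id) can differ from the operator's own alias. A hedged lookup sketch that tolerates the mismatch, with a hypothetical resolve helper; the dual check mirrors the lineage hunk at the top of this patch:

import java.util.Map;

import org.apache.hadoop.hive.ql.exec.TableScanOperator;

public final class TopOpsResolver {
  private TopOpsResolver() {
  }

  public static TableScanOperator resolve(Map<String, TableScanOperator> topOps,
      String alias) {
    TableScanOperator ts = topOps.get(alias);  // semantic analyzer's mapping first
    if (ts != null) {
      return ts;
    }
    for (TableScanOperator candidate : topOps.values()) {
      if (alias.equals(candidate.getConf().getAlias())) {  // operator's own alias
        return candidate;
      }
    }
    return null;
  }
}
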
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 8dc48cd..924848f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -160,7 +160,7 @@ public class GenSparkUtils {
String alias = ((TableScanOperator) root).getConf().getAlias();
if (!deferSetup) {
- setupMapWork(mapWork, context, partitions, root, alias);
+      setupMapWork(mapWork, context, partitions, (TableScanOperator) root, alias);
}
// add new item to the Spark work
@@ -171,7 +171,7 @@ public class GenSparkUtils {
// this method's main use is to help unit testing this class
protected void setupMapWork(MapWork mapWork, GenSparkProcContext context,
- PrunedPartitionList partitions, Operator<? extends OperatorDesc> root,
+ PrunedPartitionList partitions, TableScanOperator root,
String alias) throws SemanticException {
// All the setup is done in GenMapRedUtils
GenMapRedUtils.setMapWork(mapWork, context.parseContext,
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 7e0e137..3673da4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -26,8 +26,6 @@ import java.util.Map;
import java.util.Set;
import java.util.Stack;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
@@ -97,7 +95,6 @@ import org.apache.hadoop.hive.ql.session.SessionState;
public class SparkCompiler extends TaskCompiler {
private static final String CLASS_NAME = SparkCompiler.class.getName();
private static final PerfLogger PERF_LOGGER = SessionState.getPerfLogger();
- private static final Logger LOGGER = LoggerFactory.getLogger(SparkCompiler.class);
public SparkCompiler() {
}
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 73e8f6d..edafdb1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -372,7 +372,7 @@ public class MapWork extends BaseWork {
@Override
@Explain(displayName = "Map Operator Tree", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
- public Set<Operator<?>> getAllRootOperators() {
+ public Set<Operator<? extends OperatorDesc>> getAllRootOperators() {
Set<Operator<?>> opSet = new LinkedHashSet<Operator<?>>();
for (Operator<?> op : getAliasToWork().values()) {
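
Both the old Set<Operator<?>> and the new Set<Operator<? extends OperatorDesc>> admit exactly the same operators, since every Operator's type parameter is bounded by OperatorDesc; the new spelling just makes that bound explicit in the plan API. Read-side callers are unaffected either way, as in this small consumer sketch (dumpRoots is hypothetical):

import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;

public final class RootOpsDump {
  private RootOpsDump() {
  }

  // Iteration reads elements as Operator<? extends OperatorDesc>; only attempts
  // to add elements into the returned set would feel the wildcard.
  public static void dumpRoots(MapWork mapWork) {
    for (Operator<? extends OperatorDesc> root : mapWork.getAllRootOperators()) {
      System.out.println(root);
    }
  }
}
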
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 6ba122a..5bea6fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -951,7 +951,7 @@ public final class PlanUtils {
throw new RuntimeException(e);
}
}
-
+
public static void addPartitionInputs(Collection<Partition> parts, Collection<ReadEntity> inputs,
ReadEntity parentViewInfo, boolean isDirectRead) {
// Store the inputs in a HashMap since we can't get a ReadEntity from inputs since it is
@@ -990,12 +990,9 @@ public final class PlanUtils {
public static void addInputsForView(ParseContext parseCtx) throws HiveException {
Set<ReadEntity> inputs = parseCtx.getSemanticInputs();
- for (Map.Entry<String, Operator<?>> entry : parseCtx.getTopOps().entrySet()) {
- if (!(entry.getValue() instanceof TableScanOperator)) {
- continue;
- }
+ for (Map.Entry<String, TableScanOperator> entry : parseCtx.getTopOps().entrySet()) {
String alias = entry.getKey();
- TableScanOperator topOp = (TableScanOperator) entry.getValue();
+ TableScanOperator topOp = entry.getValue();
ReadEntity parentViewInfo = getParentViewInfo(alias, parseCtx.getViewAliasToInput());
// Adds tables only for create view (PPD filter can be appended by outer query)
http://git-wip-us.apache.org/repos/asf/hive/blob/695bde3a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
index d9ab9c0..9e5db23 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
@@ -26,8 +26,6 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -35,12 +33,9 @@ import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.hooks.ReadEntity;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -71,17 +66,17 @@ public class TestGenTezWork {
ctx = new GenTezProcContext(
new HiveConf(),
new ParseContext(),
- (List<Task<MoveWork>>)Collections.EMPTY_LIST,
- (List<Task<? extends Serializable>>) new ArrayList<Task<? extends Serializable>>(),
- (Set<ReadEntity>)Collections.EMPTY_SET,
- (Set<WriteEntity>)Collections.EMPTY_SET);
+ Collections.EMPTY_LIST,
+ new ArrayList<Task<? extends Serializable>>(),
+ Collections.EMPTY_SET,
+ Collections.EMPTY_SET);
proc = new GenTezWork(new GenTezUtils() {
@Override
- protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
- PrunedPartitionList partitions, Operator<? extends OperatorDesc> root, String alias)
+ protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
+ PrunedPartitionList partitions, TableScanOperator root, String alias)
throws SemanticException {
-
+
LinkedHashMap<String, Operator<? extends OperatorDesc>> map
= new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
map.put("foo", root);