Posted to commits@hive.apache.org by ha...@apache.org on 2015/01/28 07:25:50 UTC
svn commit: r1655226 [2/30] - in /hive/trunk:
contrib/src/test/results/clientpositive/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ ql/src/java...
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java Wed Jan 28 06:25:44 2015
@@ -30,13 +30,10 @@ import java.util.Map.Entry;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
/**
* This class implements the processor context for Constant Propagate.
@@ -50,25 +47,18 @@ public class ConstantPropagateProcCtx im
.getLog(ConstantPropagateProcCtx.class);
private final Map<Operator<? extends Serializable>, Map<ColumnInfo, ExprNodeDesc>> opToConstantExprs;
- private final Map<Operator<? extends OperatorDesc>, OpParseContext> opToParseCtx;
private final List<Operator<? extends Serializable>> opToDelete;
- public ConstantPropagateProcCtx(Map<Operator<? extends OperatorDesc>, OpParseContext> opToParseCtx) {
+ public ConstantPropagateProcCtx() {
opToConstantExprs =
new HashMap<Operator<? extends Serializable>, Map<ColumnInfo, ExprNodeDesc>>();
opToDelete = new ArrayList<Operator<? extends Serializable>>();
- this.opToParseCtx = opToParseCtx;
}
public Map<Operator<? extends Serializable>, Map<ColumnInfo, ExprNodeDesc>> getOpToConstantExprs() {
return opToConstantExprs;
}
-
- public Map<Operator<? extends OperatorDesc>, OpParseContext> getOpToParseCtxMap() {
- return opToParseCtx;
- }
-
/**
* Resolve a ColumnInfo based on given RowResolver.
*
@@ -78,27 +68,25 @@ public class ConstantPropagateProcCtx im
* @return
* @throws SemanticException
*/
- private ColumnInfo resolve(ColumnInfo ci, RowResolver rr, RowResolver parentRR)
- throws SemanticException {
+ private ColumnInfo resolve(ColumnInfo ci, RowSchema rs, RowSchema parentRS) {
// Resolve new ColumnInfo from <tableAlias, alias>
String alias = ci.getAlias();
if (alias == null) {
alias = ci.getInternalName();
}
String tblAlias = ci.getTabAlias();
- ColumnInfo rci = rr.get(tblAlias, alias);
- if (rci == null && rr.getRslvMap().size() == 1 && parentRR.getRslvMap().size() == 1) {
- rci = rr.get(null, alias);
+ ColumnInfo rci = rs.getColumnInfo(tblAlias, alias);
+ if (rci == null && rs.getTableNames().size() == 1 &&
+ parentRS.getTableNames().size() == 1) {
+ rci = rs.getColumnInfo(rs.getTableNames().iterator().next(),
+ alias);
}
if (rci == null) {
return null;
}
- String[] tmp = rr.reverseLookup(rci.getInternalName());
- rci.setTabAlias(tmp[0]);
- rci.setAlias(tmp[1]);
LOG.debug("Resolved "
+ ci.getTabAlias() + "." + ci.getAlias() + " as "
- + rci.getTabAlias() + "." + rci.getAlias() + " with rr: " + rr);
+ + rci.getTabAlias() + "." + rci.getAlias() + " with rs: " + rs);
return rci;
}
@@ -117,90 +105,76 @@ public class ConstantPropagateProcCtx im
public Map<ColumnInfo, ExprNodeDesc> getPropagatedConstants(
Operator<? extends Serializable> op) {
Map<ColumnInfo, ExprNodeDesc> constants = new HashMap<ColumnInfo, ExprNodeDesc>();
- OpParseContext parseCtx = opToParseCtx.get(op);
- if (parseCtx == null) {
+ if (op.getSchema() == null) {
return constants;
}
- RowResolver rr = parseCtx.getRowResolver();
- LOG.debug("Getting constants of op:" + op + " with rr:" + rr);
-
- try {
- if (op.getParentOperators() == null) {
- return constants;
- }
+ RowSchema rs = op.getSchema();
+ LOG.debug("Getting constants of op:" + op + " with rs:" + rs);
- if (op instanceof UnionOperator) {
- String alias = (String) rr.getRslvMap().keySet().toArray()[0];
- // find intersection
- Map<ColumnInfo, ExprNodeDesc> intersection = null;
- for (Operator<?> parent : op.getParentOperators()) {
- Map<ColumnInfo, ExprNodeDesc> unionConst = opToConstantExprs.get(parent);
- LOG.debug("Constant of op " + parent.getOperatorId() + " " + unionConst);
- if (intersection == null) {
- intersection = new HashMap<ColumnInfo, ExprNodeDesc>();
- for (Entry<ColumnInfo, ExprNodeDesc> e : unionConst.entrySet()) {
- ColumnInfo ci = new ColumnInfo(e.getKey());
- ci.setTabAlias(alias);
- intersection.put(ci, e.getValue());
- }
- } else {
- Iterator<Entry<ColumnInfo, ExprNodeDesc>> itr = intersection.entrySet().iterator();
- while (itr.hasNext()) {
- Entry<ColumnInfo, ExprNodeDesc> e = itr.next();
- boolean found = false;
- for (Entry<ColumnInfo, ExprNodeDesc> f : opToConstantExprs.get(parent).entrySet()) {
- if (e.getKey().getInternalName().equals(f.getKey().getInternalName())) {
- if (e.getValue().isSame(f.getValue())) {
- found = true;
- }
- break;
+ if (op.getParentOperators() == null) {
+ return constants;
+ }
+
+ if (op instanceof UnionOperator) {
+ String alias = rs.getSignature().get(0).getTabAlias();
+ // find intersection
+ Map<ColumnInfo, ExprNodeDesc> intersection = null;
+ for (Operator<?> parent : op.getParentOperators()) {
+ Map<ColumnInfo, ExprNodeDesc> unionConst = opToConstantExprs.get(parent);
+ LOG.debug("Constant of op " + parent.getOperatorId() + " " + unionConst);
+ if (intersection == null) {
+ intersection = new HashMap<ColumnInfo, ExprNodeDesc>();
+ for (Entry<ColumnInfo, ExprNodeDesc> e : unionConst.entrySet()) {
+ ColumnInfo ci = new ColumnInfo(e.getKey());
+ ci.setTabAlias(alias);
+ intersection.put(ci, e.getValue());
+ }
+ } else {
+ Iterator<Entry<ColumnInfo, ExprNodeDesc>> itr = intersection.entrySet().iterator();
+ while (itr.hasNext()) {
+ Entry<ColumnInfo, ExprNodeDesc> e = itr.next();
+ boolean found = false;
+ for (Entry<ColumnInfo, ExprNodeDesc> f : opToConstantExprs.get(parent).entrySet()) {
+ if (e.getKey().getInternalName().equals(f.getKey().getInternalName())) {
+ if (e.getValue().isSame(f.getValue())) {
+ found = true;
}
- }
- if (!found) {
- itr.remove();
+ break;
}
}
+ if (!found) {
+ itr.remove();
+ }
}
- if (intersection.isEmpty()) {
- return intersection;
- }
}
- LOG.debug("Propagated union constants:" + intersection);
- return intersection;
+ if (intersection.isEmpty()) {
+ return intersection;
+ }
}
+ LOG.debug("Propagated union constants:" + intersection);
+ return intersection;
+ }
- for (Operator<? extends Serializable> parent : op.getParentOperators()) {
- Map<ColumnInfo, ExprNodeDesc> c = opToConstantExprs.get(parent);
- for (Entry<ColumnInfo, ExprNodeDesc> e : c.entrySet()) {
- ColumnInfo ci = e.getKey();
- ColumnInfo rci = null;
- ExprNodeDesc constant = e.getValue();
- rci = resolve(ci, rr, opToParseCtx.get(parent).getRowResolver());
- if (rci != null) {
- constants.put(rci, constant);
- } else {
- LOG.debug("Can't resolve " + ci.getTabAlias() + "." + ci.getAlias() + " from rr:"
- + rr);
- }
-
+ for (Operator<? extends Serializable> parent : op.getParentOperators()) {
+ Map<ColumnInfo, ExprNodeDesc> c = opToConstantExprs.get(parent);
+ for (Entry<ColumnInfo, ExprNodeDesc> e : c.entrySet()) {
+ ColumnInfo ci = e.getKey();
+ ColumnInfo rci = null;
+ ExprNodeDesc constant = e.getValue();
+ rci = resolve(ci, rs, parent.getSchema());
+ if (rci != null) {
+ constants.put(rci, constant);
+ } else {
+ LOG.debug("Can't resolve " + ci.getTabAlias() + "." + ci.getAlias() +
+ "(" + ci.getInternalName() + ") from rs:" + rs);
}
-
}
- LOG.debug("Offerring constants " + constants.keySet()
- + " to operator " + op.toString());
- return constants;
- } catch (SemanticException e) {
- LOG.error(e.getMessage(), e);
- throw new RuntimeException(e);
}
- }
- public RowResolver getRowResolver(Operator<? extends Serializable> op) {
- OpParseContext parseCtx = opToParseCtx.get(op);
- if (parseCtx == null) {
- return null;
- }
- return parseCtx.getRowResolver();
+ LOG.debug("Offerring constants " + constants.keySet()
+ + " to operator " + op.toString());
+
+ return constants;
}
public void addOpToDelete(Operator<? extends Serializable> op) {
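In isolation, the UnionOperator branch of getPropagatedConstants() above is an intersection: a constant survives the union only if every parent branch reports the same value for the same internal column name. A standalone sketch of that step, with String stand-ins for ColumnInfo and ExprNodeDesc and the table-alias rewriting omitted:

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Sketch of the union intersection: seed from the first parent, then drop
// every entry a later parent does not confirm with the same value.
public class ConstantIntersection {
  static Map<String, String> intersect(List<Map<String, String>> parentConstants) {
    Map<String, String> intersection = null;
    for (Map<String, String> branch : parentConstants) {
      if (intersection == null) {
        intersection = new HashMap<>(branch);            // seed from the first parent
      } else {
        Iterator<Map.Entry<String, String>> itr = intersection.entrySet().iterator();
        while (itr.hasNext()) {
          Map.Entry<String, String> e = itr.next();
          if (!e.getValue().equals(branch.get(e.getKey()))) {
            itr.remove();                                // this branch disagrees: drop it
          }
        }
      }
      if (intersection.isEmpty()) {
        break;                                           // nothing can survive
      }
    }
    return intersection == null ? new HashMap<>() : intersection;
  }
}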
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Wed Jan 28 06:25:44 2015
@@ -47,7 +47,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -108,30 +107,16 @@ public final class ConstantPropagateProc
* @param desc
* @return
*/
- public static ColumnInfo resolveColumn(RowResolver rr,
+ public static ColumnInfo resolveColumn(RowSchema rs,
ExprNodeColumnDesc desc) {
- try {
- ColumnInfo ci = rr.get(desc.getTabAlias(), desc.getColumn());
- if (ci == null) {
- String[] tmp = rr.reverseLookup(desc.getColumn());
- if (tmp == null) {
- return null;
- }
- ci = rr.get(tmp[0], tmp[1]);
- ci.setTabAlias(tmp[0]);
- ci.setAlias(tmp[1]);
- } else {
- String[] tmp = rr.reverseLookup(ci.getInternalName());
- if (tmp == null) {
- return null;
- }
- ci.setTabAlias(tmp[0]);
- ci.setAlias(tmp[1]);
- }
- return ci;
- } catch (SemanticException e) {
- throw new RuntimeException(e);
+ ColumnInfo ci = rs.getColumnInfo(desc.getTabAlias(), desc.getColumn());
+ if (ci == null) {
+ ci = rs.getColumnInfo(desc.getColumn());
}
+ if (ci == null) {
+ return null;
+ }
+ return ci;
}
private static final Set<PrimitiveCategory> unSupportedTypes = ImmutableSet
@@ -254,7 +239,7 @@ public final class ConstantPropagateProc
// expressions are
// constant, add them to colToConstants as half-deterministic columns.
if (propagate) {
- propagate(udf, newExprs, cppCtx.getRowResolver(op), constants);
+ propagate(udf, newExprs, op.getSchema(), constants);
}
return desc;
@@ -318,7 +303,7 @@ public final class ConstantPropagateProc
* @param op
* @param constants
*/
- private static void propagate(GenericUDF udf, List<ExprNodeDesc> newExprs, RowResolver rr,
+ private static void propagate(GenericUDF udf, List<ExprNodeDesc> newExprs, RowSchema rs,
Map<ColumnInfo, ExprNodeDesc> constants) {
if (udf instanceof GenericUDFOPEqual) {
ExprNodeDesc lOperand = newExprs.get(0);
@@ -341,7 +326,7 @@ public final class ConstantPropagateProc
// we need a column expression on other side.
return;
}
- ColumnInfo ci = resolveColumn(rr, c);
+ ColumnInfo ci = resolveColumn(rs, c);
if (ci != null) {
LOG.debug("Filter " + udf + " is identified as a value assignment, propagate it.");
if (!v.getTypeInfo().equals(ci.getType())) {
@@ -356,7 +341,7 @@ public final class ConstantPropagateProc
if (operand instanceof ExprNodeColumnDesc) {
LOG.debug("Filter " + udf + " is identified as a value assignment, propagate it.");
ExprNodeColumnDesc c = (ExprNodeColumnDesc) operand;
- ColumnInfo ci = resolveColumn(rr, c);
+ ColumnInfo ci = resolveColumn(rs, c);
if (ci != null) {
constants.put(ci, new ExprNodeNullDesc());
}
@@ -435,45 +420,38 @@ public final class ConstantPropagateProc
* @return
*/
private static ExprNodeDesc evaluateColumn(ExprNodeColumnDesc desc,
- ConstantPropagateProcCtx cppCtx, Operator<? extends Serializable> parent) {
- try {
- ColumnInfo ci = null;
- RowResolver rr = cppCtx.getOpToParseCtxMap().get(parent).getRowResolver();
- String[] tmp = rr.reverseLookup(desc.getColumn());
- if (tmp == null) {
- LOG.error("Reverse look up of column " + desc + " error!");
- return null;
- }
- ci = rr.get(tmp[0], tmp[1]);
- if (ci != null) {
- ExprNodeDesc constant = null;
- // Additional work for union operator, see union27.q
- if (ci.getAlias() == null) {
- for (Entry<ColumnInfo, ExprNodeDesc> e : cppCtx.getOpToConstantExprs().get(parent).entrySet()) {
- if (e.getKey().getInternalName().equals(ci.getInternalName())) {
- constant = e.getValue();
- break;
- }
- }
- } else {
- constant = cppCtx.getOpToConstantExprs().get(parent).get(ci);
- }
- if (constant != null) {
- if (constant instanceof ExprNodeConstantDesc
- && !constant.getTypeInfo().equals(desc.getTypeInfo())) {
- return typeCast(constant, desc.getTypeInfo());
- }
- return constant;
- } else {
- return null;
- }
- }
+ ConstantPropagateProcCtx cppCtx, Operator<? extends Serializable> parent) {
+ RowSchema rs = parent.getSchema();
+ ColumnInfo ci = rs.getColumnInfo(desc.getColumn());
+ if (ci == null) {
+ LOG.error("Reverse look up of column " + desc + " error!");
+ ci = rs.getColumnInfo(desc.getTabAlias(), desc.getColumn());
+ }
+ if (ci == null) {
LOG.error("Can't resolve " + desc.getTabAlias() + "." + desc.getColumn());
throw new RuntimeException("Can't resolve " + desc.getTabAlias() + "." + desc.getColumn());
- } catch (SemanticException e) {
- throw new RuntimeException(e);
}
-
+ ExprNodeDesc constant = null;
+ // Additional work for union operator, see union27.q
+ if (ci.getAlias() == null) {
+ for (Entry<ColumnInfo, ExprNodeDesc> e : cppCtx.getOpToConstantExprs().get(parent).entrySet()) {
+ if (e.getKey().getInternalName().equals(ci.getInternalName())) {
+ constant = e.getValue();
+ break;
+ }
+ }
+ } else {
+ constant = cppCtx.getOpToConstantExprs().get(parent).get(ci);
+ }
+ if (constant != null) {
+ if (constant instanceof ExprNodeConstantDesc
+ && !constant.getTypeInfo().equals(desc.getTypeInfo())) {
+ return typeCast(constant, desc.getTypeInfo());
+ }
+ return constant;
+ } else {
+ return null;
+ }
}
/**
@@ -793,11 +771,10 @@ public final class ConstantPropagateProc
// Assume only 1 parent for FS operator
Operator<? extends Serializable> parent = op.getParentOperators().get(0);
Map<ColumnInfo, ExprNodeDesc> parentConstants = cppCtx.getPropagatedConstants(parent);
- RowResolver rr = cppCtx.getOpToParseCtxMap().get(parent).getRowResolver();
+ RowSchema rs = parent.getSchema();
boolean allConstant = true;
for (String input : inputs) {
- String tmp[] = rr.reverseLookup(input);
- ColumnInfo ci = rr.get(tmp[0], tmp[1]);
+ ColumnInfo ci = rs.getColumnInfo(input);
if (parentConstants.get(ci) == null) {
allConstant = false;
break;
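The recurring pattern in this file is replacing RowResolver.reverseLookup() round-trips with direct RowSchema lookups: try the qualified (tableAlias, column) form first, then fall back to the bare internal name. A minimal model of that two-step lookup, with Col/Schema as simplified stand-ins for Hive's ColumnInfo/RowSchema:

import java.util.Arrays;
import java.util.List;

// Two-step column lookup: qualified match first, internal-name fallback second.
class Col {
  final String tabAlias, name;
  Col(String tabAlias, String name) { this.tabAlias = tabAlias; this.name = name; }
}

class Schema {
  final List<Col> cols;
  Schema(Col... cols) { this.cols = Arrays.asList(cols); }

  Col getColumnInfo(String tabAlias, String name) {
    for (Col c : cols) {
      if (c.name.equals(name) && (tabAlias == null || tabAlias.equals(c.tabAlias))) {
        return c;
      }
    }
    return null;
  }

  Col getColumnInfo(String name) {
    return getColumnInfo(null, name);             // unqualified fallback
  }

  static Col resolveColumn(Schema rs, String tabAlias, String column) {
    Col ci = rs.getColumnInfo(tabAlias, column);  // qualified lookup first
    if (ci == null) {
      ci = rs.getColumnInfo(column);              // then by internal name alone
    }
    return ci;
  }
}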
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Wed Jan 28 06:25:44 2015
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
@@ -70,7 +69,6 @@ public class ConvertJoinMapJoin implemen
static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName());
- @SuppressWarnings("unchecked")
@Override
/*
* (non-Javadoc) we should ideally not modify the tree we traverse. However,
@@ -172,6 +170,7 @@ public class ConvertJoinMapJoin implemen
return null;
}
+ @SuppressWarnings("unchecked")
private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperator joinOp,
TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
// we cannot convert to bucket map join, we cannot convert to
@@ -228,12 +227,11 @@ public class ConvertJoinMapJoin implemen
private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren)
throws SemanticException {
- ParseContext parseContext = context.parseContext;
MapJoinDesc mapJoinDesc = null;
if (adjustParentsChildren) {
- mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
- joinOp, joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(),
- mapJoinConversionPos, true);
+ mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf,
+ joinOp, joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(),
+ joinOp.getConf().getMapAliases(), mapJoinConversionPos, true);
} else {
JoinDesc joinDesc = joinOp.getConf();
// retain the original join desc in the map join.
@@ -249,7 +247,6 @@ public class ConvertJoinMapJoin implemen
mapJoinDesc.resetOrder();
}
- @SuppressWarnings("unchecked")
CommonMergeJoinOperator mergeJoinOp =
(CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets,
isSubQuery, mapJoinConversionPos, mapJoinDesc), joinOp.getSchema());
@@ -637,11 +634,10 @@ public class ConvertJoinMapJoin implemen
}
//can safely convert the join to a map join.
- ParseContext parseContext = context.parseContext;
MapJoinOperator mapJoinOp =
- MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp,
- joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(),
- bigTablePosition, true);
+ MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, joinOp,
+ joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(),
+ joinOp.getConf().getMapAliases(), bigTablePosition, true);
Operator<? extends OperatorDesc> parentBigTableOp =
mapJoinOp.getParentOperators().get(bigTablePosition);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Wed Jan 28 06:25:44 2015
@@ -77,11 +77,9 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPruner;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.QBParseInfo;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
@@ -934,24 +932,6 @@ public final class GenMapRedUtils {
return mrWork;
}
- /**
- * insert in the map for the operator to row resolver.
- *
- * @param op
- * operator created
- * @param rr
- * row resolver
- * @param parseCtx
- * parse context
- */
- @SuppressWarnings("nls")
- public static Operator<? extends OperatorDesc> putOpInsertMap(
- Operator<? extends OperatorDesc> op, RowResolver rr, ParseContext parseCtx) {
- OpParseContext ctx = new OpParseContext(rr);
- parseCtx.getOpParseCtx().put(op, ctx);
- return op;
- }
-
public static TableScanOperator createTemporaryTableScanOperator(RowSchema rowSchema) {
TableScanOperator tableScanOp =
(TableScanOperator) OperatorFactory.get(new TableScanDesc(null), rowSchema);
@@ -996,19 +976,16 @@ public final class GenMapRedUtils {
desc.setCompressType(parseCtx.getConf().getVar(
HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE));
}
- Operator<? extends OperatorDesc> fileSinkOp = putOpInsertMap(OperatorFactory
- .get(desc, parent.getSchema()), null, parseCtx);
+ Operator<? extends OperatorDesc> fileSinkOp = OperatorFactory.get(
+ desc, parent.getSchema());
// Connect parent to fileSinkOp
parent.replaceChild(child, fileSinkOp);
fileSinkOp.setParentOperators(Utilities.makeList(parent));
// Create a dummy TableScanOperator for the file generated through fileSinkOp
- RowResolver parentRowResolver =
- parseCtx.getOpParseCtx().get(parent).getRowResolver();
- TableScanOperator tableScanOp = (TableScanOperator) putOpInsertMap(
- createTemporaryTableScanOperator(parent.getSchema()),
- parentRowResolver, parseCtx);
+ TableScanOperator tableScanOp = (TableScanOperator) createTemporaryTableScanOperator(
+ parent.getSchema());
// Connect this TableScanOperator to child.
tableScanOp.setChildOperators(Utilities.makeList(child));
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Wed Jan 28 06:25:44 2015
@@ -29,8 +29,6 @@ import java.util.Map;
import java.util.Set;
import java.util.Stack;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -59,9 +57,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.parse.GenMapRedWalker;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -89,7 +85,6 @@ import org.apache.hadoop.hive.serde2.typ
*/
public class MapJoinProcessor implements Transform {
- private static final Log LOG = LogFactory.getLog(MapJoinProcessor.class.getName());
// mapjoin table descriptor contains a key descriptor which needs the field schema
// (column type + column name). The column name is not really used anywhere, but it
// needs to be passed. Use the string defined below for that.
@@ -98,15 +93,6 @@ public class MapJoinProcessor implements
public MapJoinProcessor() {
}
- @SuppressWarnings("nls")
- private static Operator<? extends OperatorDesc> putOpInsertMap (
- ParseContext pGraphContext, Operator<? extends OperatorDesc> op,
- RowResolver rr) {
- OpParseContext ctx = new OpParseContext(rr);
- pGraphContext.getOpParseCtx().put(op, ctx);
- return op;
- }
-
/**
* Generate the MapRed Local Work for the given map-join operator
*
@@ -224,12 +210,10 @@ public class MapJoinProcessor implements
public static void genMapJoinOpAndLocalWork(HiveConf conf, MapredWork newWork,
JoinOperator op, int mapJoinPos)
throws SemanticException {
- LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
- newWork.getMapWork().getOpParseCtxMap();
// generate the map join operator; already checked the map join
- MapJoinOperator newMapJoinOp = new MapJoinProcessor().convertMapJoin(conf, opParseCtxMap, op,
- newWork.getMapWork().isLeftInputJoin(), newWork.getMapWork().getBaseSrc(), newWork.getMapWork().getMapAliases(),
- mapJoinPos, true, false);
+ MapJoinOperator newMapJoinOp = new MapJoinProcessor().convertMapJoin(conf, op,
+ newWork.getMapWork().isLeftInputJoin(), newWork.getMapWork().getBaseSrc(),
+ newWork.getMapWork().getMapAliases(), mapJoinPos, true, false);
genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos);
}
@@ -240,11 +224,9 @@ public class MapJoinProcessor implements
// generate the local work for the big table alias
MapJoinProcessor.genMapJoinLocalWork(newWork, newMapJoinOp, mapJoinPos);
// clean up the mapred work
- newWork.getMapWork().setOpParseCtxMap(null);
newWork.getMapWork().setLeftInputJoin(false);
newWork.getMapWork().setBaseSrc(null);
newWork.getMapWork().setMapAliases(null);
-
} catch (Exception e) {
e.printStackTrace();
throw new SemanticException("Failed to generate new mapJoin operator " +
@@ -302,7 +284,6 @@ public class MapJoinProcessor implements
* @param validateMapJoinTree
*/
public MapJoinOperator convertMapJoin(HiveConf conf,
- LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases,
int mapJoinPos, boolean noCheckOuterJoin, boolean validateMapJoinTree) throws SemanticException {
@@ -352,10 +333,9 @@ public class MapJoinProcessor implements
}
// create the map-join operator
- MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap,
+ MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf,
op, leftInputJoin, baseSrc, mapAliases, mapJoinPos, noCheckOuterJoin);
-
// remove old parents
for (pos = 0; pos < newParentOps.size(); pos++) {
newParentOps.get(pos).replaceChild(oldReduceSinkParentOps.get(pos), mapJoinOp);
@@ -376,22 +356,18 @@ public class MapJoinProcessor implements
}
public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf,
- LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases,
int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
MapJoinDesc mapJoinDescriptor =
- getMapJoinDesc(hconf, opParseCtxMap, op, leftInputJoin, baseSrc, mapAliases,
+ getMapJoinDesc(hconf, op, leftInputJoin, baseSrc, mapAliases,
mapJoinPos, noCheckOuterJoin);
// reduce sink row resolver used to generate map join op
- RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
+ RowSchema outputRS = op.getSchema();
MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(
- mapJoinDescriptor, new RowSchema(outputRS.getColumnInfos()), op.getParentOperators());
-
- OpParseContext ctx = new OpParseContext(outputRS);
- opParseCtxMap.put(mapJoinOp, ctx);
+ mapJoinDescriptor, new RowSchema(outputRS.getSignature()), op.getParentOperators());
mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs());
Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
@@ -434,7 +410,6 @@ public class MapJoinProcessor implements
* @param noCheckOuterJoin
*/
public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf,
- Map<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
SMBMapJoinOperator smbJoinOp, int bigTablePos, boolean noCheckOuterJoin)
throws SemanticException {
// Create a new map join operator
@@ -451,14 +426,10 @@ public class MapJoinProcessor implements
mapJoinDesc.setStatistics(smbJoinDesc.getStatistics());
- RowResolver joinRS = opParseCtxMap.get(smbJoinOp).getRowResolver();
+ RowSchema joinRS = smbJoinOp.getSchema();
// The mapjoin has the same schema as the join operator
MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(
- mapJoinDesc, joinRS.getRowSchema(),
- new ArrayList<Operator<? extends OperatorDesc>>());
-
- OpParseContext ctx = new OpParseContext(joinRS);
- opParseCtxMap.put(mapJoinOp, ctx);
+ mapJoinDesc, joinRS, new ArrayList<Operator<? extends OperatorDesc>>());
// change the children of the original join operator to point to the map
// join operator
@@ -488,11 +459,10 @@ public class MapJoinProcessor implements
HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)
&& HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN);
- LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap = pctx
- .getOpParseCtx();
- MapJoinOperator mapJoinOp = convertMapJoin(pctx.getConf(), opParseCtxMap, op,
- op.getConf().isLeftInputJoin(), op.getConf().getBaseSrc(), op.getConf().getMapAliases(),
- mapJoinPos, noCheckOuterJoin, true);
+ MapJoinOperator mapJoinOp = convertMapJoin(pctx.getConf(), op,
+ op.getConf().isLeftInputJoin(), op.getConf().getBaseSrc(),
+ op.getConf().getMapAliases(), mapJoinPos, noCheckOuterJoin, true);
+
// create a dummy select to select all columns
genSelectPlan(pctx, mapJoinOp);
return mapJoinOp;
@@ -597,32 +567,33 @@ public class MapJoinProcessor implements
// create a dummy select - This select is needed by the walker to split the
// mapJoin later on
- RowResolver inputRR = pctx.getOpParseCtx().get(input).getRowResolver();
+ RowSchema inputRS = input.getSchema();
ArrayList<ExprNodeDesc> exprs = new ArrayList<ExprNodeDesc>();
ArrayList<String> outputs = new ArrayList<String>();
List<String> outputCols = input.getConf().getOutputColumnNames();
- RowResolver outputRS = new RowResolver();
+ ArrayList<ColumnInfo> outputRS = new ArrayList<ColumnInfo>();
Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
for (int i = 0; i < outputCols.size(); i++) {
String internalName = outputCols.get(i);
- String[] nm = inputRR.reverseLookup(internalName);
- ColumnInfo valueInfo = inputRR.get(nm[0], nm[1]);
+ ColumnInfo valueInfo = inputRS.getColumnInfo(internalName);
ExprNodeDesc colDesc = new ExprNodeColumnDesc(valueInfo.getType(), valueInfo
- .getInternalName(), nm[0], valueInfo.getIsVirtualCol());
+ .getInternalName(), valueInfo.getTabAlias(), valueInfo.getIsVirtualCol());
exprs.add(colDesc);
outputs.add(internalName);
- outputRS.put(nm[0], nm[1], new ColumnInfo(internalName, valueInfo.getType(), nm[0], valueInfo
- .getIsVirtualCol(), valueInfo.isHiddenVirtualCol()));
+ ColumnInfo newCol = new ColumnInfo(internalName, valueInfo.getType(),
+ valueInfo.getTabAlias(), valueInfo.getIsVirtualCol(), valueInfo.isHiddenVirtualCol());
+ newCol.setAlias(valueInfo.getAlias());
+ outputRS.add(newCol);
colExprMap.put(internalName, colDesc);
}
SelectDesc select = new SelectDesc(exprs, outputs, false);
- SelectOperator sel = (SelectOperator) putOpInsertMap(pctx, OperatorFactory.getAndMakeChild(select,
- new RowSchema(inputRR.getColumnInfos()), input), inputRR);
+ SelectOperator sel = (SelectOperator) OperatorFactory.getAndMakeChild(select,
+ new RowSchema(outputRS), input);
sel.setColumnExprMap(colExprMap);
@@ -1055,7 +1026,6 @@ public class MapJoinProcessor implements
}
public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
- LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases,
int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
JoinDesc desc = op.getConf();
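Taken together, the changes in this file follow one refactoring: schema information that used to be fetched from a side map (operator to OpParseContext to RowResolver) is now read off the operator itself via getSchema(). The general shape, with Node as a hypothetical stand-in for Hive's Operator:

import java.util.Map;

// State that used to live in a side map keyed by operator now lives on the
// operator itself, so the extra parameter (and a map that could go stale)
// disappears from every signature.
class Node {
  private final String schema;
  Node(String schema) { this.schema = schema; }
  String getSchema() { return schema; }
}

class SchemaAccess {
  // Before: helpers had opParseCtxMap threaded through every call.
  static String schemaBefore(Node op, Map<Node, String> opToSchema) {
    return opToSchema.get(op);
  }

  // After: ask the operator directly, as op.getSchema() does above.
  static String schemaAfter(Node op) {
    return op.getSchema();
  }
}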
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java Wed Jan 28 06:25:44 2015
@@ -134,7 +134,6 @@ public class NonBlockingOpDeDupProc impl
pSEL.getConf().setSelectStar(cSEL.getConf().isSelectStar());
// We need to use the OpParseContext of the child SelectOperator to replace the
// OpParseContext of the parent SelectOperator.
- pctx.updateOpParseCtx(pSEL, pctx.removeOpParseCtx(cSEL));
pSEL.removeChildAndAdoptItsChildren(cSEL);
cSEL.setParentOperators(null);
cSEL.setChildOperators(null);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Wed Jan 28 06:25:44 2015
@@ -196,7 +196,9 @@ public class ReduceSinkMapJoinProc imple
LOG.debug("Cloning reduce sink for multi-child broadcast edge");
// we've already set this one up. Need to clone for the next work.
r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
- (ReduceSinkDesc) parentRS.getConf().clone(), parentRS.getParentOperators());
+ (ReduceSinkDesc) parentRS.getConf().clone(),
+ new RowSchema(parentRS.getSchema()),
+ parentRS.getParentOperators());
context.clonedReduceSinks.add(r);
} else {
r = parentRS;
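The fix here passes a freshly copied RowSchema to the cloned reduce sink instead of letting the clone share the original's schema object. The defensive-copy pattern in miniature, with Cols/ReduceSink as hypothetical classes rather than Hive's:

import java.util.ArrayList;
import java.util.List;

// Clone with a copied schema rather than a shared reference, so mutating one
// operator's column list can never corrupt its clone's.
class Cols {
  final List<String> names;
  Cols(List<String> names) { this.names = names; }
  Cols(Cols other) { this.names = new ArrayList<>(other.names); }   // copy constructor
}

class ReduceSink {
  final Cols schema;
  ReduceSink(Cols schema) { this.schema = schema; }

  ReduceSink cloneForNextEdge() {
    return new ReduceSink(new Cols(this.schema));   // mirrors new RowSchema(parentRS.getSchema())
  }
}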
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java Wed Jan 28 06:25:44 2015
@@ -67,6 +67,7 @@ public class SimpleFetchAggregation impl
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
opRules.put(new RuleRegExp("R1", GBY + RS + GBY + SEL + FS), new SingleGBYProcessor(pctx));
+ opRules.put(new RuleRegExp("R2", GBY + RS + GBY + FS), new SingleGBYProcessor(pctx));
Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -88,12 +89,13 @@ public class SimpleFetchAggregation impl
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
FileSinkOperator FS = (FileSinkOperator) nd;
- GroupByOperator cGBY = (GroupByOperator) stack.get(stack.size() - 3);
- ReduceSinkOperator RS = (ReduceSinkOperator) stack.get(stack.size() - 4);
+ int shift = stack.get(stack.size() - 2) instanceof SelectOperator ? 0 : 1;
+ GroupByOperator cGBY = (GroupByOperator) stack.get(stack.size() - 3 + shift);
+ ReduceSinkOperator RS = (ReduceSinkOperator) stack.get(stack.size() - 4 + shift);
if (RS.getConf().getNumReducers() != 1 || !RS.getConf().getKeyCols().isEmpty()) {
return null;
}
- GroupByOperator pGBY = (GroupByOperator) stack.get(stack.size() - 5);
+ GroupByOperator pGBY = (GroupByOperator) stack.get(stack.size() - 5 + shift);
Path fileName = FS.getConf().getFinalDirName();
TableDesc tsDesc = createIntermediateFS(pGBY, fileName);
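With the new R2 rule the matched plan may lack the SelectOperator between the reduce-side group-by and the file sink, so the processor shifts its stack offsets by one when the second-to-last node is not a SELECT. Schematically (the marker classes below are hypothetical):

import java.util.List;

// One processor serves both GBY-RS-GBY-SEL-FS (R1) and GBY-RS-GBY-FS (R2)
// by shifting its indices when the SELECT is absent.
class Op {}
class SelectOp extends Op {}
class GroupByOp extends Op {}
class ReduceSinkOp extends Op {}

class StackShift {
  static GroupByOp childGroupBy(List<Op> stack) {
    int shift = stack.get(stack.size() - 2) instanceof SelectOp ? 0 : 1;
    return (GroupByOp) stack.get(stack.size() - 3 + shift);   // cGBY either way
  }
}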
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java Wed Jan 28 06:25:44 2015
@@ -50,7 +50,6 @@ import org.apache.hadoop.hive.ql.lib.Rul
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.QBJoinTree;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -223,9 +222,6 @@ public class SkewJoinOptimizer implement
OperatorFactory.getAndMakeChild(
new UnionDesc(), new RowSchema(currOp.getSchema().getSignature()), oplist);
- RowResolver unionRR = parseContext.getOpParseCtx().get(currOp).getRowResolver();
- GenMapRedUtils.putOpInsertMap(unionOp, unionRR, parseContext);
-
// Introduce a select after the union
List<Operator<? extends OperatorDesc>> unionList =
new ArrayList<Operator<? extends OperatorDesc>>();
@@ -235,7 +231,6 @@ public class SkewJoinOptimizer implement
OperatorFactory.getAndMakeChild(
new SelectDesc(true),
new RowSchema(unionOp.getSchema().getSignature()), unionList);
- GenMapRedUtils.putOpInsertMap(selectUnionOp, unionRR, parseContext);
// add the finalOp after the union
selectUnionOp.setChildOperators(finalOps);
@@ -472,12 +467,10 @@ public class SkewJoinOptimizer implement
currChild.setParentOperators(null);
Operator<FilterDesc> filter = OperatorFactory.getAndMakeChild(
- new FilterDesc(filterExpr, false), tableScanOp);
- filter.setSchema(new RowSchema(tableScanOp.getSchema().getSignature()));
+ new FilterDesc(filterExpr, false),
+ new RowSchema(tableScanOp.getSchema().getSignature()),
+ tableScanOp);
OperatorFactory.makeChild(filter, currChild);
-
- RowResolver filterRR = parseContext.getOpParseCtx().get(tableScanOp).getRowResolver();
- GenMapRedUtils.putOpInsertMap(filter, filterRR, parseContext);
}
/**
@@ -604,9 +597,6 @@ public class SkewJoinOptimizer implement
ctx.getCloneTSOpMap().put((TableScanOperator)opClone, (TableScanOperator)op);
}
- GenMapRedUtils.putOpInsertMap(
- opClone, parseContext.getOpParseCtx().get(op).getRowResolver(), parseContext);
-
List<Operator<? extends OperatorDesc>> parents = op.getParentOperators();
List<Operator<? extends OperatorDesc>> parentClones = opClone.getParentOperators();
if ((parents != null) && (!parents.isEmpty()) &&
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Wed Jan 28 06:25:44 2015
@@ -18,8 +18,13 @@
package org.apache.hadoop.hive.ql.optimizer;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,10 +53,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
-import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -66,14 +68,8 @@ import org.apache.hadoop.hive.ql.plan.Re
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
* When dynamic partitioning (with or without bucketing and sorting) is enabled, this optimization
@@ -196,8 +192,7 @@ public class SortedDynPartitionOptimizer
for (int i : sortPositions) LOG.debug("sort position " + i);
for (int i : sortOrder) LOG.debug("sort order " + i);
List<Integer> partitionPositions = getPartitionPositions(dpCtx, fsParent.getSchema());
- List<ColumnInfo> colInfos = parseCtx.getOpParseCtx().get(fsParent).getRowResolver()
- .getColumnInfos();
+ List<ColumnInfo> colInfos = fsParent.getSchema().getSignature();
ArrayList<ExprNodeDesc> bucketColumns = getPositionsToExprNodes(bucketPositions, colInfos);
// update file sink descriptor
@@ -206,9 +201,7 @@ public class SortedDynPartitionOptimizer
fsOp.getConf().setTotalFiles(1);
// Create ReduceSinkDesc
- RowResolver inputRR = parseCtx.getOpParseCtx().get(fsParent).getRowResolver();
- ObjectPair<String, RowResolver> pair = copyRowResolver(inputRR);
- RowResolver outRR = pair.getSecond();
+ RowSchema outRS = new RowSchema(fsParent.getSchema());
ArrayList<ColumnInfo> valColInfo = Lists.newArrayList(fsParent.getSchema().getSignature());
ArrayList<ExprNodeDesc> newValueCols = Lists.newArrayList();
Map<String, ExprNodeDesc> colExprMap = Maps.newHashMap();
@@ -220,28 +213,25 @@ public class SortedDynPartitionOptimizer
newValueCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType());
if (!bucketColumns.isEmpty()) {
- String tableAlias = outRR.getColumnInfos().get(0).getTabAlias();
+ String tableAlias = outRS.getSignature().get(0).getTabAlias();
ColumnInfo ci = new ColumnInfo(BUCKET_NUMBER_COL_NAME, TypeInfoFactory.stringTypeInfo,
tableAlias, true, true);
- outRR.put(tableAlias, BUCKET_NUMBER_COL_NAME, ci);
+ outRS.getSignature().add(ci);
}
// Create ReduceSink operator
- ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
- OperatorFactory.getAndMakeChild(rsConf, new RowSchema(outRR.getColumnInfos()), fsParent),
- outRR, parseCtx);
+ ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
+ rsConf, new RowSchema(outRS.getSignature()), fsParent);
rsOp.setColumnExprMap(colExprMap);
// Create ExtractDesc
- ObjectPair<String, RowResolver> exPair = copyRowResolver(outRR);
- RowResolver exRR = exPair.getSecond();
+ RowSchema exRR = new RowSchema(outRS);
ExtractDesc exConf = new ExtractDesc(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
Utilities.ReduceField.VALUE.toString(), "", false));
// Create Extract Operator
- ExtractOperator exOp = (ExtractOperator) putOpInsertMap(
- OperatorFactory.getAndMakeChild(exConf, new RowSchema(exRR.getColumnInfos()), rsOp),
- exRR, parseCtx);
+ ExtractOperator exOp = (ExtractOperator) OperatorFactory.getAndMakeChild(
+ exConf, exRR, rsOp);
// link EX to FS
fsOp.getParentOperators().clear();
@@ -313,8 +303,6 @@ public class SortedDynPartitionOptimizer
rsParent.getChildOperators().add(rsGrandChild);
rsGrandChild.getParentOperators().clear();
rsGrandChild.getParentOperators().add(rsParent);
- parseCtx.removeOpParseCtx(rsToRemove);
- parseCtx.removeOpParseCtx(rsChild);
LOG.info("Removed " + rsToRemove.getOperatorId() + " and " + rsChild.getOperatorId()
+ " as it was introduced by enforce bucketing/sorting.");
}
@@ -496,31 +484,6 @@ public class SortedDynPartitionOptimizer
return cols;
}
- private Operator<? extends Serializable> putOpInsertMap(Operator<?> op, RowResolver rr,
- ParseContext context) {
- OpParseContext ctx = new OpParseContext(rr);
- context.getOpParseCtx().put(op, ctx);
- return op;
- }
-
- private ObjectPair<String, RowResolver> copyRowResolver(RowResolver inputRR) {
- ObjectPair<String, RowResolver> output = new ObjectPair<String, RowResolver>();
- RowResolver outRR = new RowResolver();
- int pos = 0;
- String tabAlias = null;
-
- for (ColumnInfo colInfo : inputRR.getColumnInfos()) {
- String[] info = inputRR.reverseLookup(colInfo.getInternalName());
- tabAlias = info[0];
- outRR.put(info[0], info[1], new ColumnInfo(SemanticAnalyzer.getColumnInternalName(pos),
- colInfo.getType(), info[0], colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol()));
- pos++;
- }
- output.setFirst(tabAlias);
- output.setSecond(outRR);
- return output;
- }
-
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java Wed Jan 28 06:25:44 2015
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.util.LinkedHashMap;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -26,7 +25,6 @@ import org.apache.hadoop.hive.ql.ErrorMs
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -49,10 +47,9 @@ public class SparkMapJoinProcessor exten
*/
@Override
public MapJoinOperator convertMapJoin(HiveConf conf,
- LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
- JoinOperator op, boolean leftSrc, String[] baseSrc, List<String> mapAliases,
- int bigTablePos, boolean noCheckOuterJoin,
- boolean validateMapJoinTree) throws SemanticException {
+ JoinOperator op, boolean leftSrc, String[] baseSrc, List<String> mapAliases,
+ int bigTablePos, boolean noCheckOuterJoin, boolean validateMapJoinTree)
+ throws SemanticException {
// outer join cannot be performed on a table which is being cached
JoinCondDesc[] condns = op.getConf().getConds();
@@ -64,7 +61,7 @@ public class SparkMapJoinProcessor exten
}
// create the map-join operator
- MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap,
+ MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf,
op, op.getConf().isLeftInputJoin(), op.getConf().getBaseSrc(),
op.getConf().getMapAliases(), bigTablePos, noCheckOuterJoin);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java Wed Jan 28 06:25:44 2015
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.Fe
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -113,6 +114,8 @@ public class StatsOptimizer implements T
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
opRules.put(new RuleRegExp("R1", TS + SEL + GBY + RS + GBY + SEL + FS),
new MetaDataProcessor(pctx));
+ opRules.put(new RuleRegExp("R2", TS + SEL + GBY + RS + GBY + FS),
+ new MetaDataProcessor(pctx));
Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -208,21 +211,24 @@ public class StatsOptimizer implements T
return null;
}
- selOp = (SelectOperator)rsOp.getChildOperators().get(0).getChildOperators().get(0);
- List<AggregationDesc> aggrs = gbyOp.getConf().getAggregators();
-
- if (!(selOp.getConf().getColList().size() == aggrs.size())) {
- // all select columns must be aggregations
- return null;
-
- }
- for(ExprNodeDesc desc : selOp.getConf().getColList()) {
- if (!(desc instanceof ExprNodeColumnDesc)) {
- // Probably an expression, cant handle that
+ Operator<?> last = rsOp.getChildOperators().get(0);
+ if (last.getChildOperators().get(0) instanceof SelectOperator) {
+ selOp = (SelectOperator)rsOp.getChildOperators().get(0).getChildOperators().get(0);
+ last = selOp;
+ if (!(selOp.getConf().getColList().size() ==
+ gbyOp.getConf().getAggregators().size())) {
+ // all select columns must be aggregations
return null;
+
+ }
+ for(ExprNodeDesc desc : selOp.getConf().getColList()) {
+ if (!(desc instanceof ExprNodeColumnDesc)) {
+ // Probably an expression, can't handle that
+ return null;
+ }
}
}
- FileSinkOperator fsOp = (FileSinkOperator)(selOp.getChildren().get(0));
+ FileSinkOperator fsOp = (FileSinkOperator)(last.getChildren().get(0));
if (fsOp.getChildOperators() != null && fsOp.getChildOperators().size() > 0) {
// looks like a subq plan.
return null;
@@ -234,7 +240,7 @@ public class StatsOptimizer implements T
Hive hive = Hive.get(pctx.getConf());
- for (AggregationDesc aggr : aggrs) {
+ for (AggregationDesc aggr : gbyOp.getConf().getAggregators()) {
if (aggr.getDistinct()) {
// our stats for NDV is approx, not accurate.
return null;
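The same optional-SELECT theme appears here, handled by pointer-walking rather than stack offsets: after the reduce-side group-by, the processor steps onto a SelectOperator only if one is present, and locates the FileSinkOperator from whichever node came last. A sketch with hypothetical PlanOp/SelOp/SinkOp marker classes:

import java.util.Collections;
import java.util.List;

// Step onto the SELECT only when present; the file sink hangs off whichever
// node ended up in 'last'.
class PlanOp {
  final List<PlanOp> children;
  PlanOp(List<PlanOp> children) { this.children = children; }
}
class SelOp extends PlanOp { SelOp(PlanOp child) { super(Collections.singletonList(child)); } }
class SinkOp extends PlanOp { SinkOp() { super(Collections.<PlanOp>emptyList()); } }

class SinkFinder {
  static SinkOp findSink(PlanOp groupBy) {
    PlanOp last = groupBy;
    if (last.children.get(0) instanceof SelOp) {
      last = last.children.get(0);                 // optional SELECT: validate, advance
    }
    return (SinkOp) last.children.get(0);
  }
}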
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java Wed Jan 28 06:25:44 2015
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.ContentSumma
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hive.ql.exec.Ma
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.PTFOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -59,9 +61,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
import org.apache.hadoop.hive.ql.optimizer.Transform;
import org.apache.hadoop.hive.ql.optimizer.physical.CommonJoinTaskDispatcher;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -387,17 +387,16 @@ public class CorrelationOptimizer implem
List<ExprNodeDesc> backtrackedPartitionCols =
ExprNodeDescUtils.backtrack(childPartitionCols, child, current);
- OpParseContext opCtx = pCtx.getOpParseCtx().get(current);
- RowResolver rowResolver = opCtx.getRowResolver();
+ RowSchema rowSchema = current.getSchema();
Set<String> tableNeedToCheck = new HashSet<String>();
for (ExprNodeDesc expr: childKeyCols) {
if (!(expr instanceof ExprNodeColumnDesc)) {
return correlatedReduceSinkOperators;
}
String colName = ((ExprNodeColumnDesc)expr).getColumn();
- String[] nm = rowResolver.reverseLookup(colName);
- if (nm != null) {
- tableNeedToCheck.add(nm[0]);
+ ColumnInfo columnInfo = rowSchema.getColumnInfo(colName);
+ if (columnInfo != null) {
+ tableNeedToCheck.add(columnInfo.getTabAlias());
}
}
if (current instanceof JoinOperator) {
@@ -405,8 +404,7 @@ public class CorrelationOptimizer implem
int expectedNumCorrelatedRsops = current.getParentOperators().size();
LinkedHashSet<ReduceSinkOperator> correlatedRsops = null;
for (Operator<? extends OperatorDesc> parent : current.getParentOperators()) {
- Set<String> tableNames =
- pCtx.getOpParseCtx().get(parent).getRowResolver().getTableNames();
+ Set<String> tableNames = parent.getSchema().getTableNames();
for (String tbl : tableNames) {
if (tableNeedToCheck.contains(tbl)) {
correlatedRsops = findCorrelatedReduceSinkOperators(current,
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java Wed Jan 28 06:25:44 2015
@@ -44,9 +44,7 @@ import org.apache.hadoop.hive.ql.exec.Se
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -347,7 +345,7 @@ public final class CorrelationUtilities
protected static SelectOperator replaceOperatorWithSelect(Operator<?> operator,
ParseContext context, AbstractCorrelationProcCtx procCtx)
throws SemanticException {
- RowResolver inputRR = context.getOpParseCtx().get(operator).getRowResolver();
+ RowSchema inputRS = operator.getSchema();
SelectDesc select = new SelectDesc(null, null);
Operator<?> parent = getSingleParent(operator);
@@ -355,9 +353,8 @@ public final class CorrelationUtilities
parent.getChildOperators().clear();
- SelectOperator sel = (SelectOperator) putOpInsertMap(
- OperatorFactory.getAndMakeChild(select, new RowSchema(inputRR
- .getColumnInfos()), parent), inputRR, context);
+ SelectOperator sel = (SelectOperator) OperatorFactory.getAndMakeChild(
+ select, new RowSchema(inputRS.getSignature()), parent);
sel.setColumnExprMap(operator.getColumnExprMap());
@@ -393,8 +390,6 @@ public final class CorrelationUtilities
}
cGBYr.setColumnExprMap(cGBYm.getColumnExprMap());
cGBYr.setSchema(cGBYm.getSchema());
- RowResolver resolver = context.getOpParseCtx().get(cGBYm).getRowResolver();
- context.getOpParseCtx().get(cGBYr).setRowResolver(resolver);
} else {
// pRS-cRS-cGBYr (no map aggregation) --> pRS-cGBYr(COMPLETE)
// revert expressions of cGBYr to that of cRS
@@ -404,25 +399,23 @@ public final class CorrelationUtilities
}
Map<String, ExprNodeDesc> oldMap = cGBYr.getColumnExprMap();
- RowResolver oldRR = context.getOpParseCtx().get(cGBYr).getRowResolver();
+ RowSchema oldRS = cGBYr.getSchema();
Map<String, ExprNodeDesc> newMap = new HashMap<String, ExprNodeDesc>();
- RowResolver newRR = new RowResolver();
+ ArrayList<ColumnInfo> newRS = new ArrayList<ColumnInfo>();
List<String> outputCols = cGBYr.getConf().getOutputColumnNames();
for (int i = 0; i < outputCols.size(); i++) {
String colName = outputCols.get(i);
- String[] nm = oldRR.reverseLookup(colName);
- ColumnInfo colInfo = oldRR.get(nm[0], nm[1]);
- newRR.put(nm[0], nm[1], colInfo);
+ ColumnInfo colInfo = oldRS.getColumnInfo(colName);
+ newRS.add(colInfo);
ExprNodeDesc colExpr = ExprNodeDescUtils.backtrack(oldMap.get(colName), cGBYr, cRS);
if (colExpr != null) {
newMap.put(colInfo.getInternalName(), colExpr);
}
}
cGBYr.setColumnExprMap(newMap);
- cGBYr.setSchema(new RowSchema(newRR.getColumnInfos()));
- context.getOpParseCtx().get(cGBYr).setRowResolver(newRR);
+ cGBYr.setSchema(new RowSchema(newRS));
}
cGBYr.getConf().setMode(GroupByDesc.Mode.COMPLETE);
@@ -494,13 +487,5 @@ public final class CorrelationUtilities
}
target.setChildOperators(null);
target.setParentOperators(null);
- context.getOpParseCtx().remove(target);
- }
-
- protected static Operator<? extends Serializable> putOpInsertMap(Operator<?> op, RowResolver rr,
- ParseContext context) {
- OpParseContext ctx = new OpParseContext(rr);
- context.getOpParseCtx().put(op, ctx);
- return op;
}
}
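
The COMPLETE-mode rewrite above rebuilds the group-by's schema by looking each output column up directly in the old RowSchema, where the old code had to reverse-lookup through a RowResolver. A condensed sketch of that rebuild, using only the API visible in this hunk (RowSchema.getColumnInfo(String) and the RowSchema(ArrayList<ColumnInfo>) constructor); the class and method names are ours, and the null check is a defensive addition:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.ColumnInfo;
    import org.apache.hadoop.hive.ql.exec.RowSchema;

    final class SchemaRebuildSketch {
      // Keep only the named output columns, each resolved by internal name.
      static RowSchema retainOutputColumns(RowSchema oldRS, List<String> outputCols) {
        ArrayList<ColumnInfo> kept = new ArrayList<ColumnInfo>();
        for (String colName : outputCols) {
          ColumnInfo colInfo = oldRS.getColumnInfo(colName);
          if (colInfo != null) {
            kept.add(colInfo);
          }
        }
        return new RowSchema(kept);
      }
    }

The one-step lookup works because RowSchema keys columns by internal name, so the old two-step reverseLookup/get pair collapses into a single call.
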
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java Wed Jan 28 06:25:44 2015
@@ -254,7 +254,6 @@ public class QueryPlanTreeTransformation
for (ReduceSinkOperator rsop: handledRSs) {
rsop.setChildOperators(null);
rsop.setParentOperators(null);
- pCtx.getOpParseCtx().remove(rsop);
}
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java Wed Jan 28 06:25:44 2015
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
import org.apache.hadoop.hive.ql.optimizer.Transform;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -332,8 +331,6 @@ public class RewriteGBUsingIndex impleme
RewriteQueryUsingAggregateIndexCtx.getInstance(parseContext, hiveDb, canApplyCtx);
rewriteQueryCtx.invokeRewriteQueryProc();
parseContext = rewriteQueryCtx.getParseContext();
- parseContext.setOpParseCtx((LinkedHashMap<Operator<? extends OperatorDesc>,
- OpParseContext>) rewriteQueryCtx.getOpc());
}
LOG.info("Finished Rewriting query");
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java Wed Jan 28 06:25:44 2015
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.optimi
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -31,19 +30,17 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
-import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcFactory;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -76,7 +73,6 @@ public final class RewriteQueryUsingAggr
this.indexTableName = canApplyCtx.getIndexTableName();
this.alias = canApplyCtx.getAlias();
this.aggregateFunction = canApplyCtx.getAggFunction();
- this.opc = parseContext.getOpParseCtx();
this.indexKey = canApplyCtx.getIndexKey();
}
@@ -86,8 +82,6 @@ public final class RewriteQueryUsingAggr
parseContext, hiveDb, canApplyCtx);
}
- private Map<Operator<? extends OperatorDesc>, OpParseContext> opc =
- new LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>();
private final Hive hiveDb;
private final ParseContext parseContext;
private RewriteCanApplyCtx canApplyCtx;
@@ -99,10 +93,6 @@ public final class RewriteQueryUsingAggr
private ExprNodeColumnDesc aggrExprNode = null;
private String indexKey;
- public Map<Operator<? extends OperatorDesc>, OpParseContext> getOpc() {
- return opc;
- }
-
public ParseContext getParseContext() {
return parseContext;
}
@@ -172,15 +162,9 @@ public final class RewriteQueryUsingAggr
// and add new ones
Map<String, Operator<? extends OperatorDesc>> topOps = rewriteQueryCtx.getParseContext()
.getTopOps();
- Map<Operator<? extends OperatorDesc>, OpParseContext> opParseContext = rewriteQueryCtx
- .getParseContext().getOpParseCtx();
-
- // need this to set rowResolver for new scanOperator
- OpParseContext operatorContext = opParseContext.get(scanOperator);
// remove original TableScanOperator
topOps.remove(alias);
- opParseContext.remove(scanOperator);
String indexTableName = rewriteQueryCtx.getIndexName();
Table indexTableHandle = null;
@@ -201,23 +185,21 @@ public final class RewriteQueryUsingAggr
scanOperator.setConf(indexTableScanDesc);
// Construct the new RowSchema for the new TableScanOperator
- RowResolver rr = new RowResolver();
+ ArrayList<ColumnInfo> sigRS = new ArrayList<ColumnInfo>();
try {
StructObjectInspector rowObjectInspector = (StructObjectInspector) indexTableHandle
.getDeserializer().getObjectInspector();
StructField field = rowObjectInspector.getStructFieldRef(rewriteQueryCtx.getIndexKey());
- rr.put(indexTableName, field.getFieldName(), new ColumnInfo(field.getFieldName(),
- TypeInfoUtils.getTypeInfoFromObjectInspector(field.getFieldObjectInspector()),
- indexTableName, false));
+ sigRS.add(new ColumnInfo(field.getFieldName(), TypeInfoUtils.getTypeInfoFromObjectInspector(
+ field.getFieldObjectInspector()), indexTableName, false));
} catch (SerDeException e) {
LOG.error("Error while creating the RowResolver for new TableScanOperator.");
LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
throw new SemanticException(e.getMessage(), e);
}
+ RowSchema rs = new RowSchema(sigRS);
// Set row schema for the new table scan
- operatorContext.setRowResolver(rr);
-
String newAlias = indexTableName;
int index = alias.lastIndexOf(":");
if (index >= 0) {
@@ -228,13 +210,10 @@ public final class RewriteQueryUsingAggr
scanOperator.getConf().setAlias(newAlias);
scanOperator.setAlias(indexTableName);
topOps.put(newAlias, scanOperator);
- opParseContext.put(scanOperator, operatorContext);
rewriteQueryCtx.getParseContext().setTopOps(
(HashMap<String, Operator<? extends OperatorDesc>>) topOps);
- rewriteQueryCtx.getParseContext().setOpParseCtx(
- (LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>) opParseContext);
- ColumnPrunerProcFactory.setupNeededColumns(scanOperator, rr,
+ ColumnPrunerProcFactory.setupNeededColumns(scanOperator, rs,
Arrays.asList(rewriteQueryCtx.getIndexKey()));
}
@@ -318,11 +297,6 @@ public final class RewriteQueryUsingAggr
// Now the GroupByOperator has the new AggregationList;
// sum(`_count_of_indexed_key`)
// instead of count(indexed_key)
- OpParseContext gbyOPC = rewriteQueryCtx.getOpc().get(operator);
- RowResolver gbyRR = newDAGContext.getOpParseCtx().get(newGbyOperator).getRowResolver();
- gbyOPC.setRowResolver(gbyRR);
- rewriteQueryCtx.getOpc().put(operator, gbyOPC);
-
oldConf.setAggregators((ArrayList<AggregationDesc>) newAggrList);
operator.setConf(oldConf);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java Wed Jan 28 06:25:44 2015
@@ -19,8 +19,8 @@
package org.apache.hadoop.hive.ql.optimizer.lineage;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
/**
@@ -71,7 +71,7 @@ public class ExprProcCtx implements Node
return inpOp;
}
- public RowResolver getResolver() {
- return lctx.getParseCtx().getOpParseCtx().get(inpOp).getRowResolver();
+ public RowSchema getSchema() {
+ return inpOp.getSchema();
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java Wed Jan 28 06:25:44 2015
@@ -29,6 +29,7 @@ import java.util.Stack;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
@@ -42,7 +43,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -76,12 +76,11 @@ public class ExprProcFactory {
Operator<? extends OperatorDesc> operator = epc.getInputOperator();
assert (operator != null);
- RowResolver resolver = epc.getResolver();
- String[] nm = resolver.reverseLookup(cd.getColumn());
- if (nm == null && operator instanceof ReduceSinkOperator) {
- nm = resolver.reverseLookup(Utilities.removeValueTag(cd.getColumn()));
+ RowSchema schema = epc.getSchema();
+ ColumnInfo ci = schema.getColumnInfo(cd.getColumn());
+ if (ci == null && operator instanceof ReduceSinkOperator) {
+ ci = schema.getColumnInfo(Utilities.removeValueTag(cd.getColumn()));
}
- ColumnInfo ci = nm != null ? resolver.get(nm[0], nm[1]): null;
// Insert the dependencies of inp_ci to that of the current operator, ci
LineageCtx lc = epc.getLineageCtx();
@@ -143,6 +142,7 @@ public class ExprProcFactory {
Dependency dep = new Dependency();
dep.setType(LineageInfo.DependencyType.SIMPLE);
dep.setBaseCols(new ArrayList<BaseColumnInfo>());
+
return dep;
}
}
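
The lineage column processor above now resolves an ExprNodeColumnDesc against the input operator's RowSchema, retrying once after stripping the ReduceSink value tag. A sketch of that lookup-with-fallback, using only Utilities.removeValueTag and RowSchema.getColumnInfo as they appear in the hunk; the wrapper class and method name are illustrative:

    import org.apache.hadoop.hive.ql.exec.ColumnInfo;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
    import org.apache.hadoop.hive.ql.exec.RowSchema;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    final class LineageColumnLookupSketch {
      // Resolve a column reference against an operator's own schema; a
      // ReduceSink output name may carry a value tag that must be
      // stripped before the second attempt.
      static ColumnInfo resolve(Operator<? extends OperatorDesc> op, String col) {
        RowSchema schema = op.getSchema();
        ColumnInfo ci = schema.getColumnInfo(col);
        if (ci == null && op instanceof ReduceSinkOperator) {
          ci = schema.getColumnInfo(Utilities.removeValueTag(col));
        }
        return ci;
      }
    }
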
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java Wed Jan 28 06:25:44 2015
@@ -54,7 +54,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.Utils;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -468,17 +467,16 @@ public class OpProcFactory {
ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
}
} else {
- RowResolver resolver = lCtx.getParseCtx().getOpParseCtx().get(rop).getRowResolver();
+ RowSchema schema = rop.getSchema();
ReduceSinkDesc desc = rop.getConf();
List<ExprNodeDesc> keyCols = desc.getKeyCols();
ArrayList<String> keyColNames = desc.getOutputKeyColumnNames();
for (int i = 0; i < keyCols.size(); i++) {
// order-bys, joins
- String[] nm = resolver.reverseLookup(Utilities.ReduceField.KEY + "." + keyColNames.get(i));
- if (nm == null) {
+ ColumnInfo column = schema.getColumnInfo(Utilities.ReduceField.KEY + "." + keyColNames.get(i));
+ if (column == null) {
continue; // key in values
}
- ColumnInfo column = resolver.get(nm[0], nm[1]);
lCtx.getIndex().putDependency(rop, column,
ExprProcFactory.getExprDependency(lCtx, inpOp, keyCols.get(i)));
}
@@ -486,12 +484,11 @@ public class OpProcFactory {
ArrayList<String> valColNames = desc.getOutputValueColumnNames();
for (int i = 0; i < valCols.size(); i++) {
// todo: currently, bucketing,etc. makes RS differently with those for order-bys or joins
- String[] nm = resolver.reverseLookup(valColNames.get(i));
- if (nm == null) {
+ ColumnInfo column = schema.getColumnInfo(valColNames.get(i));
+ if (column == null) {
// order-bys, joins
- nm = resolver.reverseLookup(Utilities.ReduceField.VALUE + "." + valColNames.get(i));
+ column = schema.getColumnInfo(Utilities.ReduceField.VALUE + "." + valColNames.get(i));
}
- ColumnInfo column = resolver.get(nm[0], nm[1]);
lCtx.getIndex().putDependency(rop, column,
ExprProcFactory.getExprDependency(lCtx, inpOp, valCols.get(i)));
}
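
In the ReduceSink branch above, key and value columns live in the schema under "KEY." and "VALUE." prefixes, and the rewrite folds the old reverseLookup/get pair into single getColumnInfo calls. A sketch of the value-column resolution, again using only API shown in this hunk; the class and method names are ours:

    import org.apache.hadoop.hive.ql.exec.ColumnInfo;
    import org.apache.hadoop.hive.ql.exec.RowSchema;
    import org.apache.hadoop.hive.ql.exec.Utilities;

    final class ReduceSinkValueLookupSketch {
      // A value column may sit under its plain output name or, for
      // order-bys and joins, under a "VALUE." prefix; try both.
      static ColumnInfo valueColumn(RowSchema schema, String valColName) {
        ColumnInfo column = schema.getColumnInfo(valColName);
        if (column == null) {
          column = schema.getColumnInfo(
              Utilities.ReduceField.VALUE + "." + valColName);
        }
        return column;
      }
    }
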
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Wed Jan 28 06:25:44 2015
@@ -455,7 +455,6 @@ public class CommonJoinTaskDispatcher ex
}
}
- currWork.setOpParseCtxMap(parseCtx.getOpParseCtx());
currWork.setLeftInputJoin(joinOp.getConf().isLeftInputJoin());
currWork.setBaseSrc(joinOp.getConf().getBaseSrc());
currWork.setMapAliases(joinOp.getConf().getMapAliases());
@@ -521,7 +520,6 @@ public class CommonJoinTaskDispatcher ex
listWorks.add(currTask.getWork());
listTasks.add(currTask);
// clear JoinTree info
- currWork.setOpParseCtxMap(null);
currWork.setLeftInputJoin(false);
currWork.setBaseSrc(null);
currWork.setMapAliases(null);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java Wed Jan 28 06:25:44 2015
@@ -44,7 +44,6 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin;
@@ -151,10 +150,6 @@ public class SortMergeJoinTaskDispatcher
MapredWork currJoinWork = Utilities.clonePlan(currWork);
SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork);
- // Add the row resolver for the new operator
- Map<Operator<? extends OperatorDesc>, OpParseContext> opParseContextMap =
- physicalContext.getParseContext().getOpParseCtx();
- opParseContextMap.put(newSMBJoinOp, opParseContextMap.get(oldSMBJoinOp));
// change the newly created map-red plan as if it was a join operator
genSMBJoinWork(currJoinWork.getMapWork(), newSMBJoinOp);
return currJoinWork;
@@ -253,11 +248,9 @@ public class SortMergeJoinTaskDispatcher
MapredWork currJoinWork = convertSMBWorkToJoinWork(currWork, originalSMBJoinOp);
SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork);
- currWork.getMapWork().setOpParseCtxMap(parseCtx.getOpParseCtx());
currWork.getMapWork().setLeftInputJoin(originalSMBJoinOp.getConf().isLeftInputJoin());
currWork.getMapWork().setBaseSrc(originalSMBJoinOp.getConf().getBaseSrc());
currWork.getMapWork().setMapAliases(originalSMBJoinOp.getConf().getMapAliases());
- currJoinWork.getMapWork().setOpParseCtxMap(parseCtx.getOpParseCtx());
currJoinWork.getMapWork().setLeftInputJoin(originalSMBJoinOp.getConf().isLeftInputJoin());
currJoinWork.getMapWork().setBaseSrc(originalSMBJoinOp.getConf().getBaseSrc());
currJoinWork.getMapWork().setMapAliases(originalSMBJoinOp.getConf().getMapAliases());
@@ -334,7 +327,6 @@ public class SortMergeJoinTaskDispatcher
listWorks.add(currTask.getWork());
listTasks.add(currTask);
// clear JoinTree info
- currWork.getMapWork().setOpParseCtxMap(null);
currWork.getMapWork().setLeftInputJoin(false);
currWork.getMapWork().setBaseSrc(null);
currWork.getMapWork().setMapAliases(null);
@@ -432,13 +424,8 @@ public class SortMergeJoinTaskDispatcher
int mapJoinPos) throws SemanticException {
SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(task.getWork());
- // Add the row resolver for the new operator
- Map<Operator<? extends OperatorDesc>, OpParseContext> opParseContextMap =
- physicalContext.getParseContext().getOpParseCtx();
- opParseContextMap.put(newSMBJoinOp, opParseContextMap.get(oldSMBJoinOp));
-
// generate the map join operator
- return MapJoinProcessor.convertSMBJoinToMapJoin(physicalContext.getConf(),
- opParseContextMap, newSMBJoinOp, mapJoinPos, true);
+ return MapJoinProcessor.convertSMBJoinToMapJoin(
+ physicalContext.getConf(), newSMBJoinOp, mapJoinPos, true);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java Wed Jan 28 06:25:44 2015
@@ -372,11 +372,10 @@ public class SparkMapJoinOptimizer imple
}
//can safely convert the join to a map join.
- ParseContext parseContext = context.getParseContext();
MapJoinOperator mapJoinOp =
- MapJoinProcessor.convertJoinOpMapJoinOp(context.getConf(), parseContext.getOpParseCtx(), joinOp,
- joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(),
- bigTablePosition, true);
+ MapJoinProcessor.convertJoinOpMapJoinOp(context.getConf(), joinOp,
+ joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(),
+ joinOp.getConf().getMapAliases(), bigTablePosition, true);
Operator<? extends OperatorDesc> parentBigTableOp =
mapJoinOp.getParentOperators().get(bigTablePosition);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Wed Jan 28 06:25:44 2015
@@ -18,6 +18,13 @@
package org.apache.hadoop.hive.ql.parse;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Stack;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -28,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.Ma
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -43,13 +51,6 @@ import org.apache.hadoop.hive.ql.plan.Te
import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.hive.ql.plan.UnionWork;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Stack;
-
/**
* GenTezWork separates the operator tree into tez tasks.
* It is called once per leaf operator (operator that forces
@@ -245,7 +246,9 @@ public class GenTezWork implements NodeP
LOG.debug("Cloning reduce sink for multi-child broadcast edge");
// we've already set this one up. Need to clone for the next work.
r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
- (ReduceSinkDesc)r.getConf().clone(), r.getParentOperators());
+ (ReduceSinkDesc)r.getConf().clone(),
+ new RowSchema(r.getSchema()),
+ r.getParentOperators());
context.clonedReduceSinks.add(r);
}
r.getConf().setOutputName(work.getName());
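
With the parse-context map gone, the cloned broadcast ReduceSink above must be given its schema explicitly, and the RowSchema copy constructor keeps the clone from sharing the original's mutable schema. A minimal sketch of the idiom, assuming the getAndMakeChild(desc, schema, parents) overload used in this hunk; the wrapper class and method name are illustrative:

    import org.apache.hadoop.hive.ql.exec.OperatorFactory;
    import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
    import org.apache.hadoop.hive.ql.exec.RowSchema;
    import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;

    final class ReduceSinkCloneSketch {
      // Clone a ReduceSink for an extra broadcast edge: duplicate its
      // descriptor, copy (not share) its schema, and attach it to the
      // same parents.
      static ReduceSinkOperator cloneForNextWork(ReduceSinkOperator r) {
        return (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
            (ReduceSinkDesc) r.getConf().clone(),
            new RowSchema(r.getSchema()),
            r.getParentOperators());
      }
    }
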