Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/11/11 02:50:18 UTC
svn commit: r712905 [5/38] - in /hadoop/core/trunk: ./ src/contrib/hive/
src/contrib/hive/cli/src/java/org/apache/hadoop/hive/cli/
src/contrib/hive/common/src/java/org/apache/hadoop/hive/conf/
src/contrib/hive/conf/ src/contrib/hive/data/files/ src/con...
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Mon Nov 10 17:50:06 2008
@@ -24,6 +24,7 @@
import java.lang.reflect.Method;
import org.antlr.runtime.tree.*;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
@@ -33,7 +34,9 @@
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.metadata.*;
+import org.apache.hadoop.hive.ql.optimizer.Optimizer;
import org.apache.hadoop.hive.ql.plan.*;
import org.apache.hadoop.hive.ql.typeinfo.TypeInfo;
import org.apache.hadoop.hive.ql.typeinfo.TypeInfoFactory;
@@ -41,6 +44,7 @@
import org.apache.hadoop.hive.ql.udf.UDFOPPositive;
import org.apache.hadoop.hive.ql.exec.*;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.fs.Path;
@@ -54,8 +58,12 @@
private HashMap<String, PartitionPruner> aliasToPruner;
private HashMap<String, SamplePruner> aliasToSamplePruner;
private HashMap<String, Operator<? extends Serializable>> topOps;
+ private HashMap<String, Operator<? extends Serializable>> topSelOps;
+ private HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx;
private List<loadTableDesc> loadTableWork;
private List<loadFileDesc> loadFileWork;
+ private QB qb;
+ private CommonTree ast;
private static class Phase1Ctx {
String dest;
@@ -69,21 +77,42 @@
this.aliasToPruner = new HashMap<String, PartitionPruner>();
this.aliasToSamplePruner = new HashMap<String, SamplePruner>();
this.topOps = new HashMap<String, Operator<? extends Serializable>>();
+ this.topSelOps = new HashMap<String, Operator<? extends Serializable>>();
this.loadTableWork = new ArrayList<loadTableDesc>();
this.loadFileWork = new ArrayList<loadFileDesc>();
+ opParseCtx = new HashMap<Operator<? extends Serializable>, OpParseContext>();
}
@Override
protected void reset() {
super.reset();
this.aliasToPruner.clear();
- this.topOps.clear();
this.loadTableWork.clear();
this.loadFileWork.clear();
+ this.topOps.clear();
+ this.topSelOps.clear();
+ qb = null;
+ ast = null;
+ }
+
+ public void init(ParseContext pctx) {
+ aliasToPruner = pctx.getAliasToPruner();
+ aliasToSamplePruner = pctx.getAliasToSamplePruner();
+ topOps = pctx.getTopOps();
+ topSelOps = pctx.getTopSelOps();
+ opParseCtx = pctx.getOpParseCtx();
+ loadTableWork = pctx.getLoadTableWork();
+ loadFileWork = pctx.getLoadFileWork();
+ ctx = pctx.getContext();
+ }
+
+ public ParseContext getParseContext() {
+ return new ParseContext(conf, qb, ast, aliasToPruner, aliasToSamplePruner, topOps,
+ topSelOps, opParseCtx, loadTableWork, loadFileWork, ctx);
}
-
+
@SuppressWarnings("nls")
- private void doPhase1QBExpr(CommonTree ast, QBExpr qbexpr, String id,
+ public void doPhase1QBExpr(CommonTree ast, QBExpr qbexpr, String id,
String alias) throws SemanticException {
assert (ast.getToken() != null);
@@ -142,7 +171,7 @@
|| expressionTree.getToken().getType() == HiveParser.TOK_FUNCTIONDI) {
assert (expressionTree.getChildCount() != 0);
assert (expressionTree.getChild(0).getType() == HiveParser.Identifier);
- String functionName = expressionTree.getChild(0).getText();
+ String functionName = unescapeIdentifier(expressionTree.getChild(0).getText());
if (FunctionRegistry.getUDAF(functionName) != null) {
aggregations.put(expressionTree.toStringTree(), expressionTree);
return;
@@ -195,17 +224,17 @@
tableSamplePresent = true;
}
CommonTree tableTree = (CommonTree)(tabref.getChild(0));
- String alias = tabref.getChild(aliasIndex).getText();
+ String alias = unescapeIdentifier(tabref.getChild(aliasIndex).getText());
// If the alias is already there then we have a conflict
if (qb.exists(alias)) {
throw new SemanticException(ErrorMsg.AMBIGOUS_TABLE_ALIAS.getMsg(tabref.getChild(aliasIndex)));
}
if (tableSamplePresent) {
CommonTree sampleClause = (CommonTree)tabref.getChild(1);
- ArrayList<String> sampleCols = new ArrayList<String>();
+ ArrayList<CommonTree> sampleCols = new ArrayList<CommonTree>();
if (sampleClause.getChildCount() > 2) {
for (int i = 2; i < sampleClause.getChildCount(); i++) {
- sampleCols.add(sampleClause.getChild(i).getText());
+ sampleCols.add((CommonTree)sampleClause.getChild(i));
}
}
// TODO: For now only support sampling on up to two columns
@@ -214,13 +243,13 @@
throw new SemanticException(ErrorMsg.SAMPLE_RESTRICTION.getMsg(tabref.getChild(0)));
}
qb.getParseInfo().setTabSample(alias, new TableSample(
- sampleClause.getChild(0).getText(),
- sampleClause.getChild(1).getText(),
- sampleCols)
+ unescapeIdentifier(sampleClause.getChild(0).getText()),
+ unescapeIdentifier(sampleClause.getChild(1).getText()),
+ sampleCols)
);
}
// Insert this map into the stats
- String table_name = tabref.getChild(0).getText();
+ String table_name = unescapeIdentifier(tabref.getChild(0).getText());
qb.setTabAlias(alias, table_name);
qb.getParseInfo().setSrcForAlias(alias, tableTree);
@@ -233,7 +262,7 @@
throw new SemanticException(ErrorMsg.NO_SUBQUERY_ALIAS.getMsg(subq));
}
CommonTree subqref = (CommonTree) subq.getChild(0);
- String alias = subq.getChild(1).getText();
+ String alias = unescapeIdentifier(subq.getChild(1).getText());
// Recursively do the first phase of semantic analysis for the subquery
QBExpr qbexpr = new QBExpr(alias);
@@ -277,7 +306,7 @@
}
@SuppressWarnings({"fallthrough", "nls"})
- private void doPhase1(CommonTree ast, QB qb, Phase1Ctx ctx_1)
+ public void doPhase1(CommonTree ast, QB qb, Phase1Ctx ctx_1)
throws SemanticException {
QBParseInfo qbp = qb.getParseInfo();
@@ -379,8 +408,20 @@
}
}
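
[The hunks above repeatedly wrap raw ANTLR token text in unescapeIdentifier(...). A minimal standalone sketch of what such a helper plausibly does, assuming Hive-style backtick quoting; this is illustrative code, not Hive's actual implementation:]

    public class IdentifierUtils {
      // Strip surrounding backticks from a quoted identifier, e.g. `key` -> key.
      // Assumption: backtick quoting; unquoted identifiers pass through unchanged.
      public static String unescapeIdentifier(String val) {
        if (val != null && val.length() > 1
            && val.charAt(0) == '`' && val.charAt(val.length() - 1) == '`') {
          return val.substring(1, val.length() - 1);
        }
        return val;
      }

      public static void main(String[] args) {
        System.out.println(unescapeIdentifier("`key`")); // key
        System.out.println(unescapeIdentifier("src"));   // src
      }
    }
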
+ /**
+ * Generate partition pruners. The filters can occur in the where clause and in the JOIN conditions. First, walk over the
+ * filters in the join condition and AND them, since all of them are needed. Then for each where clause, traverse the
+ * filter.
+ * Note that we currently do not propagate filters over subqueries. For example, in a query of the form:
+ * select ... FROM t1 JOIN (select ... t2) x where x.partition
+ * we will not recognize that the x.partition condition introduces a partition pruner on t2.
+ *
+ */
@SuppressWarnings("nls")
private void genPartitionPruners(QB qb) throws SemanticException {
+ Map<String, Boolean> joinPartnPruner = new HashMap<String, Boolean>();
+ QBParseInfo qbp = qb.getParseInfo();
+
// Recursively prune subqueries
for (String alias : qb.getSubqAliases()) {
QBExpr qbexpr = qb.getSubqForAlias(alias);
@@ -389,21 +430,12 @@
for (String alias : qb.getTabAliases()) {
String alias_id = (qb.getId() == null ? alias : qb.getId() + ":" + alias);
- PartitionPruner pruner = new PartitionPruner(alias,
- qb.getMetaData());
+
+ PartitionPruner pruner = new PartitionPruner(alias, qb.getMetaData());
// Pass each where clause to the pruner
- QBParseInfo qbp = qb.getParseInfo();
for(String clause: qbp.getClauseNames()) {
CommonTree whexp = (CommonTree)qbp.getWhrForClause(clause);
-
- if (pruner.getTable().isPartitioned() &&
- conf.getVar(HiveConf.ConfVars.HIVEPARTITIONPRUNER).equalsIgnoreCase("strict") &&
- (whexp == null || !pruner.hasPartitionPredicate((CommonTree)whexp.getChild(0)))) {
- throw new SemanticException(ErrorMsg.NO_PARTITION_PREDICATE.getMsg(whexp != null ? whexp : qbp.getSelForClause(clause),
- " for Alias " + alias + " Table " + pruner.getTable().getName()));
- }
-
if (whexp != null) {
pruner.addExpression((CommonTree)whexp.getChild(0));
}
@@ -412,6 +444,54 @@
// Add the pruner to the list
this.aliasToPruner.put(alias_id, pruner);
}
+
+ if (!qb.getTabAliases().isEmpty() && qb.getQbJoinTree() != null) {
+ int pos = 0;
+ for (String alias : qb.getQbJoinTree().getBaseSrc()) {
+ if (alias != null) {
+ String alias_id = (qb.getId() == null ? alias : qb.getId() + ":" + alias);
+ PartitionPruner pruner = this.aliasToPruner.get(alias_id);
+ if(pruner == null) {
+ // this means that the alias is a subquery
+ pos++;
+ continue;
+ }
+ Vector<CommonTree> filters = qb.getQbJoinTree().getFilters().get(pos);
+ for (CommonTree cond : filters) {
+ pruner.addJoinOnExpression(cond);
+ if (pruner.hasPartitionPredicate(cond))
+ joinPartnPruner.put(alias_id, new Boolean(true));
+ }
+ if (qb.getQbJoinTree().getJoinSrc() != null) {
+ filters = qb.getQbJoinTree().getFilters().get(0);
+ for (CommonTree cond : filters) {
+ pruner.addJoinOnExpression(cond);
+ if (pruner.hasPartitionPredicate(cond))
+ joinPartnPruner.put(alias_id, new Boolean(true));
+ }
+ }
+ }
+ pos++;
+ }
+ }
+
+ for (String alias : qb.getTabAliases()) {
+ String alias_id = (qb.getId() == null ? alias : qb.getId() + ":" + alias);
+ PartitionPruner pruner = this.aliasToPruner.get(alias_id);
+ if (joinPartnPruner.get(alias_id) == null) {
+ // Pass each where clause to the pruner
+ for(String clause: qbp.getClauseNames()) {
+
+ CommonTree whexp = (CommonTree)qbp.getWhrForClause(clause);
+ if (pruner.getTable().isPartitioned() &&
+ conf.getVar(HiveConf.ConfVars.HIVEPARTITIONPRUNER).equalsIgnoreCase("strict") &&
+ (whexp == null || !pruner.hasPartitionPredicate((CommonTree)whexp.getChild(0)))) {
+ throw new SemanticException(ErrorMsg.NO_PARTITION_PREDICATE.getMsg(whexp != null ? whexp : qbp.getSelForClause(clause),
+ " for Alias " + alias + " Table " + pruner.getTable().getName()));
+ }
+ }
+ }
+ }
}
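
[The reworked genPartitionPruners above defers the strict-mode check until both the WHERE clauses and the JOIN conditions have been examined for partition predicates. A minimal sketch of the resulting rule, with illustrative names rather than Hive's API:]

    import java.util.Arrays;
    import java.util.List;

    class StrictModeCheck {
      // A partitioned table read in strict mode must have a partition predicate
      // somewhere: in a WHERE clause or in a JOIN condition.
      static void check(boolean partitioned, boolean strict, List<Boolean> predicateSightings) {
        boolean found = predicateSightings.contains(Boolean.TRUE);
        if (partitioned && strict && !found) {
          throw new IllegalStateException("No partition predicate found in strict mode");
        }
      }

      public static void main(String[] args) {
        // OK: the predicate was found in a join condition, so no exception.
        check(true, true, Arrays.asList(false, true));
      }
    }
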
private void genSamplePruners(QBExpr qbexpr) throws SemanticException {
@@ -451,7 +531,7 @@
}
@SuppressWarnings("nls")
- private void getMetaData(QB qb) throws SemanticException {
+ public void getMetaData(QB qb) throws SemanticException {
try {
LOG.info("Get metadata for source tables");
@@ -572,7 +652,7 @@
// String[] allAliases = joinTree.getAllAliases();
switch (condn.getToken().getType()) {
case HiveParser.TOK_COLREF:
- String tblName = condn.getChild(0).getText();
+ String tblName = unescapeIdentifier(condn.getChild(0).getText().toLowerCase());
if (isPresent(joinTree.getLeftAliases(), tblName)) {
if (!leftAliases.contains(tblName))
leftAliases.add(tblName);
@@ -632,62 +712,169 @@
throw new SemanticException(ErrorMsg.INVALID_JOIN_CONDITION_2.getMsg(condn));
}
- private void parseJoinCondition(CommonTree joinParseTree,
- QBJoinTree joinTree, CommonTree joinCond, Vector<String> leftSrc)
+ /**
+ * Parse the join condition.
+ * If the condition is a join condition, throw an error if it is not an equality. Otherwise, break it into left and
+ * right expressions and store in the join tree.
+ * If the condition is a join filter, add it to the filter list of the join tree. The join condition can contain conditions
+ * on both the left and right trees and filters on either. Currently, we only support equi-joins, so we throw an error
+ * if the condition involves both subtrees and is not an equality. Also, we only support AND; ORs are not supported
+ * currently, since their semantics are not very clear, they may lead to data explosion, and there is no use case.
+ * @param joinTree join tree to be populated
+ * @param joinCond join condition
+ * @param leftSrc left sources
+ * @throws SemanticException
+ */
+ private void parseJoinCondition(QBJoinTree joinTree, CommonTree joinCond, Vector<String> leftSrc)
throws SemanticException {
switch (joinCond.getToken().getType()) {
+ case HiveParser.KW_OR:
+ throw new SemanticException(ErrorMsg.INVALID_JOIN_CONDITION_3.getMsg(joinCond));
+
case HiveParser.KW_AND:
- parseJoinCondition(joinParseTree, joinTree, (CommonTree) joinCond
+ parseJoinCondition(joinTree, (CommonTree) joinCond
.getChild(0), leftSrc);
- parseJoinCondition(joinParseTree, joinTree, (CommonTree) joinCond
+ parseJoinCondition(joinTree, (CommonTree) joinCond
.getChild(1), leftSrc);
break;
case HiveParser.EQUAL:
CommonTree leftCondn = (CommonTree) joinCond.getChild(0);
- Vector<String> leftAliases = new Vector<String>();
- Vector<String> rightAliases = new Vector<String>();
- parseJoinCondPopulateAlias(joinTree, leftCondn, leftAliases, rightAliases);
- populateAliases(leftAliases, rightAliases, leftCondn, joinTree, leftSrc);
+ Vector<String> leftCondAl1 = new Vector<String>();
+ Vector<String> leftCondAl2 = new Vector<String>();
+ parseJoinCondPopulateAlias(joinTree, leftCondn, leftCondAl1, leftCondAl2);
CommonTree rightCondn = (CommonTree) joinCond.getChild(1);
- leftAliases.clear();
- rightAliases.clear();
- parseJoinCondPopulateAlias(joinTree, rightCondn, leftAliases,
- rightAliases);
- populateAliases(leftAliases, rightAliases, rightCondn, joinTree, leftSrc);
+ Vector<String> rightCondAl1 = new Vector<String>();
+ Vector<String> rightCondAl2 = new Vector<String>();
+ parseJoinCondPopulateAlias(joinTree, rightCondn, rightCondAl1, rightCondAl2);
+
+ // is it a filter or a join condition
+ if (((leftCondAl1.size() != 0) && (leftCondAl2.size() != 0)) ||
+ ((rightCondAl1.size() != 0) && (rightCondAl2.size() != 0)))
+ throw new SemanticException(ErrorMsg.INVALID_JOIN_CONDITION_1.getMsg(joinCond));
+
+ if (leftCondAl1.size() != 0) {
+ if ((rightCondAl1.size() != 0) || ((rightCondAl1.size() == 0) && (rightCondAl2.size() == 0)))
+ joinTree.getFilters().get(0).add(joinCond);
+ else if (rightCondAl2.size() != 0) {
+ populateAliases(leftCondAl1, leftCondAl2, leftCondn, joinTree, leftSrc);
+ populateAliases(rightCondAl1, rightCondAl2, rightCondn, joinTree, leftSrc);
+ }
+ }
+ else if (leftCondAl2.size() != 0) {
+ if ((rightCondAl2.size() != 0) || ((rightCondAl1.size() == 0) && (rightCondAl2.size() == 0)))
+ joinTree.getFilters().get(1).add(joinCond);
+ else if (rightCondAl1.size() != 0) {
+ populateAliases(leftCondAl1, leftCondAl2, leftCondn, joinTree, leftSrc);
+ populateAliases(rightCondAl1, rightCondAl2, rightCondn, joinTree, leftSrc);
+ }
+ }
+ else if (rightCondAl1.size() != 0)
+ joinTree.getFilters().get(0).add(joinCond);
+ else
+ joinTree.getFilters().get(1).add(joinCond);
+
break;
default:
+ boolean isFunction = (joinCond.getType() == HiveParser.TOK_FUNCTION);
+
+ // Create all children
+ int childrenBegin = (isFunction ? 1 : 0);
+ ArrayList<Vector<String>> leftAlias = new ArrayList<Vector<String>>(joinCond.getChildCount() - childrenBegin);
+ ArrayList<Vector<String>> rightAlias = new ArrayList<Vector<String>>(joinCond.getChildCount() - childrenBegin);
+ for (int ci = 0; ci < joinCond.getChildCount() - childrenBegin; ci++) {
+ Vector<String> left = new Vector<String>();
+ Vector<String> right = new Vector<String>();
+ leftAlias.add(left);
+ rightAlias.add(right);
+ }
+
+ for (int ci=childrenBegin; ci<joinCond.getChildCount(); ci++)
+ parseJoinCondPopulateAlias(joinTree, (CommonTree)joinCond.getChild(ci), leftAlias.get(ci-childrenBegin), rightAlias.get(ci-childrenBegin));
+
+ boolean leftAliasNull = true;
+ for (Vector<String> left : leftAlias) {
+ if (left.size() != 0) {
+ leftAliasNull = false;
+ break;
+ }
+ }
+
+ boolean rightAliasNull = true;
+ for (Vector<String> right : rightAlias) {
+ if (right.size() != 0) {
+ rightAliasNull = false;
+ break;
+ }
+ }
+
+ if (!leftAliasNull && !rightAliasNull)
+ throw new SemanticException(ErrorMsg.INVALID_JOIN_CONDITION_1.getMsg(joinCond));
+
+ if (!leftAliasNull)
+ joinTree.getFilters().get(0).add(joinCond);
+ else
+ joinTree.getFilters().get(1).add(joinCond);
+
break;
}
}
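
[The rewritten parseJoinCondition above classifies each equality by which join subtree its operands reference: one operand per subtree makes a join key, while operands confined to one side become filters on that side. A compact sketch of that decision table, using plain sets of alias names; illustrative only, not the Hive code:]

    import java.util.Set;

    class JoinCondClassifier {
      enum Kind { JOIN_KEY, LEFT_FILTER, RIGHT_FILTER, INVALID }

      // lhsLeft/lhsRight: left/right-subtree aliases referenced by the left operand;
      // rhsLeft/rhsRight: the same for the right operand of the equality.
      static Kind classifyEquality(Set<String> lhsLeft, Set<String> lhsRight,
                                   Set<String> rhsLeft, Set<String> rhsRight) {
        // An operand that touches both subtrees cannot be handled by an equi-join.
        if ((!lhsLeft.isEmpty() && !lhsRight.isEmpty())
            || (!rhsLeft.isEmpty() && !rhsRight.isEmpty())) {
          return Kind.INVALID;
        }
        if (!lhsLeft.isEmpty() && !rhsRight.isEmpty()) return Kind.JOIN_KEY;
        if (!lhsRight.isEmpty() && !rhsLeft.isEmpty()) return Kind.JOIN_KEY;
        if (!lhsLeft.isEmpty() || !rhsLeft.isEmpty()) return Kind.LEFT_FILTER;
        return Kind.RIGHT_FILTER;
      }

      public static void main(String[] args) {
        // a.key = b.key, with a on the left of the join and b on the right.
        System.out.println(classifyEquality(Set.of("a"), Set.of(), Set.of(), Set.of("b")));
      }
    }
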
+ @SuppressWarnings("nls")
+ private Operator<? extends Serializable> putOpInsertMap(Operator<? extends Serializable> op, RowResolver rr) {
+ OpParseContext ctx = new OpParseContext(rr);
+ opParseCtx.put(op, ctx);
+ return op;
+ }
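
[putOpInsertMap records every operator together with its parse-time row resolver, so later phases (such as the newly imported Optimizer) can look the schema back up. A generic sketch of that registry pattern, with hypothetical type parameters:]

    import java.util.HashMap;
    import java.util.Map;

    class OpRegistry<O, C> {
      private final Map<O, C> ctx = new HashMap<>();

      // Register an operator with its context and hand the operator back,
      // which keeps call sites like putOpInsertMap(...) compact.
      O register(O op, C opCtx) {
        ctx.put(op, opCtx);
        return op;
      }

      C lookup(O op) {
        return ctx.get(op);
      }

      public static void main(String[] args) {
        OpRegistry<String, String> reg = new OpRegistry<>();
        reg.register("selectOp", "rowResolver#1");
        System.out.println(reg.lookup("selectOp")); // rowResolver#1
      }
    }
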
@SuppressWarnings("nls")
- private OperatorInfo genFilterPlan(String dest, QB qb,
- OperatorInfo input) throws SemanticException {
+ private Operator genFilterPlan(String dest, QB qb,
+ Operator input) throws SemanticException {
CommonTree whereExpr = qb.getParseInfo().getWhrForClause(dest);
- OperatorInfo output = (OperatorInfo)input.clone();
- output.setOp(
- OperatorFactory.getAndMakeChild(
- new filterDesc(genExprNodeDesc((CommonTree)whereExpr.getChild(0),
- qb.getParseInfo().getAlias(),
- input.getRowResolver())),
- new RowSchema(output.getRowResolver().getColumnInfos()),
- input.getOp()
- )
- );
- LOG.debug("Created Filter Plan for " + qb.getId() + ":" + dest + " row schema: " + output.getRowResolver().toString());
+ OpParseContext inputCtx = opParseCtx.get(input);
+ RowResolver inputRR = inputCtx.getRR();
+ Operator output = putOpInsertMap(
+ OperatorFactory.getAndMakeChild(
+ new filterDesc(genExprNodeDesc(qb.getMetaData(), (CommonTree)whereExpr.getChild(0), inputRR)),
+ new RowSchema(inputRR.getColumnInfos()), input), inputRR);
+
+ LOG.debug("Created Filter Plan for " + qb.getId() + ":" + dest + " row schema: " + inputRR.toString());
+ return output;
+ }
+
+ /**
+ * Create a filter plan. The condition and the inputs are specified.
+ * @param qb current query block
+ * @param condn The condition to be resolved
+ * @param input the input operator
+ */
+ @SuppressWarnings("nls")
+ private Operator genFilterPlan(QB qb, CommonTree condn, Operator input) throws SemanticException {
+
+ OpParseContext inputCtx = opParseCtx.get(input);
+ RowResolver inputRR = inputCtx.getRR();
+ Operator output = putOpInsertMap(
+ OperatorFactory.getAndMakeChild(
+ new filterDesc(genExprNodeDesc(qb.getMetaData(), condn, inputRR)),
+ new RowSchema(inputRR.getColumnInfos()), input), inputRR);
+
+ LOG.debug("Created Filter Plan for " + qb.getId() + " row schema: " + inputRR.toString());
return output;
}
@SuppressWarnings("nls")
- private void genColList(String alias, CommonTree sel,
+ private void genColList(String tabAlias, String alias, CommonTree sel,
ArrayList<exprNodeDesc> col_list, RowResolver input, Integer pos,
RowResolver output) throws SemanticException {
+
+ // The table alias should exist
+ if (tabAlias != null && !input.hasTableAlias(tabAlias))
+ throw new SemanticException(ErrorMsg.INVALID_TABLE_ALIAS.getMsg(sel));
+
// TODO: Have to put in the support for AS clause
// This is the tab.* case
@@ -703,42 +890,77 @@
}
}
- @SuppressWarnings("nls")
- private OperatorInfo genScriptPlan(CommonTree trfm, QB qb,
- OperatorInfo input) throws SemanticException {
+ /**
+ * If the user script command needs any modifications, do them here.
+ */
+ private String getFixedCmd(String cmd) {
+ SessionState ss = SessionState.get();
+ if(ss == null)
+ return cmd;
+
+ // for local mode - replace any references to packaged files by name with
+ // the reference to the original file path
+ if(ss.getConf().get("mapred.job.tracker", "local").equals("local")) {
+ Set<String> files = ss.list_resource(SessionState.ResourceType.FILE, null);
+ if((files != null) && !files.isEmpty()) {
+ int end = cmd.indexOf(" ");
+ String prog = (end == -1) ? cmd : cmd.substring(0, end);
+ String args = (end == -1) ? "" : cmd.substring(end, cmd.length());
+
+ for(String oneFile: files) {
+ Path p = new Path(oneFile);
+ if(p.getName().equals(prog)) {
+ cmd = oneFile + args;
+ break;
+ }
+ }
+ }
+ }
- OperatorInfo output = (OperatorInfo)input.clone();
+ return cmd;
+ }
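
[getFixedCmd above rewrites a user script command for local mode: if the program name matches a registered file resource, the bare name is replaced by the full path. A standalone sketch with hypothetical inputs; the real code consults SessionState and the mapred.job.tracker setting:]

    import java.util.Set;

    class LocalCmdFix {
      static String fixCmd(String cmd, Set<String> registeredFiles) {
        int end = cmd.indexOf(' ');
        String prog = (end == -1) ? cmd : cmd.substring(0, end);
        String args = (end == -1) ? "" : cmd.substring(end);
        for (String file : registeredFiles) {
          // Compare only the file-name component, as new Path(file).getName() does.
          String name = file.substring(file.lastIndexOf('/') + 1);
          if (name.equals(prog)) {
            return file + args; // swap the bare name for the full resource path
          }
        }
        return cmd;
      }

      public static void main(String[] args) {
        System.out.println(fixCmd("mapper.py -k 1", Set.of("/tmp/res/mapper.py")));
        // prints: /tmp/res/mapper.py -k 1
      }
    }
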
- // Change the rws in this case
- CommonTree collist = (CommonTree) trfm.getChild(1);
- int ccount = collist.getChildCount();
- RowResolver out_rwsch = new RowResolver();
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < ccount; ++i) {
+ @SuppressWarnings("nls")
+ private Operator genScriptPlan(CommonTree trfm, QB qb,
+ Operator input) throws SemanticException {
+ // If there is no "AS" clause, the output schema will be "key,value"
+ ArrayList<String> outputColList = new ArrayList<String>();
+ boolean defaultOutputColList = (trfm.getChildCount() < 3);
+ if (defaultOutputColList) {
+ outputColList.add("key");
+ outputColList.add("value");
+ } else {
+ CommonTree collist = (CommonTree) trfm.getChild(2);
+ int ccount = collist.getChildCount();
+ for (int i=0; i < ccount; ++i) {
+ outputColList.add(unescapeIdentifier(((CommonTree)collist.getChild(i)).getText()));
+ }
+ }
+
+ RowResolver out_rwsch = new RowResolver();
+ StringBuilder columns = new StringBuilder();
+ for (int i = 0; i < outputColList.size(); ++i) {
if (i != 0) {
- sb.append(",");
+ columns.append(",");
}
- sb.append(((CommonTree)collist.getChild(i)).getText());
+ columns.append(outputColList.get(i));
out_rwsch.put(
qb.getParseInfo().getAlias(),
- ((CommonTree)collist.getChild(i)).getText(),
- new ColumnInfo(((CommonTree)collist.getChild(i)).getText(),
- String.class) // Everything is a string right now
+ outputColList.get(i),
+ new ColumnInfo(outputColList.get(i), String.class) // Script output is always a string
);
}
- output
- .setOp(OperatorFactory
+ Operator output = putOpInsertMap(OperatorFactory
.getAndMakeChild(
new scriptDesc(
- stripQuotes(trfm.getChild(2).getText()),
- PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.tabCode), sb.toString()),
+ getFixedCmd(stripQuotes(trfm.getChild(1).getText())),
+ PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.tabCode), columns.toString(), defaultOutputColList),
PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.tabCode), "")),
new RowSchema(
- out_rwsch.getColumnInfos()), input.getOp()));
+ out_rwsch.getColumnInfos()), input), out_rwsch);
- output.setRowResolver(out_rwsch);
return output;
}
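
[The new genScriptPlan defaults the script output schema to (key, value) when the TRANSFORM has no AS clause, and types every script output column as a string. A small sketch of that defaulting rule, with illustrative names:]

    import java.util.Arrays;
    import java.util.List;

    class ScriptSchema {
      // With no AS clause the output columns default to "key" and "value".
      static List<String> outputColumns(List<String> asClauseCols) {
        if (asClauseCols == null || asClauseCols.isEmpty()) {
          return Arrays.asList("key", "value");
        }
        return asClauseCols;
      }

      // The plan stores the schema as a comma-separated column list.
      static String columnSpec(List<String> asClauseCols) {
        return String.join(",", outputColumns(asClauseCols));
      }

      public static void main(String[] args) {
        System.out.println(columnSpec(null));                    // key,value
        System.out.println(columnSpec(Arrays.asList("a", "b"))); // a,b
      }
    }
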
@@ -776,17 +998,22 @@
private static String getColAlias(CommonTree selExpr, String defaultName) {
if (selExpr.getChildCount() == 2) {
// return zz for "xx + yy AS zz"
- return selExpr.getChild(1).getText();
+ return unescapeIdentifier(selExpr.getChild(1).getText());
}
CommonTree root = (CommonTree)selExpr.getChild(0);
while (root.getType() == HiveParser.DOT || root.getType() == HiveParser.TOK_COLREF) {
- assert(root.getChildCount() == 2);
- root = (CommonTree) root.getChild(1);
+ if (root.getType() == HiveParser.TOK_COLREF && root.getChildCount() == 1) {
+ root = (CommonTree) root.getChild(0);
+ }
+ else {
+ assert(root.getChildCount() == 2);
+ root = (CommonTree) root.getChild(1);
+ }
}
if (root.getType() == HiveParser.Identifier) {
// Return zz for "xx.zz" and "xx.yy.zz"
- return root.getText();
+ return unescapeIdentifier(root.getText());
} else {
// Return defaultName if selExpr is not a simple xx.yy.zz
return defaultName;
@@ -794,8 +1021,8 @@
}
@SuppressWarnings("nls")
- private OperatorInfo genSelectPlan(String dest, QB qb,
- OperatorInfo input) throws SemanticException {
+ private Operator genSelectPlan(String dest, QB qb,
+ Operator input) throws SemanticException {
CommonTree selExprList = qb.getParseInfo().getSelForClause(dest);
@@ -804,7 +1031,9 @@
CommonTree trfm = null;
String alias = qb.getParseInfo().getAlias();
Integer pos = Integer.valueOf(0);
-
+ RowResolver inputRR = opParseCtx.get(input).getRR();
+ boolean selectStar = false;
+
// Iterate over the selects
for (int i = 0; i < selExprList.getChildCount(); ++i) {
@@ -812,10 +1041,13 @@
CommonTree selExpr = (CommonTree) selExprList.getChild(i);
String colAlias = getColAlias(selExpr, "_C" + i);
CommonTree sel = (CommonTree)selExpr.getChild(0);
-
+
if (sel.getToken().getType() == HiveParser.TOK_ALLCOLREF) {
- genColList(qb.getParseInfo().getAlias(), sel, col_list,
- input.getRowResolver(), pos, out_rwsch);
+ String tabAlias = null;
+ if (sel.getChildCount() == 1)
+ tabAlias = unescapeIdentifier(sel.getChild(0).getText().toLowerCase());
+ genColList(tabAlias, alias, sel, col_list, inputRR, pos, out_rwsch);
+ selectStar = true;
} else if (sel.getToken().getType() == HiveParser.TOK_TRANSFORM) {
if (i > 0) {
throw new SemanticException(ErrorMsg.INVALID_TRANSFORM.getMsg(sel));
@@ -825,26 +1057,28 @@
for (int j = 0; j < cols.getChildCount(); ++j) {
CommonTree expr = (CommonTree) cols.getChild(j);
if (expr.getToken().getType() == HiveParser.TOK_ALLCOLREF) {
- genColList(alias, expr,
- col_list, input.getRowResolver(),
- pos, out_rwsch);
+ String tabAlias = null;
+ if (sel.getChildCount() == 1)
+ tabAlias = unescapeIdentifier(sel.getChild(0).getText().toLowerCase());
+
+ genColList(tabAlias, alias, expr, col_list, inputRR, pos, out_rwsch);
+ selectStar = true;
} else {
- exprNodeDesc exp = genExprNodeDesc(expr, alias, input.getRowResolver());
+ exprNodeDesc exp = genExprNodeDesc(qb.getMetaData(), expr, inputRR);
col_list.add(exp);
if (!StringUtils.isEmpty(alias) &&
(out_rwsch.get(alias, colAlias) != null)) {
throw new SemanticException(ErrorMsg.AMBIGOUS_COLUMN.getMsg(expr.getChild(1)));
}
- out_rwsch.put(alias, expr.getText(),
+ out_rwsch.put(alias, unescapeIdentifier(expr.getText()),
new ColumnInfo((Integer.valueOf(pos)).toString(),
- exp.getTypeInfo())); // Everything is a string right now
+ exp.getTypeInfo()));
}
}
} else {
// Case when this is an expression
- exprNodeDesc exp = genExprNodeDesc(sel, qb.getParseInfo()
- .getAlias(), input.getRowResolver());
+ exprNodeDesc exp = genExprNodeDesc(qb.getMetaData(), sel, inputRR);
col_list.add(exp);
if (!StringUtils.isEmpty(alias) &&
(out_rwsch.get(alias, colAlias) != null)) {
@@ -854,7 +1088,7 @@
// of the expression as the column name
out_rwsch.put(alias, colAlias,
new ColumnInfo((Integer.valueOf(pos)).toString(),
- exp.getTypeInfo())); // Everything is a string right now
+ exp.getTypeInfo()));
}
pos = Integer.valueOf(pos.intValue() + 1);
}
@@ -865,29 +1099,64 @@
}
}
- OperatorInfo output = (OperatorInfo) input.clone();
- output.setOp(OperatorFactory.getAndMakeChild(
- new selectDesc(col_list), new RowSchema(out_rwsch.getColumnInfos()),
- input.getOp()));
-
- output.setRowResolver(out_rwsch);
+ Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new selectDesc(col_list, (selExprList.getChildCount() == 1) && selectStar), new RowSchema(out_rwsch.getColumnInfos()),
+ input), out_rwsch);
if (trfm != null) {
output = genScriptPlan(trfm, qb, output);
}
- LOG.debug("Created Select Plan for clause: " + dest + " row schema: "
- + output.getRowResolver().toString());
+ LOG.debug("Created Select Plan for clause: " + dest + " row schema: " + out_rwsch.toString());
return output;
}
+ /**
+ * Class to store UDAF related information.
+ */
+ static class UDAFInfo {
+ ArrayList<exprNodeDesc> convertedParameters;
+ Method aggregateMethod;
+ Method evaluateMethod;
+ }
+
+ /**
+ * Returns the UDAFInfo struct for the aggregation
+ * @param aggName The name of the UDAF.
+ * @param mode The mode of the aggregation. This affects the evaluate method.
+ * @param aggClasses The classes of the parameters to the UDAF.
+ * @param aggParameters The actual exprNodeDesc of the parameters.
+ * @param aggTree The CommonTree node of the UDAF in the query.
+ * @return UDAFInfo
+ * @throws SemanticException when the UDAF is not found or has problems.
+ */
+ UDAFInfo getUDAFInfo(String aggName, groupByDesc.Mode mode, ArrayList<Class<?>> aggClasses,
+ ArrayList<exprNodeDesc> aggParameters, CommonTree aggTree) throws SemanticException {
+ UDAFInfo r = new UDAFInfo();
+ r.aggregateMethod = FunctionRegistry.getUDAFMethod(aggName, aggClasses);
+ if (null == r.aggregateMethod) {
+ String reason = "Looking for UDAF \"" + aggName + "\" with parameters " + aggClasses;
+ throw new SemanticException(ErrorMsg.INVALID_FUNCTION_SIGNATURE.getMsg((CommonTree)aggTree.getChild(0), reason));
+ }
+
+ r.convertedParameters = convertParameters(r.aggregateMethod, aggParameters);
+
+ r.evaluateMethod = FunctionRegistry.getUDAFEvaluateMethod(aggName, mode);
+ if (r.evaluateMethod == null) {
+ String reason = "UDAF \"" + aggName + "\" does not have evaluate()/evaluatePartial() methods.";
+ throw new SemanticException(ErrorMsg.INVALID_FUNCTION.getMsg((CommonTree)aggTree.getChild(0), reason));
+ }
+
+ return r;
+ }
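
[getUDAFInfo resolves two methods reflectively: the aggregate method matching the parameter classes, and a mode-dependent evaluate method. A minimal reflection sketch of the mode-dependent lookup, with hypothetical names; the real resolution lives in FunctionRegistry:]

    import java.lang.reflect.Method;

    class UdafLookup {
      // Pick evaluate() for a final aggregation and evaluatePartial() for a
      // partial one; return null when the class defines neither, which the
      // caller turns into a SemanticException.
      static Method evaluateMethod(Class<?> udafClass, boolean partial) {
        String name = partial ? "evaluatePartial" : "evaluate";
        for (Method m : udafClass.getMethods()) {
          if (m.getName().equals(name)) {
            return m;
          }
        }
        return null;
      }

      public static void main(String[] args) {
        System.out.println(evaluateMethod(String.class, false)); // null
      }
    }
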
+
@SuppressWarnings("nls")
- private OperatorInfo genGroupByPlanGroupByOperator(
- QBParseInfo parseInfo, String dest, OperatorInfo reduceSinkOperatorInfo,
+ private Operator genGroupByPlanGroupByOperator(
+ QBParseInfo parseInfo, String dest, Operator reduceSinkOperatorInfo,
groupByDesc.Mode mode)
throws SemanticException {
- RowResolver groupByInputRowResolver = reduceSinkOperatorInfo.getRowResolver();
+ RowResolver groupByInputRowResolver = opParseCtx.get(reduceSinkOperatorInfo).getRR();
RowResolver groupByOutputRowResolver = new RowResolver();
groupByOutputRowResolver.setIsExprResolver(true);
ArrayList<exprNodeDesc> groupByKeys = new ArrayList<exprNodeDesc>();
@@ -933,41 +1202,38 @@
aggClasses.add(paraExprInfo.getType().getPrimitiveClass());
}
- if (null == FunctionRegistry.getUDAFMethod(aggName, aggClasses)) {
- String reason = "Looking for UDAF \"" + aggName + "\" with parameters " + aggClasses;
- throw new SemanticException(ErrorMsg.INVALID_FUNCTION_SIGNATURE.getMsg((CommonTree)value.getChild(0), reason));
- }
+ UDAFInfo udaf = getUDAFInfo(aggName, mode, aggClasses, aggParameters, value);
- aggregations.add(new aggregationDesc(aggClass, aggParameters,
+ aggregations.add(new aggregationDesc(aggClass, udaf.convertedParameters,
value.getToken().getType() == HiveParser.TOK_FUNCTIONDI));
groupByOutputRowResolver.put("",value.toStringTree(),
new ColumnInfo(Integer.valueOf(groupByKeys.size() + aggregations.size() -1).toString(),
- String.class)); // Everything is a string right now
+ udaf.evaluateMethod.getReturnType()));
}
- return new OperatorInfo(
- OperatorFactory.getAndMakeChild(new groupByDesc(mode, groupByKeys, aggregations),
- new RowSchema(groupByOutputRowResolver.getColumnInfos()),
- reduceSinkOperatorInfo.getOp()),
+ return
+ putOpInsertMap(OperatorFactory.getAndMakeChild(new groupByDesc(mode, groupByKeys, aggregations),
+ new RowSchema(groupByOutputRowResolver.getColumnInfos()),
+ reduceSinkOperatorInfo),
groupByOutputRowResolver
);
}
@SuppressWarnings("nls")
- private OperatorInfo genGroupByPlanGroupByOpForward(
- QBParseInfo parseInfo, String dest, OperatorInfo forwardOpInfo,
+ private Operator genGroupByPlanGroupByOperator1(
+ QBParseInfo parseInfo, String dest, Operator reduceSinkOperatorInfo,
groupByDesc.Mode mode)
throws SemanticException {
- RowResolver inputRS = forwardOpInfo.getRowResolver();
- RowResolver outputRS = new RowResolver();
- outputRS.setIsExprResolver(true);
+ RowResolver groupByInputRowResolver = opParseCtx.get(reduceSinkOperatorInfo).getRR();
+ RowResolver groupByOutputRowResolver = new RowResolver();
+ groupByOutputRowResolver.setIsExprResolver(true);
ArrayList<exprNodeDesc> groupByKeys = new ArrayList<exprNodeDesc>();
ArrayList<aggregationDesc> aggregations = new ArrayList<aggregationDesc>();
List<CommonTree> grpByExprs = getGroupByForClause(parseInfo, dest);
- for (int i = 0; i < grpByExprs.size(); i++) {
+ for (int i = 0; i < grpByExprs.size(); ++i) {
CommonTree grpbyExpr = grpByExprs.get(i);
String text = grpbyExpr.toStringTree();
- ColumnInfo exprInfo = inputRS.get("",text);
+ ColumnInfo exprInfo = groupByInputRowResolver.get("",text);
if (exprInfo == null) {
throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(grpbyExpr));
@@ -975,16 +1241,13 @@
groupByKeys.add(new exprNodeColumnDesc(exprInfo.getType(), exprInfo.getInternalName()));
String field = (Integer.valueOf(i)).toString();
- outputRS.put("", text,
- new ColumnInfo(field, exprInfo.getType()));
+ groupByOutputRowResolver.put("",grpbyExpr.toStringTree(),
+ new ColumnInfo(field, exprInfo.getType()));
}
- // For each aggregation
- HashMap<String, CommonTree> aggregationTrees = parseInfo
- .getAggregationExprsForClause(dest);
- assert (aggregationTrees != null);
- for (Map.Entry<String, CommonTree> entry : aggregationTrees.entrySet()) {
- CommonTree value = entry.getValue();
+ // If there is a distinctFuncExp, add all parameters to the reduceKeys.
+ if (parseInfo.getDistinctFuncExprForClause(dest) != null) {
+ CommonTree value = parseInfo.getDistinctFuncExprForClause(dest);
String aggName = value.getChild(0).getText();
Class<? extends UDAF> aggClass = FunctionRegistry.getUDAF(aggName);
assert (aggClass != null);
@@ -994,7 +1257,7 @@
for (int i = 1; i < value.getChildCount(); i++) {
String text = value.getChild(i).toStringTree();
CommonTree paraExpr = (CommonTree)value.getChild(i);
- ColumnInfo paraExprInfo = inputRS.get("", text);
+ ColumnInfo paraExprInfo = groupByInputRowResolver.get("",text);
if (paraExprInfo == null) {
throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(paraExpr));
}
@@ -1005,31 +1268,230 @@
aggClasses.add(paraExprInfo.getType().getPrimitiveClass());
}
- if (null == FunctionRegistry.getUDAFMethod(aggName, aggClasses)) {
- String reason = "Looking for UDAF \"" + aggName + "\" with parameters " + aggClasses;
- throw new SemanticException(ErrorMsg.INVALID_FUNCTION_SIGNATURE.getMsg((CommonTree)value.getChild(0), reason));
- }
+ UDAFInfo udaf = getUDAFInfo(aggName, mode, aggClasses, aggParameters, value);
- aggregations.add(new aggregationDesc(aggClass, aggParameters,
- value.getToken().getType() == HiveParser.TOK_FUNCTIONDI));
- outputRS.put("",value.toStringTree(),
+ aggregations.add(new aggregationDesc(aggClass, udaf.convertedParameters, true));
+ groupByOutputRowResolver.put("",value.toStringTree(),
new ColumnInfo(Integer.valueOf(groupByKeys.size() + aggregations.size() -1).toString(),
- String.class)); // Everything is a string right now
+ udaf.evaluateMethod.getReturnType()));
+ }
+
+ HashMap<String, CommonTree> aggregationTrees = parseInfo
+ .getAggregationExprsForClause(dest);
+ for (Map.Entry<String, CommonTree> entry : aggregationTrees.entrySet()) {
+ CommonTree value = entry.getValue();
+ if (value.getToken().getType() == HiveParser.TOK_FUNCTIONDI)
+ continue;
+
+ String aggName = value.getChild(0).getText();
+ Class<? extends UDAF> aggClass = FunctionRegistry.getUDAF(aggName);
+ assert (aggClass != null);
+ ArrayList<exprNodeDesc> aggParameters = new ArrayList<exprNodeDesc>();
+ String text = entry.getKey();
+ ColumnInfo paraExprInfo = groupByInputRowResolver.get("",text);
+ if (paraExprInfo == null) {
+ throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(value));
+ }
+ String paraExpression = paraExprInfo.getInternalName();
+ assert(paraExpression != null);
+ aggParameters.add(new exprNodeColumnDesc(paraExprInfo.getType(), paraExpression));
+ aggregations.add(new aggregationDesc(aggClass, aggParameters, ((mode == groupByDesc.Mode.FINAL) ? false : (value.getToken().getType() == HiveParser.TOK_FUNCTIONDI))));
+ groupByOutputRowResolver.put("", value.toStringTree(),
+ new ColumnInfo(Integer.valueOf(groupByKeys.size() + aggregations.size() - 1).toString(),
+ paraExprInfo.getType()));
}
- return new OperatorInfo(
+ return putOpInsertMap(
OperatorFactory.getAndMakeChild(new groupByDesc(mode, groupByKeys, aggregations),
- new RowSchema(outputRS.getColumnInfos()),
- forwardOpInfo.getOp()),
- outputRS
- );
+ new RowSchema(groupByOutputRowResolver.getColumnInfos()),
+ reduceSinkOperatorInfo),
+ groupByOutputRowResolver);
}
@SuppressWarnings("nls")
- private OperatorInfo genGroupByPlanReduceSinkOperator(QBParseInfo parseInfo,
- String dest, OperatorInfo inputOperatorInfo, int numPartitionFields)
+ private Operator genGroupByPlanMapGroupByOperator(QB qb, String dest, Operator inputOperatorInfo,
+ groupByDesc.Mode mode) throws SemanticException {
+
+ RowResolver groupByInputRowResolver = opParseCtx.get(inputOperatorInfo).getRR();
+ QBParseInfo parseInfo = qb.getParseInfo();
+ RowResolver groupByOutputRowResolver = new RowResolver();
+ groupByOutputRowResolver.setIsExprResolver(true);
+ ArrayList<exprNodeDesc> groupByKeys = new ArrayList<exprNodeDesc>();
+ ArrayList<aggregationDesc> aggregations = new ArrayList<aggregationDesc>();
+ List<CommonTree> grpByExprs = getGroupByForClause(parseInfo, dest);
+ for (int i = 0; i < grpByExprs.size(); ++i) {
+ CommonTree grpbyExpr = grpByExprs.get(i);
+ exprNodeDesc grpByExprNode = genExprNodeDesc(qb.getMetaData(), grpbyExpr, groupByInputRowResolver);
+
+ groupByKeys.add(grpByExprNode);
+ String field = (Integer.valueOf(i)).toString();
+ groupByOutputRowResolver.put("",grpbyExpr.toStringTree(),
+ new ColumnInfo(field, grpByExprNode.getTypeInfo()));
+ }
+
+ // If there is a distinctFuncExp, add all parameters to the reduceKeys.
+ if (parseInfo.getDistinctFuncExprForClause(dest) != null) {
+ CommonTree value = parseInfo.getDistinctFuncExprForClause(dest);
+ int numDistn=0;
+ // 0 is function name
+ for (int i = 1; i < value.getChildCount(); i++) {
+ CommonTree parameter = (CommonTree) value.getChild(i);
+ String text = parameter.toStringTree();
+ if (groupByOutputRowResolver.get("",text) == null) {
+ exprNodeDesc distExprNode = genExprNodeDesc(qb.getMetaData(), parameter, groupByInputRowResolver);
+ groupByKeys.add(distExprNode);
+ numDistn++;
+ String field = (Integer.valueOf(grpByExprs.size() + numDistn -1)).toString();
+ groupByOutputRowResolver.put("", text, new ColumnInfo(field, distExprNode.getTypeInfo()));
+ }
+ }
+ }
+
+ // For each aggregation
+ HashMap<String, CommonTree> aggregationTrees = parseInfo
+ .getAggregationExprsForClause(dest);
+ assert (aggregationTrees != null);
+
+ for (Map.Entry<String, CommonTree> entry : aggregationTrees.entrySet()) {
+ CommonTree value = entry.getValue();
+ String aggName = value.getChild(0).getText();
+ Class<? extends UDAF> aggClass = FunctionRegistry.getUDAF(aggName);
+ assert (aggClass != null);
+ ArrayList<exprNodeDesc> aggParameters = new ArrayList<exprNodeDesc>();
+ ArrayList<Class<?>> aggClasses = new ArrayList<Class<?>>();
+ // 0 is the function name
+ for (int i = 1; i < value.getChildCount(); i++) {
+ CommonTree paraExpr = (CommonTree)value.getChild(i);
+ exprNodeDesc paraExprNode = genExprNodeDesc(qb.getMetaData(), paraExpr, groupByInputRowResolver);
+
+ aggParameters.add(paraExprNode);
+ aggClasses.add(paraExprNode.getTypeInfo().getPrimitiveClass());
+ }
+
+ UDAFInfo udaf = getUDAFInfo(aggName, mode, aggClasses, aggParameters, value);
+
+ aggregations.add(new aggregationDesc(aggClass, udaf.convertedParameters,
+ value.getToken().getType() == HiveParser.TOK_FUNCTIONDI));
+ groupByOutputRowResolver.put("",value.toStringTree(),
+ new ColumnInfo(Integer.valueOf(groupByKeys.size() + aggregations.size() -1).toString(),
+ udaf.evaluateMethod.getReturnType()));
+ }
+
+ return putOpInsertMap(
+ OperatorFactory.getAndMakeChild(new groupByDesc(mode, groupByKeys, aggregations),
+ new RowSchema(groupByOutputRowResolver.getColumnInfos()),
+ inputOperatorInfo),
+ groupByOutputRowResolver);
+ }
+
+ private ArrayList<exprNodeDesc> convertParameters(Method m, ArrayList<exprNodeDesc> aggParameters) {
+
+ ArrayList<exprNodeDesc> newParameters = new ArrayList<exprNodeDesc>();
+ Class<?>[] pTypes = m.getParameterTypes();
+
+ // 0 is the function name
+ for (int i = 0; i < aggParameters.size(); i++) {
+ exprNodeDesc desc = aggParameters.get(i);
+ Class<?> pType = ObjectInspectorUtils.generalizePrimitive(pTypes[i]);
+ if (desc instanceof exprNodeNullDesc) {
+ exprNodeConstantDesc newCh = new exprNodeConstantDesc(TypeInfoFactory.getPrimitiveTypeInfo(pType), null);
+ newParameters.add(newCh);
+ } else if (pType.isAssignableFrom(desc.getTypeInfo().getPrimitiveClass())) {
+ // no type conversion needed
+ newParameters.add(desc);
+ } else {
+ // must be implicit type conversion
+ Class<?> from = desc.getTypeInfo().getPrimitiveClass();
+ Class<?> to = pType;
+ assert(FunctionRegistry.implicitConvertable(from, to));
+ Method conv = FunctionRegistry.getUDFMethod(to.getName(), true, from);
+ assert(conv != null);
+ Class<? extends UDF> c = FunctionRegistry.getUDFClass(to.getName());
+ assert(c != null);
+
+ // get the conversion method
+ ArrayList<exprNodeDesc> conversionArg = new ArrayList<exprNodeDesc>(1);
+ conversionArg.add(desc);
+ newParameters.add(new exprNodeFuncDesc(TypeInfoFactory.getPrimitiveTypeInfo(pType),
+ c, conv, conversionArg));
+ }
+ }
+
+ return newParameters;
+ }
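
[convertParameters above wraps any parameter whose class does not match the resolved method's parameter class in an implicit conversion UDF. A simplified sketch of the same idea using plain Java values instead of exprNodeDesc trees; illustrative only:]

    class ParamConvert {
      // If the value's class is not assignable to the target parameter class,
      // apply an implicit conversion (modeled here as plain numeric widening).
      static Object convert(Object value, Class<?> target) {
        if (target.isInstance(value)) {
          return value; // no conversion needed
        }
        if (target == Double.class && value instanceof Number) {
          return ((Number) value).doubleValue(); // e.g. Integer -> Double
        }
        throw new IllegalArgumentException(
            "no implicit conversion from " + value.getClass() + " to " + target);
      }

      public static void main(String[] args) {
        System.out.println(convert(3, Double.class)); // 3.0
      }
    }
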
+
+ @SuppressWarnings("nls")
+ private Operator genGroupByPlanReduceSinkOperator(QBParseInfo parseInfo,
+ String dest, Operator inputOperatorInfo)
throws SemanticException {
- RowResolver reduceSinkInputRowResolver = inputOperatorInfo.getRowResolver();
+ RowResolver reduceSinkInputRowResolver = opParseCtx.get(inputOperatorInfo).getRR();
+ RowResolver reduceSinkOutputRowResolver = new RowResolver();
+ reduceSinkOutputRowResolver.setIsExprResolver(true);
+ ArrayList<exprNodeDesc> reduceKeys = new ArrayList<exprNodeDesc>();
+
+ // Pre-compute group-by keys and store in reduceKeys
+ List<CommonTree> grpByExprs = getGroupByForClause(parseInfo, dest);
+ for (int i = 0; i < grpByExprs.size(); ++i) {
+ CommonTree grpbyExpr = grpByExprs.get(i);
+ String text = grpbyExpr.toStringTree();
+
+ if (reduceSinkOutputRowResolver.get("", text) == null) {
+ ColumnInfo exprInfo = reduceSinkInputRowResolver.get("", text);
+ reduceKeys.add(new exprNodeColumnDesc(exprInfo.getType(), exprInfo.getInternalName()));
+ reduceSinkOutputRowResolver.put("", text,
+ new ColumnInfo(Utilities.ReduceField.KEY.toString() + "." + Integer.valueOf(reduceKeys.size() - 1).toString(),
+ exprInfo.getType()));
+ }
+ }
+
+ // If there is a distinctFuncExp, add all parameters to the reduceKeys.
+ if (parseInfo.getDistinctFuncExprForClause(dest) != null) {
+ CommonTree value = parseInfo.getDistinctFuncExprForClause(dest);
+ // 0 is function name
+ for (int i = 1; i < value.getChildCount(); i++) {
+ CommonTree parameter = (CommonTree) value.getChild(i);
+ String text = parameter.toStringTree();
+ if (reduceSinkOutputRowResolver.get("",text) == null) {
+ ColumnInfo exprInfo = reduceSinkInputRowResolver.get("", text);
+ reduceKeys.add(new exprNodeColumnDesc(exprInfo.getType(), exprInfo.getInternalName()));
+ reduceSinkOutputRowResolver.put("", text,
+ new ColumnInfo(Utilities.ReduceField.KEY.toString() + "." + Integer.valueOf(reduceKeys.size() - 1).toString(),
+ exprInfo.getType()));
+ }
+ }
+ }
+
+ // Put partial aggregation results in reduceValues
+ ArrayList<exprNodeDesc> reduceValues = new ArrayList<exprNodeDesc>();
+ HashMap<String, CommonTree> aggregationTrees = parseInfo
+ .getAggregationExprsForClause(dest);
+ int inputField = reduceKeys.size();
+
+ for (Map.Entry<String, CommonTree> entry : aggregationTrees.entrySet()) {
+
+ TypeInfo type = reduceSinkInputRowResolver.getColumnInfos().get(inputField).getType();
+ reduceValues.add(new exprNodeColumnDesc(
+ type, (Integer.valueOf(inputField)).toString()));
+ inputField++;
+ reduceSinkOutputRowResolver.put("", ((CommonTree)entry.getValue()).toStringTree(),
+ new ColumnInfo(Utilities.ReduceField.VALUE.toString() + "." + (Integer.valueOf(reduceValues.size()-1)).toString(),
+ type));
+ }
+
+ return putOpInsertMap(
+ OperatorFactory.getAndMakeChild(
+ PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, -1,
+ (parseInfo.getDistinctFuncExprForClause(dest) == null ? -1 : Integer.MAX_VALUE), -1, false),
+ new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()),
+ inputOperatorInfo),
+ reduceSinkOutputRowResolver);
+ }
+
+ @SuppressWarnings("nls")
+ private Operator genGroupByPlanReduceSinkOperator(QB qb,
+ String dest, Operator inputOperatorInfo, int numPartitionFields) throws SemanticException {
+ RowResolver reduceSinkInputRowResolver = opParseCtx.get(inputOperatorInfo).getRR();
+ QBParseInfo parseInfo = qb.getParseInfo();
RowResolver reduceSinkOutputRowResolver = new RowResolver();
reduceSinkOutputRowResolver.setIsExprResolver(true);
ArrayList<exprNodeDesc> reduceKeys = new ArrayList<exprNodeDesc>();
@@ -1038,13 +1500,12 @@
List<CommonTree> grpByExprs = getGroupByForClause(parseInfo, dest);
for (int i = 0; i < grpByExprs.size(); ++i) {
CommonTree grpbyExpr = grpByExprs.get(i);
- reduceKeys.add(genExprNodeDesc(grpbyExpr, parseInfo.getAlias(),
- reduceSinkInputRowResolver));
+ reduceKeys.add(genExprNodeDesc(qb.getMetaData(), grpbyExpr, reduceSinkInputRowResolver));
String text = grpbyExpr.toStringTree();
if (reduceSinkOutputRowResolver.get("", text) == null) {
reduceSinkOutputRowResolver.put("", text,
new ColumnInfo(Utilities.ReduceField.KEY.toString() + "." + Integer.valueOf(reduceKeys.size() - 1).toString(),
- String.class)); // Everything is a string right now
+ reduceKeys.get(reduceKeys.size()-1).getTypeInfo()));
} else {
throw new SemanticException(ErrorMsg.DUPLICATE_GROUPBY_KEY.getMsg(grpbyExpr));
}
@@ -1058,10 +1519,10 @@
CommonTree parameter = (CommonTree) value.getChild(i);
String text = parameter.toStringTree();
if (reduceSinkOutputRowResolver.get("",text) == null) {
- reduceKeys.add(genExprNodeDesc(parameter, parseInfo.getAlias(), reduceSinkInputRowResolver));
+ reduceKeys.add(genExprNodeDesc(qb.getMetaData(), parameter, reduceSinkInputRowResolver));
reduceSinkOutputRowResolver.put("", text,
new ColumnInfo(Utilities.ReduceField.KEY.toString() + "." + Integer.valueOf(reduceKeys.size() - 1).toString(),
- String.class)); // Everything is a string right now
+ reduceKeys.get(reduceKeys.size()-1).getTypeInfo()));
}
}
}
@@ -1077,130 +1538,28 @@
CommonTree parameter = (CommonTree) value.getChild(i);
String text = parameter.toStringTree();
if (reduceSinkOutputRowResolver.get("",text) == null) {
- reduceValues.add(genExprNodeDesc(parameter, parseInfo.getAlias(), reduceSinkInputRowResolver));
+ reduceValues.add(genExprNodeDesc(qb.getMetaData(), parameter, reduceSinkInputRowResolver));
reduceSinkOutputRowResolver.put("", text,
new ColumnInfo(Utilities.ReduceField.VALUE.toString() + "." + Integer.valueOf(reduceValues.size() - 1).toString(),
- String.class)); // Everything is a string right now
+ reduceValues.get(reduceValues.size()-1).getTypeInfo()));
}
}
}
- return new OperatorInfo(
+ return putOpInsertMap(
OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, -1, numPartitionFields,
-1, false),
new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()),
- inputOperatorInfo.getOp()),
+ inputOperatorInfo),
reduceSinkOutputRowResolver
);
}
@SuppressWarnings("nls")
- private OperatorInfo genGroupByPlanReduceSinkOperator(QBParseInfo parseInfo,
- OperatorInfo input, CommonTree distinctText, TreeSet<String> ks)
- throws SemanticException {
- RowResolver inputRS = input.getRowResolver();
- RowResolver outputRS = new RowResolver();
- outputRS.setIsExprResolver(true);
- ArrayList<exprNodeDesc> reduceKeys = new ArrayList<exprNodeDesc>();
-
- // Spray on distinctText first
- if (distinctText != null)
- {
- reduceKeys.add(genExprNodeDesc(distinctText, parseInfo.getAlias(), inputRS));
- String text = distinctText.toStringTree();
- assert (outputRS.get("", text) == null);
- outputRS.put("", text,
- new ColumnInfo(Utilities.ReduceField.KEY.toString() + "." + Integer.valueOf(reduceKeys.size() - 1).toString(),
- String.class));
- }
- else {
- // dummy key
- reduceKeys.add(new exprNodeConstantDesc(0));
- }
-
- // copy the input row resolver
- ArrayList<exprNodeDesc> reduceValues = new ArrayList<exprNodeDesc>();
- Iterator<String> keysIter = inputRS.getTableNames().iterator();
- while (keysIter.hasNext())
- {
- String key = keysIter.next();
- HashMap<String, ColumnInfo> map = inputRS.getFieldMap(key);
- Iterator<String> fNamesIter = map.keySet().iterator();
- while (fNamesIter.hasNext())
- {
- String field = fNamesIter.next();
- ColumnInfo valueInfo = inputRS.get(key, field);
-
- if (outputRS.get(key, field) == null)
- {
- reduceValues.add(new exprNodeColumnDesc(valueInfo.getType(), valueInfo.getInternalName()));
- outputRS.put(key, field, new ColumnInfo(Utilities.ReduceField.VALUE.toString() + "." + Integer.valueOf(reduceValues.size() - 1).toString(),
- valueInfo.getType()));
- }
- }
- }
-
- for (String dest : ks) {
- List<CommonTree> grpByExprs = getGroupByForClause(parseInfo, dest);
-
- // send all the group by expressions
- for (int i = 0; i < grpByExprs.size(); ++i) {
- CommonTree grpbyExpr = grpByExprs.get(i);
- String text = grpbyExpr.toStringTree();
- if (outputRS.get("", text) == null) {
- exprNodeDesc grpbyExprNode = genExprNodeDesc(grpbyExpr, parseInfo.getAlias(), inputRS);
- reduceValues.add(grpbyExprNode);
- outputRS.put("", text,
- new ColumnInfo(Utilities.ReduceField.VALUE.toString() + "." + Integer.valueOf(reduceValues.size() - 1).toString(),
- grpbyExprNode.getTypeInfo()));
- }
- }
-
- // send all the aggregation expressions
- HashMap<String, CommonTree> aggregationTrees = parseInfo.getAggregationExprsForClause(dest);
- for (Map.Entry<String, CommonTree> entry : aggregationTrees.entrySet()) {
- CommonTree value = entry.getValue();
- // 0 is function name
- for (int i = 1; i < value.getChildCount(); i++) {
- CommonTree parameter = (CommonTree) value.getChild(i);
- String text = parameter.toStringTree();
- if (outputRS.get("",text) == null) {
- exprNodeDesc pNode = genExprNodeDesc(parameter, parseInfo.getAlias(), inputRS);
- reduceValues.add(pNode);
- outputRS.put("", text,
- new ColumnInfo(Utilities.ReduceField.VALUE.toString() + "." + Integer.valueOf(reduceValues.size() - 1).toString(),
- pNode.getTypeInfo()));
- }
- }
- }
- }
-
- return new OperatorInfo(
- OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues,
- -1, distinctText == null ? -1 : 1, -1, false),
- new RowSchema(outputRS.getColumnInfos()), input.getOp()),
- outputRS);
- }
-
- @SuppressWarnings("nls")
- private OperatorInfo genGroupByPlanForwardOperator(QBParseInfo parseInfo, OperatorInfo input)
- throws SemanticException {
- RowResolver outputRS = input.getRowResolver();;
-
- Operator<? extends Serializable> forward = OperatorFactory.get(forwardDesc.class,
- new RowSchema(outputRS.getColumnInfos()));
- // set forward operator as child of each of input
- List<Operator<? extends Serializable>> child = new ArrayList<Operator<? extends Serializable>>();
- child.add(forward);
- input.getOp().setChildOperators(child);
-
- return new OperatorInfo(forward, outputRS);
- }
-
- @SuppressWarnings("nls")
- private OperatorInfo genGroupByPlanReduceSinkOperator2MR(
- QBParseInfo parseInfo, String dest, OperatorInfo groupByOperatorInfo,
- int numPartitionFields) {
+ private Operator genGroupByPlanReduceSinkOperator2MR(
+ QBParseInfo parseInfo, String dest, Operator groupByOperatorInfo, int numPartitionFields)
+ throws SemanticException {
+ RowResolver reduceSinkInputRowResolver2 = opParseCtx.get(groupByOperatorInfo).getRR();
RowResolver reduceSinkOutputRowResolver2 = new RowResolver();
reduceSinkOutputRowResolver2.setIsExprResolver(true);
ArrayList<exprNodeDesc> reduceKeys = new ArrayList<exprNodeDesc>();
@@ -1209,10 +1568,11 @@
for (int i = 0; i < grpByExprs.size(); ++i) {
CommonTree grpbyExpr = grpByExprs.get(i);
String field = (Integer.valueOf(i)).toString();
- reduceKeys.add(new exprNodeColumnDesc(TypeInfoFactory.getPrimitiveTypeInfo(String.class), field));
+ TypeInfo typeInfo = reduceSinkInputRowResolver2.get("", grpbyExpr.toStringTree()).getType();
+ reduceKeys.add(new exprNodeColumnDesc(typeInfo, field));
reduceSinkOutputRowResolver2.put("", grpbyExpr.toStringTree(),
new ColumnInfo(Utilities.ReduceField.KEY.toString() + "." + field,
- String.class)); // Everything is a string right now
+ typeInfo));
}
// Get partial aggregation results and store in reduceValues
ArrayList<exprNodeDesc> reduceValues = new ArrayList<exprNodeDesc>();
@@ -1220,28 +1580,30 @@
HashMap<String, CommonTree> aggregationTrees = parseInfo
.getAggregationExprsForClause(dest);
for (Map.Entry<String, CommonTree> entry : aggregationTrees.entrySet()) {
- reduceValues.add(new exprNodeColumnDesc(TypeInfoFactory.getPrimitiveTypeInfo(String.class),
- (Integer.valueOf(inputField)).toString()));
+ String field = (Integer.valueOf(inputField)).toString();
+ CommonTree t = entry.getValue();
+ TypeInfo typeInfo = reduceSinkInputRowResolver2.get("", t.toStringTree()).getType();
+ reduceValues.add(new exprNodeColumnDesc(typeInfo, field));
inputField++;
- reduceSinkOutputRowResolver2.put("", ((CommonTree)entry.getValue()).toStringTree(),
+ reduceSinkOutputRowResolver2.put("", t.toStringTree(),
new ColumnInfo(Utilities.ReduceField.VALUE.toString() + "." + (Integer.valueOf(reduceValues.size()-1)).toString(),
- String.class)); // Everything is a string right now
+ typeInfo));
}
- return new OperatorInfo(
+ return putOpInsertMap(
OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, -1,
numPartitionFields, -1, true),
new RowSchema(reduceSinkOutputRowResolver2.getColumnInfos()),
- groupByOperatorInfo.getOp()),
+ groupByOperatorInfo),
reduceSinkOutputRowResolver2
);
}
@SuppressWarnings("nls")
- private OperatorInfo genGroupByPlanGroupByOperator2MR(
- QBParseInfo parseInfo, String dest, OperatorInfo reduceSinkOperatorInfo2)
+ private Operator genGroupByPlanGroupByOperator2MR(
+ QBParseInfo parseInfo, String dest, Operator reduceSinkOperatorInfo2, groupByDesc.Mode mode)
throws SemanticException {
- RowResolver groupByInputRowResolver2 = reduceSinkOperatorInfo2.getRowResolver();
+ RowResolver groupByInputRowResolver2 = opParseCtx.get(reduceSinkOperatorInfo2).getRR();
RowResolver groupByOutputRowResolver2 = new RowResolver();
groupByOutputRowResolver2.setIsExprResolver(true);
ArrayList<exprNodeDesc> groupByKeys = new ArrayList<exprNodeDesc>();
@@ -1274,20 +1636,19 @@
if (paraExprInfo == null) {
throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(value));
}
-
String paraExpression = paraExprInfo.getInternalName();
assert(paraExpression != null);
aggParameters.add(new exprNodeColumnDesc(paraExprInfo.getType(), paraExpression));
- aggregations.add(new aggregationDesc(aggClass, aggParameters, false));
+ aggregations.add(new aggregationDesc(aggClass, aggParameters, ((mode == groupByDesc.Mode.FINAL) ? false : (value.getToken().getType() == HiveParser.TOK_FUNCTIONDI))));
groupByOutputRowResolver2.put("", value.toStringTree(),
new ColumnInfo(Integer.valueOf(groupByKeys.size() + aggregations.size() - 1).toString(),
- paraExprInfo.getType())); // Everything is a string right now
+ paraExprInfo.getType()));
}
- return new OperatorInfo(
- OperatorFactory.getAndMakeChild(new groupByDesc(groupByDesc.Mode.PARTIAL2, groupByKeys, aggregations),
- new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
- reduceSinkOperatorInfo2.getOp()),
+ return putOpInsertMap(
+ OperatorFactory.getAndMakeChild(new groupByDesc(mode, groupByKeys, aggregations),
+ new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
+ reduceSinkOperatorInfo2),
groupByOutputRowResolver2
);
}
@@ -1308,20 +1669,18 @@
* @throws SemanticException
*/
@SuppressWarnings({ "unused", "nls" })
- private OperatorInfo genGroupByPlan1MR(String dest, QB qb,
- OperatorInfo input) throws SemanticException {
+ private Operator genGroupByPlan1MR(String dest, QB qb,
+ Operator input) throws SemanticException {
- OperatorInfo inputOperatorInfo = input;
QBParseInfo parseInfo = qb.getParseInfo();
// ////// 1. Generate ReduceSinkOperator
- OperatorInfo reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(
- parseInfo, dest, inputOperatorInfo,
- getGroupByForClause(parseInfo, dest).size());
+ Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(
+ qb, dest, input, getGroupByForClause(parseInfo, dest).size());
// ////// 2. Generate GroupbyOperator
- OperatorInfo groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo,
+ Operator groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo,
dest, reduceSinkOperatorInfo, groupByDesc.Mode.COMPLETE);
return groupByOperatorInfo;
@@ -1345,10 +1704,9 @@
* @throws SemanticException
*/
@SuppressWarnings("nls")
- private OperatorInfo genGroupByPlan2MR(String dest, QB qb,
- OperatorInfo input) throws SemanticException {
+ private Operator genGroupByPlan2MR(String dest, QB qb,
+ Operator input) throws SemanticException {
- OperatorInfo inputOperatorInfo = input;
QBParseInfo parseInfo = qb.getParseInfo();
// ////// 1. Generate ReduceSinkOperator
@@ -1356,59 +1714,78 @@
// reducers for load balancing problem. That happens when there is no DISTINCT
// operator. We set the numPartitionColumns to -1 for this purpose. This is
// captured by WritableComparableHiveObject.hashCode() function.
- OperatorInfo reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(
- parseInfo, dest, inputOperatorInfo, (parseInfo
- .getDistinctFuncExprForClause(dest) == null ? -1
+ Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(
+ qb, dest, input, (parseInfo.getDistinctFuncExprForClause(dest) == null ? -1
: Integer.MAX_VALUE));
// ////// 2. Generate GroupbyOperator
- OperatorInfo groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo,
+ Operator groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo,
dest, reduceSinkOperatorInfo, groupByDesc.Mode.PARTIAL1);
// ////// 3. Generate ReduceSinkOperator2
- OperatorInfo reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR(
+ Operator reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR(
parseInfo, dest, groupByOperatorInfo,
getGroupByForClause(parseInfo, dest).size());
// ////// 4. Generate GroupbyOperator2
- OperatorInfo groupByOperatorInfo2 = genGroupByPlanGroupByOperator2MR(
- parseInfo, dest, reduceSinkOperatorInfo2);
+ Operator groupByOperatorInfo2 =
+ genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo2, groupByDesc.Mode.FINAL);
return groupByOperatorInfo2;
}
+ private boolean optimizeMapAggrGroupBy(String dest, QB qb) {
+ List<CommonTree> grpByExprs = getGroupByForClause(qb.getParseInfo(), dest);
+ if ((grpByExprs != null) && !grpByExprs.isEmpty())
+ return false;
+
+ if (qb.getParseInfo().getDistinctFuncExprForClause(dest) != null)
+ return false;
+
+ return true;
+ }
+
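For illustration only (not part of the patch), a standalone sketch restating the optimizeMapAggrGroupBy decision above over plain Java types; the class name and both parameters are hypothetical stand-ins for the QBParseInfo lookups:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MapAggrShortcut {

  // Mirrors optimizeMapAggrGroupBy: a single map-reduce job suffices only
  // when there are no group-by keys and no DISTINCT aggregate.
  static boolean singleJobSuffices(List<String> groupByExprs, boolean hasDistinct) {
    if (groupByExprs != null && !groupByExprs.isEmpty()) {
      return false;
    }
    return !hasDistinct;
  }

  public static void main(String[] args) {
    // e.g. SELECT count(1) FROM src
    System.out.println(singleJobSuffices(Collections.<String>emptyList(), false)); // true
    // e.g. SELECT key, count(DISTINCT value) FROM src GROUP BY key
    System.out.println(singleJobSuffices(Arrays.asList("key"), true));             // false
  }
}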
/**
- * Generate a Group-By plan using a 2 map-reduce jobs. The first map-reduce
- * job has already been constructed. Evaluate partial aggregates first,
- * followed by actual aggregates. The first map-reduce stage will be
- * shared by all groupbys.
+ * Generate a Group-By plan using 2 map-reduce jobs. First perform a map-side
+ * partial aggregation (to reduce the amount of data). Then spray by the
+ * distinct key (or a random number) in the hope of getting a uniform
+ * distribution, and compute partial aggregates grouped by that distinct key.
+ * Evaluate partial aggregates first, followed by actual aggregates.
*/
@SuppressWarnings("nls")
- private OperatorInfo genGroupByPlan3MR(String dest, QB qb,
- OperatorInfo input) throws SemanticException {
+ private Operator genGroupByPlan4MR(String dest, QB qb,
+ Operator inputOperatorInfo) throws SemanticException {
- OperatorInfo inputOperatorInfo = input;
QBParseInfo parseInfo = qb.getParseInfo();
- // ////// Generate GroupbyOperator
- OperatorInfo groupByOperatorInfo = genGroupByPlanGroupByOpForward(parseInfo,
- dest, inputOperatorInfo, groupByDesc.Mode.PARTIAL1);
-
- // ////// Generate ReduceSinkOperator2
- OperatorInfo reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR(
- parseInfo, dest, groupByOperatorInfo,
- getGroupByForClause(parseInfo, dest).size());
-
- // ////// Generate GroupbyOperator2
- OperatorInfo groupByOperatorInfo2 = genGroupByPlanGroupByOperator2MR(
- parseInfo, dest, reduceSinkOperatorInfo2);
+ // ////// Generate GroupbyOperator for a map-side partial aggregation
+ Operator groupByOperatorInfo = genGroupByPlanMapGroupByOperator(qb,
+ dest, inputOperatorInfo, groupByDesc.Mode.HASH);
+
+ // ////// Generate ReduceSink Operator
+ Operator reduceSinkOperatorInfo =
+ genGroupByPlanReduceSinkOperator(parseInfo, dest, groupByOperatorInfo);
+
+ // Optimize the case where there are no grouping keys and no distinct: the second map-reduce job is not needed
+ if (!optimizeMapAggrGroupBy(dest, qb)) {
+ // ////// Generate GroupbyOperator for a partial aggregation
+ Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator1(parseInfo, dest, reduceSinkOperatorInfo,
+ groupByDesc.Mode.PARTIAL2);
+
+ // ////// Generate ReduceSinkOperator2
+ Operator reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR(parseInfo, dest, groupByOperatorInfo2,
+ getGroupByForClause(parseInfo, dest).size());
- return groupByOperatorInfo2;
+ // ////// Generate GroupbyOperator3
+ return genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo2, groupByDesc.Mode.FINAL);
+ }
+ else
+ return genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo, groupByDesc.Mode.FINAL);
}
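As a hedged sketch of the partial/final split that the HASH and FINAL group-by modes above rely on, the following standalone Java (hypothetical names, in-memory data rather than Hive operators) computes count-by-key in two phases: per-mapper partial hash tables, then a merge of the partials into the actual aggregates:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class TwoPhaseCount {

  // Map side (cf. Mode.HASH above): per-mapper hash table of partial counts.
  static Map<String, Long> partial(Iterable<String> keys) {
    Map<String, Long> acc = new HashMap<>();
    for (String k : keys) {
      acc.merge(k, 1L, Long::sum);
    }
    return acc;
  }

  // Reduce side (cf. Mode.FINAL above): merge partials into actual aggregates.
  static Map<String, Long> merge(Iterable<Map<String, Long>> partials) {
    Map<String, Long> out = new HashMap<>();
    for (Map<String, Long> p : partials) {
      for (Map.Entry<String, Long> e : p.entrySet()) {
        out.merge(e.getKey(), e.getValue(), Long::sum);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, Long> m1 = partial(Arrays.asList("a", "b", "a"));
    Map<String, Long> m2 = partial(Arrays.asList("b", "b"));
    System.out.println(merge(Arrays.asList(m1, m2))); // prints {a=2, b=3}
  }
}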
@SuppressWarnings("nls")
- private OperatorInfo genConversionOps(String dest, QB qb,
- OperatorInfo input) throws SemanticException {
+ private Operator genConversionOps(String dest, QB qb,
+ Operator input) throws SemanticException {
Integer dest_type = qb.getMetaData().getDestTypeForAlias(dest);
Table dest_tab = null;
@@ -1433,9 +1810,10 @@
}
@SuppressWarnings("nls")
- private OperatorInfo genFileSinkPlan(String dest, QB qb,
- OperatorInfo input) throws SemanticException {
+ private Operator genFileSinkPlan(String dest, QB qb,
+ Operator input) throws SemanticException {
+ RowResolver inputRR = opParseCtx.get(input).getRR();
// Generate the destination file
String queryTmpdir = this.scratchDir + File.separator + this.randomid + '.' + this.pathid + '.' + dest ;
this.pathid ++;
@@ -1471,10 +1849,8 @@
}
case QBMetaData.DEST_LOCAL_FILE:
case QBMetaData.DEST_DFS_FILE: {
- table_desc = Utilities.defaultTd;
dest_path = qb.getMetaData().getDestFileForAlias(dest);
String cols = new String();
- RowResolver inputRR = input.getRowResolver();
Vector<ColumnInfo> colInfos = inputRR.getColumnInfos();
boolean first = true;
@@ -1492,54 +1868,129 @@
this.loadFileWork.add(new loadFileDesc(queryTmpdir, dest_path,
(dest_type.intValue() == QBMetaData.DEST_DFS_FILE), cols));
+ table_desc = PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.ctrlaCode),
+ cols);
break;
}
default:
throw new SemanticException("Unknown destination type: " + dest_type);
}
- OperatorInfo output = (OperatorInfo)input.clone();
- output.setOp(
+ input = genConversionSelectOperator(dest, qb, input, table_desc);
+
+ Operator output = putOpInsertMap(
OperatorFactory.getAndMakeChild(
new fileSinkDesc(queryTmpdir, table_desc),
- new RowSchema(output.getRowResolver().getColumnInfos()), input.getOp()
- )
- );
+ new RowSchema(inputRR.getColumnInfos()), input), inputRR);
LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
+ dest_path + " row schema: "
- + output.getRowResolver().toString());
+ + inputRR.toString());
return output;
}
+ /**
+ * Generate the conversion SelectOperator that converts the columns into
+ * the types that are expected by the table_desc.
+ */
+ Operator genConversionSelectOperator(String dest, QB qb,
+ Operator input, tableDesc table_desc) throws SemanticException {
+ StructObjectInspector oi = null;
+ try {
+ Deserializer deserializer = table_desc.getDeserializerClass().newInstance();
+ deserializer.initialize(null, table_desc.getProperties());
+ oi = (StructObjectInspector) deserializer.getObjectInspector();
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+
+ // Check column number
+ List<? extends StructField> tableFields = oi.getAllStructFieldRefs();
+ Vector<ColumnInfo> rowFields = opParseCtx.get(input).getRR().getColumnInfos();
+ if (tableFields.size() != rowFields.size()) {
+ String reason = "Table " + dest + " has " + tableFields.size() + " columns but query has "
+ + rowFields + ".";
+ throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
+ qb.getParseInfo().getDestForClause(dest), reason));
+ }
+
+ // Check column types
+ boolean converted = false;
+ int columnNumber = tableFields.size();
+ ArrayList<exprNodeDesc> expressions = new ArrayList<exprNodeDesc>(columnNumber);
+ // MetadataTypedColumnsetSerDe does not need type conversions because it does
+ // the conversion to String by itself.
+ if (! table_desc.getDeserializerClass().equals(MetadataTypedColumnsetSerDe.class)) {
+ for (int i=0; i<columnNumber; i++) {
+ ObjectInspector tableFieldOI = tableFields.get(i).getFieldObjectInspector();
+ TypeInfo tableFieldTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(tableFieldOI);
+ TypeInfo rowFieldTypeInfo = rowFields.get(i).getType();
+ exprNodeDesc column = new exprNodeColumnDesc(rowFieldTypeInfo, Integer.valueOf(i).toString());
+ if (! tableFieldTypeInfo.equals(rowFieldTypeInfo)) {
+ // need to do some conversions here
+ converted = true;
+ if (tableFieldTypeInfo.getCategory() != Category.PRIMITIVE) {
+ // cannot convert to complex types
+ column = null;
+ } else {
+ column = getFuncExprNodeDesc(tableFieldTypeInfo.getPrimitiveClass().getName(), column);
+ }
+ if (column == null) {
+ String reason = "Cannot convert column " + i + " from " + rowFieldTypeInfo + " to "
+ + tableFieldTypeInfo + ".";
+ throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
+ qb.getParseInfo().getDestForClause(dest), reason));
+ }
+ }
+ // add the column (possibly wrapped in a conversion) for every field
+ expressions.add(column);
+ }
+ }
+
+ if (converted) {
+ // add the select operator
+ RowResolver rowResolver = new RowResolver();
+ for (int i=0; i<expressions.size(); i++) {
+ String name = Integer.valueOf(i).toString();
+ rowResolver.put("", name, new ColumnInfo(name, expressions.get(i).getTypeInfo()));
+ }
+ Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new selectDesc(expressions), new RowSchema(rowResolver.getColumnInfos()), input), rowResolver);
+
+ return output;
+ } else {
+ // not converted
+ return input;
+ }
+ }
+
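A simplified sketch of the per-column rule genConversionSelectOperator applies: pass a column through when the row type already matches the table type, wrap it in a conversion when a primitive cast exists, and reject the insert otherwise. Types are modeled as plain strings, the cast notation stands in for getFuncExprNodeDesc, and all names here are hypothetical:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ConversionSelectSketch {

  // For each column: keep it when the types match, wrap it in a cast when a
  // primitive conversion exists, and fail when the target type is complex.
  static List<String> planColumns(List<String> rowTypes, List<String> tableTypes) {
    if (rowTypes.size() != tableTypes.size()) {
      throw new IllegalArgumentException("table has " + tableTypes.size()
          + " columns but query has " + rowTypes.size());
    }
    List<String> exprs = new ArrayList<>();
    for (int i = 0; i < rowTypes.size(); i++) {
      String expr = "_col" + i;
      String target = tableTypes.get(i);
      if (!rowTypes.get(i).equals(target)) {
        if (target.startsWith("array") || target.startsWith("map")) {
          throw new IllegalArgumentException("cannot convert column " + i + " to " + target);
        }
        expr = target + "(" + expr + ")"; // stands in for getFuncExprNodeDesc
      }
      exprs.add(expr); // added for every column, converted or not
    }
    return exprs;
  }

  public static void main(String[] args) {
    // row is (string, int) but the table wants (string, double):
    System.out.println(planColumns(Arrays.asList("string", "int"),
                                   Arrays.asList("string", "double")));
    // prints [_col0, double(_col1)]
  }
}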
@SuppressWarnings("nls")
- private OperatorInfo genLimitPlan(String dest, QB qb, OperatorInfo input, int limit) throws SemanticException {
+ private Operator genLimitPlan(String dest, QB qb, Operator input, int limit) throws SemanticException {
// If the query is map-only, the limit can be optimized - instead of converting it to a map-reduce job, we can
// run another map-only job to apply the limit and avoid the cost of sorting in the map-reduce phase. A better
// approach would be to write into a local file and then have a map-only job.
// Add the limit operator to get the value fields
- OperatorInfo limitMap = (OperatorInfo)input.clone();
- limitMap.setOp(
- OperatorFactory.getAndMakeChild(
- new limitDesc(limit), new RowSchema(limitMap.getRowResolver().getColumnInfos()),
- input.getOp()
- )
- );
+ RowResolver inputRR = opParseCtx.get(input).getRR();
+ Operator limitMap =
+ putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new limitDesc(limit), new RowSchema(inputRR.getColumnInfos()), input),
+ inputRR);
+
LOG.debug("Created LimitOperator Plan for clause: " + dest + " row schema: "
- + limitMap.getRowResolver().toString());
+ + inputRR.toString());
return limitMap;
}
@SuppressWarnings("nls")
- private OperatorInfo genLimitMapRedPlan(String dest, QB qb, OperatorInfo input, int limit, boolean isOuterQuery) throws SemanticException {
+ private Operator genLimitMapRedPlan(String dest, QB qb, Operator input, int limit, boolean isOuterQuery) throws SemanticException {
// If the query is map-only, the limit can be optimized - instead of converting it to a map-reduce job, we can
// run another map-only job to apply the limit and avoid the cost of sorting in the map-reduce phase. A better
// approach would be to write into a local file and then have a map-only job.
// Add the limit operator to get the value fields
- OperatorInfo curr = genLimitPlan(dest, qb, input, limit);
+ Operator curr = genLimitPlan(dest, qb, input, limit);
if (isOuterQuery)
return curr;
@@ -1550,20 +2001,21 @@
}
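A sketch of why genLimitMapRedPlan applies the limit twice (in-memory lists stand in for map tasks; this is not Hive's actual execution path): each map task truncates its own output early, and a second limit after the merge enforces the global bound:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TwoStageLimit {

  static <T> List<T> limit(List<T> rows, int n) {
    return new ArrayList<>(rows.subList(0, Math.min(n, rows.size())));
  }

  public static void main(String[] args) {
    int n = 3;
    // Stage 1: each map task truncates its own output early.
    List<Integer> map1 = limit(Arrays.asList(1, 2, 3, 4, 5), n);
    List<Integer> map2 = limit(Arrays.asList(6, 7), n);
    // Stage 2: the merged stream is truncated again to enforce the global bound.
    List<Integer> merged = new ArrayList<>(map1);
    merged.addAll(map2);
    System.out.println(limit(merged, n)); // prints [1, 2, 3]
  }
}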
@SuppressWarnings("nls")
- private OperatorInfo genReduceSinkPlan(String dest, QB qb,
- OperatorInfo input, int numReducers) throws SemanticException {
+ private Operator genReduceSinkPlan(String dest, QB qb,
+ Operator input, int numReducers) throws SemanticException {
// First generate the expression for the key
// The cluster by clause has the aliases for the keys
ArrayList<exprNodeDesc> keyCols = new ArrayList<exprNodeDesc>();
-
+ RowResolver inputRR = opParseCtx.get(input).getRR();
+
CommonTree clby = qb.getParseInfo().getClusterByForClause(dest);
if (clby != null) {
int ccount = clby.getChildCount();
for(int i=0; i<ccount; ++i) {
CommonTree cl = (CommonTree)clby.getChild(i);
- ColumnInfo colInfo = input.getRowResolver().get(qb.getParseInfo().getAlias(),
- cl.getText());
+ ColumnInfo colInfo = inputRR.get(qb.getParseInfo().getAlias(),
+ unescapeIdentifier(cl.getText()));
if (colInfo == null) {
throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(cl));
}
@@ -1576,22 +2028,19 @@
// For the generation of the values expression just get the inputs
// signature and generate field expressions for those
- for(ColumnInfo colInfo: input.getRowResolver().getColumnInfos()) {
+ for(ColumnInfo colInfo: inputRR.getColumnInfos()) {
valueCols.add(new exprNodeColumnDesc(colInfo.getType(), colInfo.getInternalName()));
}
- OperatorInfo interim = (OperatorInfo)input.clone();
- interim.setOp(
+ Operator interim = putOpInsertMap(
OperatorFactory.getAndMakeChild(
PlanUtils.getReduceSinkDesc(keyCols, valueCols, -1, keyCols.size(), numReducers, false),
- new RowSchema(interim.getRowResolver().getColumnInfos()),
- input.getOp()
- )
- );
+ new RowSchema(inputRR.getColumnInfos()),
+ input), inputRR);
// Add the extract operator to get the value fields
RowResolver out_rwsch = new RowResolver();
- RowResolver interim_rwsch = interim.getRowResolver();
+ RowResolver interim_rwsch = inputRR;
Integer pos = Integer.valueOf(0);
for(ColumnInfo colInfo: interim_rwsch.getColumnInfos()) {
String [] info = interim_rwsch.reverseLookup(colInfo.getInternalName());
@@ -1600,23 +2049,19 @@
pos = Integer.valueOf(pos.intValue() + 1);
}
- OperatorInfo output = (OperatorInfo)interim.clone();
- output.setOp(
+ Operator output = putOpInsertMap(
OperatorFactory.getAndMakeChild(
new extractDesc(new exprNodeColumnDesc(String.class, Utilities.ReduceField.VALUE.toString())),
new RowSchema(out_rwsch.getColumnInfos()),
- interim.getOp()
- )
- );
- output.setRowResolver(out_rwsch);
+ interim), out_rwsch);
LOG.debug("Created ReduceSink Plan for clause: " + dest + " row schema: "
- + output.getRowResolver().toString());
+ + out_rwsch.toString());
return output;
}
- private OperatorInfo genJoinOperatorChildren(QBJoinTree join, OperatorInfo left,
- OperatorInfo[] right) {
+ private Operator genJoinOperatorChildren(QBJoinTree join, Operator left, Operator[] right)
+ throws SemanticException {
RowResolver outputRS = new RowResolver();
// all children are base classes
Operator<?>[] rightOps = new Operator[right.length];
@@ -1625,13 +2070,13 @@
HashMap<Byte, ArrayList<exprNodeDesc>> exprMap = new HashMap<Byte, ArrayList<exprNodeDesc>>();
- for (OperatorInfo input : right)
+ for (Operator input : right)
{
ArrayList<exprNodeDesc> keyDesc = new ArrayList<exprNodeDesc>();
if (input == null)
input = left;
- Byte tag = Byte.valueOf((byte)(((reduceSinkDesc)(input.getOp().getConf())).getTag()));
- RowResolver inputRS = input.getRowResolver();
+ Byte tag = Byte.valueOf((byte)(((reduceSinkDesc)(input.getConf())).getTag()));
+ RowResolver inputRS = opParseCtx.get(input).getRR();
Iterator<String> keysIter = inputRS.getTableNames().iterator();
while (keysIter.hasNext())
{
@@ -1650,7 +2095,7 @@
}
exprMap.put(tag, keyDesc);
- rightOps[pos++] = input.getOp();
+ rightOps[pos++] = input;
}
org.apache.hadoop.hive.ql.plan.joinCond[] joinCondns = new org.apache.hadoop.hive.ql.plan.joinCond[join.getJoinCond().length];
@@ -1659,14 +2104,15 @@
joinCondns[i] = new org.apache.hadoop.hive.ql.plan.joinCond(condn);
}
- return new OperatorInfo(OperatorFactory.getAndMakeChild(new joinDesc(exprMap, joinCondns),
- new RowSchema(outputRS.getColumnInfos()), rightOps), outputRS);
+ return putOpInsertMap(
+ OperatorFactory.getAndMakeChild(new joinDesc(exprMap, joinCondns),
+ new RowSchema(outputRS.getColumnInfos()), rightOps), outputRS);
}
@SuppressWarnings("nls")
- private OperatorInfo genJoinReduceSinkChild(QB qb, QBJoinTree joinTree,
- OperatorInfo child, String srcName, int pos) throws SemanticException {
- RowResolver inputRS = child.getRowResolver();
+ private Operator genJoinReduceSinkChild(QB qb, QBJoinTree joinTree,
+ Operator child, String srcName, int pos) throws SemanticException {
+ RowResolver inputRS = opParseCtx.get(child).getRR();
RowResolver outputRS = new RowResolver();
ArrayList<exprNodeDesc> reduceKeys = new ArrayList<exprNodeDesc>();
@@ -1674,7 +2120,7 @@
Vector<CommonTree> exprs = joinTree.getExpressions().get(pos);
for (int i = 0; i < exprs.size(); i++) {
CommonTree expr = exprs.get(i);
- reduceKeys.add(genExprNodeDesc(expr, srcName, inputRS));
+ reduceKeys.add(genExprNodeDesc(qb.getMetaData(), expr, inputRS));
}
// Walk over the input row resolver and copy in the output
@@ -1696,28 +2142,32 @@
}
}
- return new OperatorInfo(
+ return putOpInsertMap(
OperatorFactory.getAndMakeChild(
PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, joinTree.getNextTag(), reduceKeys.size(), -1, false),
new RowSchema(outputRS.getColumnInfos()),
- child.getOp()), outputRS);
+ child), outputRS);
}
- private OperatorInfo genJoinOperator(QB qb, QBJoinTree joinTree,
- HashMap<String, OperatorInfo> map) throws SemanticException {
+ private Operator genJoinOperator(QB qb, QBJoinTree joinTree,
+ HashMap<String, Operator> map) throws SemanticException {
QBJoinTree leftChild = joinTree.getJoinSrc();
- OperatorInfo joinSrcOp = null;
+ Operator joinSrcOp = null;
if (leftChild != null)
{
- OperatorInfo joinOp = genJoinOperator(qb, leftChild, map);
+ Operator joinOp = genJoinOperator(qb, leftChild, map);
+ Vector<CommonTree> filter = joinTree.getFilters().get(0);
+ for (CommonTree cond: filter)
+ joinOp = genFilterPlan(qb, cond, joinOp);
+
joinSrcOp = genJoinReduceSinkChild(qb, joinTree, joinOp, null, 0);
}
- OperatorInfo[] srcOps = new OperatorInfo[joinTree.getBaseSrc().length];
+ Operator[] srcOps = new Operator[joinTree.getBaseSrc().length];
int pos = 0;
for (String src : joinTree.getBaseSrc()) {
if (src != null) {
- OperatorInfo srcOp = map.get(src);
+ Operator srcOp = map.get(src);
srcOps[pos] = genJoinReduceSinkChild(qb, joinTree, srcOp, src, pos);
pos++;
} else {
@@ -1732,13 +2182,13 @@
return genJoinOperatorChildren(joinTree, joinSrcOp, srcOps);
}
- private void genJoinOperatorTypeCheck(OperatorInfo left, OperatorInfo[] right) throws SemanticException {
+ private void genJoinOperatorTypeCheck(Operator left, Operator[] right) throws SemanticException {
// keys[i] -> ArrayList<exprNodeDesc> for the i-th join operator key list
ArrayList<ArrayList<exprNodeDesc>> keys = new ArrayList<ArrayList<exprNodeDesc>>();
int keyLength = 0;
for (int i=0; i<right.length; i++) {
- OperatorInfo oi = (i==0 && right[i] == null ? left : right[i]);
- reduceSinkDesc now = ((ReduceSinkOperator)(oi.getOp())).getConf();
+ Operator oi = (i==0 && right[i] == null ? left : right[i]);
+ reduceSinkDesc now = ((ReduceSinkOperator)(oi)).getConf();
if (i == 0) {
keyLength = now.getKeyCols().size();
} else {
@@ -1765,15 +2215,45 @@
}
}
}
+ // regenerate keySerializationInfo because the ReduceSinkOperator's
+ // output key types might have changed.
+ for (int i=0; i<right.length; i++) {
+ Operator oi = (i==0 && right[i] == null ? left : right[i]);
+ reduceSinkDesc now = ((ReduceSinkOperator)(oi)).getConf();
+ now.setKeySerializeInfo(PlanUtils.getBinarySortableTableDesc(
+ PlanUtils.getFieldSchemasFromColumnList(now.getKeyCols(), "joinkey")));
+ }
}
- private OperatorInfo genJoinPlan(QB qb, HashMap<String, OperatorInfo> map)
+ private Operator genJoinPlan(QB qb, HashMap<String, Operator> map)
throws SemanticException {
QBJoinTree joinTree = qb.getQbJoinTree();
- OperatorInfo joinOp = genJoinOperator(qb, joinTree, map);
+ Operator joinOp = genJoinOperator(qb, joinTree, map);
return joinOp;
}
+ /**
+ * Extract the filters from the join condition and push them on top of the source operators. This procedure
+ * traverses the query tree recursively,
+ */
+ private void pushJoinFilters(QB qb, QBJoinTree joinTree, HashMap<String, Operator> map) throws SemanticException {
+ Vector<Vector<CommonTree>> filters = joinTree.getFilters();
+ if (joinTree.getJoinSrc() != null)
+ pushJoinFilters(qb, joinTree.getJoinSrc(), map);
+
+ int pos = 0;
+ for (String src : joinTree.getBaseSrc()) {
+ if (src != null) {
+ Operator srcOp = map.get(src);
+ Vector<CommonTree> filter = filters.get(pos);
+ for (CommonTree cond: filter)
+ srcOp = genFilterPlan(qb, cond, srcOp);
+ map.put(src, srcOp);
+ }
+ pos++;
+ }
+ }
+
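To illustrate the effect of pushJoinFilters, a standalone sketch (hypothetical data shapes, with retainAll standing in for the equi-join): each join source is filtered before its rows reach the join, so the data shipped through the shuffle shrinks:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class JoinFilterPushdown {

  // Filter a source relation before it feeds the join, as pushJoinFilters
  // does by stacking genFilterPlan on top of each base source operator.
  static List<Integer> pushFilter(List<Integer> src, Predicate<Integer> cond) {
    List<Integer> out = new ArrayList<>();
    for (Integer row : src) {
      if (cond.test(row)) {
        out.add(row);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // e.g. FROM a JOIN b ON (a.key = b.key AND a.key > 10 AND b.key < 40)
    List<Integer> a = pushFilter(Arrays.asList(5, 15, 25), k -> k > 10);  // filter pushed onto a
    List<Integer> b = pushFilter(Arrays.asList(15, 25, 45), k -> k < 40); // filter pushed onto b
    a.retainAll(b); // retainAll stands in for the equi-join on key
    System.out.println(a); // prints [15, 25]
  }
}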
private QBJoinTree genJoinTree(CommonTree joinParseTree)
throws SemanticException {
QBJoinTree joinTree = new QBJoinTree();
@@ -1807,9 +2287,9 @@
if ((left.getToken().getType() == HiveParser.TOK_TABREF)
|| (left.getToken().getType() == HiveParser.TOK_SUBQUERY)) {
- String table_name = left.getChild(0).getText();
- String alias = left.getChildCount() == 1 ? table_name : left.getChild(1)
- .getText();
+ String table_name = unescapeIdentifier(left.getChild(0).getText());
+ String alias = left.getChildCount() == 1 ? table_name :
+ unescapeIdentifier(left.getChild(1).getText().toLowerCase());
joinTree.setLeftAlias(alias);
String[] leftAliases = new String[1];
leftAliases[0] = alias;
@@ -1832,9 +2312,9 @@
if ((right.getToken().getType() == HiveParser.TOK_TABREF)
|| (right.getToken().getType() == HiveParser.TOK_SUBQUERY)) {
- String table_name = right.getChild(0).getText();
- String alias = right.getChildCount() == 1 ? table_name : right.getChild(1)
- .getText();
[... 757 lines stripped ...]