Posted to commits@hive.apache.org by jd...@apache.org on 2017/01/24 20:02:52 UTC
[5/6] hive git commit: HIVE-15269: Dynamic Min-Max/BloomFilter
runtime-filtering for Tez (Deepak Jaiswal via Jason Dere)
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RedundantDynamicPruningConditionsRemoval.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RedundantDynamicPruningConditionsRemoval.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RedundantDynamicPruningConditionsRemoval.java
index d9ce017..b8a60f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RedundantDynamicPruningConditionsRemoval.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RedundantDynamicPruningConditionsRemoval.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Stack;
import org.apache.calcite.util.Pair;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -75,16 +76,19 @@ public class RedundantDynamicPruningConditionsRemoval extends Transform {
*/
@Override
public ParseContext transform(ParseContext pctx) throws SemanticException {
- Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%" +
- FilterOperator.getOperatorName() + "%"), new FilterTransformer());
-
- Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
- GraphWalker ogw = new DefaultGraphWalker(disp);
-
- List<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(pctx.getTopOps().values());
- ogw.startWalking(topNodes, null);
+    // Make sure semijoin reduction is not enabled. If it is, do not remove the dynamic partition pruning predicates.
+ if (!pctx.getConf().getBoolVar(HiveConf.ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION)) {
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%" +
+ FilterOperator.getOperatorName() + "%"), new FilterTransformer());
+
+ Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ List<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pctx.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+ }
return pctx;
}
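
As an aside (not part of the diff): this transform, like most of the passes touched by this patch, follows Hive's rule/dispatcher/walker idiom. A minimal sketch of that idiom, with a no-op processor standing in for FilterTransformer (the rule string expands to roughly "TS%FIL%"):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Stack;

import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class WalkerPatternSketch {
  // A processor that fires whenever the walker's path matches a rule.
  static class NoopProc implements NodeProcessor {
    @Override
    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
        Object... nodeOutputs) throws SemanticException {
      // nd is the last operator on the matched TS-FIL path.
      return null;
    }
  }

  static void walk(ParseContext pctx) throws SemanticException {
    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
    // Fire on a FilterOperator directly under a TableScanOperator.
    opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%"
        + FilterOperator.getOperatorName() + "%"), new NoopProc());
    // Default processor is null: non-matching nodes are simply traversed.
    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
    GraphWalker ogw = new DefaultGraphWalker(disp);
    List<Node> topNodes = new ArrayList<Node>(pctx.getTopOps().values());
    ogw.startWalking(topNodes, null);
  }
}
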
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
index aa1e509..61f1374 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
@@ -51,41 +51,10 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.AggregationDesc;
-import org.apache.hadoop.hive.ql.plan.ColStatistics;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnListDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.*;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.ql.plan.GroupByDesc;
-import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
-import org.apache.hadoop.hive.ql.plan.JoinDesc;
-import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.Statistics;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualNS;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStruct;
+import org.apache.hadoop.hive.ql.udf.generic.*;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
@@ -494,6 +463,12 @@ public class StatsRulesProcFactory {
final ExprNodeDesc leftExpression = fd.getChildren().get(2); // left expression
final ExprNodeDesc rightExpression = fd.getChildren().get(3); // right expression
+ // Short circuit and return the current number of rows if this is a
+ // synthetic predicate with dynamic values
+ if (leftExpression instanceof ExprNodeDynamicValueDesc) {
+ return stats.getNumRows();
+ }
+
// We transform the BETWEEN clause to AND clause (with NOT on top if invert is true).
// This is more straightforward, as the evaluateExpression method will deal with
// generating the final row count relying on the basic comparator evaluation methods
@@ -888,18 +863,29 @@ public class StatsRulesProcFactory {
} else if (udf instanceof GenericUDFOPNotEqual) {
return numRows;
} else if (udf instanceof GenericUDFOPEqualOrGreaterThan
- || udf instanceof GenericUDFOPEqualOrLessThan
- || udf instanceof GenericUDFOPGreaterThan
- || udf instanceof GenericUDFOPLessThan) {
+ || udf instanceof GenericUDFOPEqualOrLessThan
+ || udf instanceof GenericUDFOPGreaterThan
+ || udf instanceof GenericUDFOPLessThan) {
return evaluateComparator(stats, genFunc);
} else if (udf instanceof GenericUDFOPNotNull) {
- return evaluateNotNullExpr(stats, genFunc);
+ return evaluateNotNullExpr(stats, genFunc);
} else if (udf instanceof GenericUDFOPNull) {
return evaluateColEqualsNullExpr(stats, genFunc);
} else if (udf instanceof GenericUDFOPAnd || udf instanceof GenericUDFOPOr
- || udf instanceof GenericUDFIn || udf instanceof GenericUDFBetween
- || udf instanceof GenericUDFOPNot) {
+ || udf instanceof GenericUDFIn || udf instanceof GenericUDFBetween
+ || udf instanceof GenericUDFOPNot) {
return evaluateExpression(stats, genFunc, aspCtx, neededCols, fop, evaluatedRowCount);
+ } else if (udf instanceof GenericUDFInBloomFilter) {
+ if (genFunc.getChildren().get(1) instanceof ExprNodeDynamicValueDesc) {
+ // Synthetic predicates from semijoin opt should not affect stats.
+ return numRows;
+ }
+ }
+ } else if (child instanceof ExprNodeConstantDesc) {
+ if (Boolean.FALSE.equals(((ExprNodeConstantDesc) child).getValue())) {
+ return 0;
+ } else {
+ return stats.getNumRows();
}
}
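
As an aside (not part of the diff), a sketch of the synthetic predicate shape these short circuits recognize; the column name and dynamic-value ids below are made up for illustration, and the constructors are the standard ExprNode ones plus the DynamicValue/ExprNodeDynamicValueDesc classes added later in this patch:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.plan.DynamicValue;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class SyntheticPredicateSketch {
  // Builds: key BETWEEN DynamicValue(RS_7_key_min) AND DynamicValue(RS_7_key_max).
  // BETWEEN's children are (invert, column, left, right); the stats code above
  // checks children.get(2) and returns stats.getNumRows() when it is a dynamic
  // value, since no selectivity can be estimated at compile time.
  static ExprNodeDesc buildBetween() {
    List<ExprNodeDesc> children = new ArrayList<ExprNodeDesc>();
    children.add(new ExprNodeConstantDesc(TypeInfoFactory.booleanTypeInfo, Boolean.FALSE));
    children.add(new ExprNodeColumnDesc(TypeInfoFactory.longTypeInfo, "key", "t", false));
    children.add(new ExprNodeDynamicValueDesc(
        new DynamicValue("RS_7_key_min", TypeInfoFactory.longTypeInfo)));
    children.add(new ExprNodeDynamicValueDesc(
        new DynamicValue("RS_7_key_max", TypeInfoFactory.longTypeInfo)));
    return new ExprNodeGenericFuncDesc(
        TypeInfoFactory.booleanTypeInfo, new GenericUDFBetween(), children);
  }
}
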
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 3a111aa..6141391 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -20,13 +20,7 @@ package org.apache.hadoop.hive.ql.parse;
import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -42,18 +36,13 @@ import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.*;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.*;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
-import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.plan.UnionWork;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -269,6 +258,15 @@ public class GenTezUtils {
((DynamicPruningEventDesc) event.getConf()).setTableScan((TableScanOperator) newRoot);
}
}
+ // This TableScanOperator could be part of semijoin optimization.
+ Map<ReduceSinkOperator, TableScanOperator> rsOpToTsOpMap =
+ context.parseContext.getRsOpToTsOpMap();
+ for (ReduceSinkOperator rs : rsOpToTsOpMap.keySet()) {
+ if (rsOpToTsOpMap.get(rs) == orig) {
+ rsOpToTsOpMap.put(rs, (TableScanOperator) newRoot);
+ break;
+ }
+ }
}
context.rootToWorkMap.remove(orig);
context.rootToWorkMap.put(newRoot, work);
@@ -479,7 +477,7 @@ public class GenTezUtils {
* Remove an operator branch. When we see a fork, we know it's time to do the removal.
* @param event the leaf node of which branch to be removed
*/
- public static void removeBranch(AppMasterEventOperator event) {
+ public static void removeBranch(Operator<?> event) {
Operator<?> child = event;
Operator<?> curr = event;
@@ -511,4 +509,140 @@ public class GenTezUtils {
}
return EdgeType.SIMPLE_EDGE;
}
+
+ public static void processDynamicMinMaxPushDownOperator(
+ GenTezProcContext procCtx, RuntimeValuesInfo runtimeValuesInfo,
+ ReduceSinkOperator rs)
+ throws SemanticException {
+ TableScanOperator ts = procCtx.parseContext.getRsOpToTsOpMap().get(rs);
+
+ List<BaseWork> rsWorkList = procCtx.childToWorkMap.get(rs);
+ if (ts == null || rsWorkList == null) {
+ // This happens when the ReduceSink's edge has been removed by cycle
+ // detection logic. Nothing to do here.
+ return;
+ }
+ LOG.debug("ResduceSink " + rs + " to TableScan " + ts);
+
+ if (rsWorkList.size() != 1) {
+ StringBuilder sb = new StringBuilder();
+ for (BaseWork curWork : rsWorkList) {
+    if (sb.length() > 0) {
+ sb.append(", ");
+ }
+ sb.append(curWork.getName());
+ }
+ throw new SemanticException(rs + " belongs to multiple BaseWorks: " + sb.toString());
+ }
+
+ BaseWork parentWork = rsWorkList.get(0);
+ BaseWork childWork = procCtx.rootToWorkMap.get(ts);
+
+    // Connect parent/child work with a broadcast edge.
+    LOG.debug("Connecting BaseWork - " + parentWork.getName() + " to " + childWork.getName());
+ TezEdgeProperty edgeProperty = new TezEdgeProperty(EdgeType.BROADCAST_EDGE);
+ TezWork tezWork = procCtx.currentTask.getWork();
+ tezWork.connect(parentWork, childWork, edgeProperty);
+
+ // Set output names in ReduceSink
+ rs.getConf().setOutputName(childWork.getName());
+
+ // Set up the dynamic values in the childWork.
+ RuntimeValuesInfo childRuntimeValuesInfo =
+ new RuntimeValuesInfo();
+ childRuntimeValuesInfo.setTableDesc(runtimeValuesInfo.getTableDesc());
+ childRuntimeValuesInfo.setDynamicValueIDs(runtimeValuesInfo.getDynamicValueIDs());
+ childRuntimeValuesInfo.setColExprs(runtimeValuesInfo.getColExprs());
+ childWork.setInputSourceToRuntimeValuesInfo(
+ parentWork.getName(), childRuntimeValuesInfo);
+ }
+
+ // Functionality to remove semi-join optimization
+ public static void removeSemiJoinOperator(ParseContext context,
+ ReduceSinkOperator rs,
+    TableScanOperator ts) throws SemanticException {
+ // Cleanup the synthetic predicate in the tablescan operator by
+ // replacing it with "true"
+ LOG.debug("Removing ReduceSink " + rs + " and TableScan " + ts);
+ ExprNodeDesc constNode = new ExprNodeConstantDesc(
+ TypeInfoFactory.booleanTypeInfo, Boolean.TRUE);
+ DynamicValuePredicateContext filterDynamicValuePredicatesCollection =
+ new DynamicValuePredicateContext();
+ collectDynamicValuePredicates(ts.getConf().getFilterExpr(),
+ filterDynamicValuePredicatesCollection);
+ for (ExprNodeDesc nodeToRemove : filterDynamicValuePredicatesCollection
+ .childParentMapping.keySet()) {
+ // Find out if this synthetic predicate belongs to the current cycle
+ boolean skip = true;
+ for (ExprNodeDesc expr : nodeToRemove.getChildren()) {
+    if (expr instanceof ExprNodeDynamicValueDesc) {
+ String dynamicValueIdFromExpr = ((ExprNodeDynamicValueDesc) expr)
+ .getDynamicValue().getId();
+ List<String> dynamicValueIdsFromMap = context.
+ getRsToRuntimeValuesInfoMap().get(rs).getDynamicValueIDs();
+ for (String dynamicValueIdFromMap : dynamicValueIdsFromMap) {
+ if (dynamicValueIdFromExpr.equals(dynamicValueIdFromMap)) {
+ // Intended predicate to be removed
+ skip = false;
+ break;
+ }
+ }
+ }
+ }
+ if (!skip) {
+ ExprNodeDesc nodeParent = filterDynamicValuePredicatesCollection
+ .childParentMapping.get(nodeToRemove);
+ if (nodeParent == null) {
+ // This was the only predicate, set filter expression to null
+ ts.getConf().setFilterExpr(null);
+ } else {
+ int i = nodeParent.getChildren().indexOf(nodeToRemove);
+ nodeParent.getChildren().remove(i);
+ nodeParent.getChildren().add(i, constNode);
+ }
+ // skip the rest of the predicates
+ skip = true;
+ }
+ }
+ context.getRsOpToTsOpMap().remove(rs);
+ }
+
+ private static class DynamicValuePredicateContext implements NodeProcessorCtx {
+ HashMap<ExprNodeDesc, ExprNodeDesc> childParentMapping = new HashMap<ExprNodeDesc, ExprNodeDesc>();
+ }
+
+ private static class DynamicValuePredicateProc implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ DynamicValuePredicateContext ctx = (DynamicValuePredicateContext) procCtx;
+ ExprNodeDesc parent = (ExprNodeDesc) stack.get(stack.size() - 2);
+ if (parent instanceof ExprNodeGenericFuncDesc) {
+ ExprNodeGenericFuncDesc parentFunc = (ExprNodeGenericFuncDesc) parent;
+ if (parentFunc.getGenericUDF() instanceof GenericUDFBetween ||
+ parentFunc.getGenericUDF() instanceof GenericUDFInBloomFilter) {
+ ExprNodeDesc grandParent = stack.size() >= 3 ?
+ (ExprNodeDesc) stack.get(stack.size() - 3) : null;
+ ctx.childParentMapping.put(parentFunc, grandParent);
+ }
+ }
+
+ return null;
+ }
+ }
+
+ private static void collectDynamicValuePredicates(ExprNodeDesc pred, NodeProcessorCtx ctx) throws SemanticException {
+    // Create a walker which walks the expression tree in a DFS manner while
+    // maintaining the node stack. The dispatcher fires the processor
+    // registered for matching rules.
+ Map<Rule, NodeProcessor> exprRules = new LinkedHashMap<Rule, NodeProcessor>();
+ exprRules.put(new RuleRegExp("R1", ExprNodeDynamicValueDesc.class.getName() + "%"), new DynamicValuePredicateProc());
+ Dispatcher disp = new DefaultRuleDispatcher(null, exprRules, ctx);
+ GraphWalker egw = new DefaultGraphWalker(disp);
+ List<Node> startNodes = new ArrayList<Node>();
+ startNodes.add(pred);
+
+ egw.startWalking(startNodes, null);
+ }
}
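
As an aside (not part of the diff), the predicate surgery in removeSemiJoinOperator condenses to the following; the helper name is illustrative, and List.set is equivalent to the remove(i)/add(i, constNode) pair used above. A later SHORTCUT constant-propagation pass (see TezCompiler below) folds the resulting "x AND true" away:

import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class PredicateSurgerySketch {
  // Replace nodeToRemove with constant TRUE inside its parent (typically an AND).
  static void replaceWithTrue(ExprNodeDesc parent, ExprNodeDesc nodeToRemove) {
    ExprNodeDesc constTrue =
        new ExprNodeConstantDesc(TypeInfoFactory.booleanTypeInfo, Boolean.TRUE);
    int i = parent.getChildren().indexOf(nodeToRemove);
    parent.getChildren().set(i, constTrue);
  }
}
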
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 35f34da..3f9f76c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
import org.apache.hadoop.hive.ql.plan.CreateViewDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -125,6 +126,11 @@ public class ParseContext {
private boolean needViewColumnAuthorization;
private Set<FileSinkDesc> acidFileSinks = Collections.emptySet();
+ // Map to store mapping between reduce sink Operator and TS Operator for semijoin
+ private Map<ReduceSinkOperator, TableScanOperator> rsOpToTsOpMap =
+ new HashMap<ReduceSinkOperator, TableScanOperator>();
+ private Map<ReduceSinkOperator, RuntimeValuesInfo> rsToRuntimeValuesInfo =
+ new HashMap<ReduceSinkOperator, RuntimeValuesInfo>();
public ParseContext() {
}
@@ -652,4 +658,19 @@ public class ParseContext {
}
}
+ public void setRsToRuntimeValuesInfoMap(Map<ReduceSinkOperator, RuntimeValuesInfo> rsToRuntimeValuesInfo) {
+ this.rsToRuntimeValuesInfo = rsToRuntimeValuesInfo;
+ }
+
+ public Map<ReduceSinkOperator, RuntimeValuesInfo> getRsToRuntimeValuesInfoMap() {
+ return rsToRuntimeValuesInfo;
+ }
+
+ public void setRsOpToTsOpMap(Map<ReduceSinkOperator, TableScanOperator> rsOpToTsOpMap) {
+ this.rsOpToTsOpMap = rsOpToTsOpMap;
+ }
+
+ public Map<ReduceSinkOperator, TableScanOperator> getRsOpToTsOpMap() {
+ return rsOpToTsOpMap;
+ }
}
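
As an aside (not part of the diff), a sketch of how these two maps are intended to be populated when the semijoin branch is generated elsewhere in this patch; every name below is illustrative:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;

public class SemijoinRegistrationSketch {
  static void register(ParseContext pctx, ReduceSinkOperator rs,
      TableScanOperator targetTs, TableDesc keyTableDesc, List<ExprNodeDesc> keyExprs) {
    RuntimeValuesInfo info = new RuntimeValuesInfo();
    info.setTableDesc(keyTableDesc);   // row shape of the RS output
    info.setDynamicValueIDs(Arrays.asList(
        "RS_12_key_min", "RS_12_key_max", "RS_12_key_bloom_filter"));
    info.setColExprs(keyExprs);        // key expressions on the filtered side
    pctx.getRsToRuntimeValuesInfoMap().put(rs, info);
    pctx.getRsOpToTsOpMap().put(rs, targetTs); // RS -> TS whose filter it feeds
  }
}
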
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/parse/RuntimeValuesInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/RuntimeValuesInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/RuntimeValuesInfo.java
new file mode 100644
index 0000000..5865f1a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/RuntimeValuesInfo.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Holds structures required for runtime values and mappings.
+ */
+public class RuntimeValuesInfo implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private TableDesc tableDesc;
+ private List<String> dynamicValueIDs;
+ private List<ExprNodeDesc> colExprs;
+
+ // get-set methods
+ public TableDesc getTableDesc() {
+ return tableDesc;
+ }
+
+ public void setTableDesc(TableDesc tableDesc) {
+ this.tableDesc = tableDesc;
+ }
+
+ public List<String> getDynamicValueIDs() {
+ return dynamicValueIDs;
+ }
+
+ public void setDynamicValueIDs(List<String> dynamicValueIDs) {
+ this.dynamicValueIDs = dynamicValueIDs;
+ }
+
+ public List<ExprNodeDesc> getColExprs() {
+ return colExprs;
+ }
+
+ public void setColExprs(List<ExprNodeDesc> colExprs) {
+ this.colExprs = colExprs;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index e8b003e..5f9ccc8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -530,6 +530,9 @@ public abstract class TaskCompiler {
clone.setFetchTask(pCtx.getFetchTask());
clone.setLineageInfo(pCtx.getLineageInfo());
clone.setMapJoinOps(pCtx.getMapJoinOps());
+ clone.setRsToRuntimeValuesInfoMap(pCtx.getRsToRuntimeValuesInfoMap());
+ clone.setRsOpToTsOpMap(pCtx.getRsOpToTsOpMap());
+
return clone;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index cdb9e1b..cf8e843 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -18,49 +18,22 @@
package org.apache.hadoop.hive.ql.parse;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
+import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.lib.*;
+import org.apache.hadoop.hive.ql.plan.*;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
-import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.FilterOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
-import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
-import org.apache.hadoop.hive.ql.lib.Dispatcher;
-import org.apache.hadoop.hive.ql.lib.ForwardWalker;
-import org.apache.hadoop.hive.ql.lib.GraphWalker;
-import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.Rule;
-import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.optimizer.ConstantPropagate;
@@ -83,12 +56,6 @@ import org.apache.hadoop.hive.ql.optimizer.physical.SerializeFilter;
import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -97,7 +64,7 @@ import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
*/
public class TezCompiler extends TaskCompiler {
- protected final Logger LOG = LoggerFactory.getLogger(TezCompiler.class);
+ protected static final Logger LOG = LoggerFactory.getLogger(TezCompiler.class);
public TezCompiler() {
}
@@ -140,6 +107,29 @@ public class TezCompiler extends TaskCompiler {
runCycleAnalysisForPartitionPruning(procCtx, inputs, outputs);
perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Run cycle analysis for partition pruning");
+ perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
+    // Remove the semijoin optimization if it creates a cycle with map-side joins
+    removeSemiJoinCyclesDueToMapsideJoins(procCtx);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove semijoin optimizations if they create a cycle with map-side joins");
+
+ perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
+ // Remove semijoin optimization if SMB join is created.
+ removeSemijoinOptimizationFromSMBJoins(procCtx);
+ perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove semijoin optimizations if needed");
+
+ perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
+    // Remove the bloom filter if no stats were generated
+ removeSemiJoinIfNoStats(procCtx);
+ perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove bloom filter optimizations if needed");
+
+    // Need a new run of constant folding because we might have created lots
+    // of "and true and true" conditions.
+    // Rather than run full constant folding, we only need to shortcut AND/OR
+    // expressions involving constant true/false values.
+    if (procCtx.conf.getBoolVar(ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
+ new ConstantPropagate(ConstantPropagateOption.SHORTCUT).transform(procCtx.parseContext);
+ }
+
}
private void runCycleAnalysisForPartitionPruning(OptimizeTezProcContext procCtx,
@@ -163,7 +153,7 @@ public class TezCompiler extends TaskCompiler {
if (component.size() != 1) {
LOG.info("Found cycle in operator plan...");
cycleFree = false;
- removeEventOperator(component, procCtx);
+ removeCycleOperator(component, procCtx);
break;
}
}
@@ -171,29 +161,72 @@ public class TezCompiler extends TaskCompiler {
}
}
- private void removeEventOperator(Set<Operator<?>> component, OptimizeTezProcContext context) {
- AppMasterEventOperator victim = null;
+ private void removeCycleOperator(Set<Operator<?>> component, OptimizeTezProcContext context) throws SemanticException {
+ AppMasterEventOperator victimAM = null;
+ TableScanOperator victimTS = null;
+ ReduceSinkOperator victimRS = null;
+
for (Operator<?> o : component) {
+ // Look for AppMasterEventOperator or ReduceSinkOperator
if (o instanceof AppMasterEventOperator) {
- if (victim == null
- || o.getConf().getStatistics().getDataSize() < victim.getConf().getStatistics()
+ if (victimAM == null
+ || o.getStatistics().getDataSize() < victimAM.getStatistics()
.getDataSize()) {
- victim = (AppMasterEventOperator) o;
+ victimAM = (AppMasterEventOperator) o;
}
+ } else if (o instanceof ReduceSinkOperator) {
+ TableScanOperator ts = context.parseContext.getRsOpToTsOpMap().get(o);
+ if (ts == null) {
+ continue;
+ }
+ // Sanity check
+ assert component.contains(ts);
+
+ if (victimRS == null ||
+ ts.getStatistics().getDataSize() <
+ victimTS.getStatistics().getDataSize()) {
+ victimRS = (ReduceSinkOperator) o;
+ victimTS = ts;
+ }
+ }
+ }
+
+ // Always set the min/max optimization as victim.
+ Operator<?> victim = victimRS;
+
+    if (victimRS == null && victimAM != null) {
+ victim = victimAM;
+ } else if (victimAM == null) {
+ // do nothing
+ } else {
+      // Cycle consists of at least one dynamic partition pruning (DPP)
+      // optimization and at least one min/max optimization.
+ // DPP is a better optimization unless it ends up scanning the
+ // bigger table for keys instead of the smaller table.
+
+ // Get the parent TS of victimRS.
+ Operator<?> op = victimRS;
+      while (!(op instanceof TableScanOperator)) {
+ op = op.getParentOperators().get(0);
+ }
+ if ((2 * op.getStatistics().getDataSize()) <
+ victimAM.getStatistics().getDataSize()) {
+ victim = victimAM;
}
}
if (victim == null ||
- (!context.pruningOpsRemovedByPriorOpt.isEmpty() &&
- context.pruningOpsRemovedByPriorOpt.contains(victim))) {
+ (!context.pruningOpsRemovedByPriorOpt.isEmpty() &&
+ context.pruningOpsRemovedByPriorOpt.contains(victim))) {
return;
}
GenTezUtils.removeBranch(victim);
- // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
- LOG.info("Disabling dynamic pruning for: "
- + ((DynamicPruningEventDesc) victim.getConf()).getTableScan().toString()
- + ". Needed to break cyclic dependency");
+
+ if (victim == victimRS) {
+ GenTezUtils.removeSemiJoinOperator(context.parseContext, victimRS, victimTS);
+ }
+ return;
}
// Tarjan's algo
@@ -205,11 +238,11 @@ public class TezCompiler extends TaskCompiler {
Map<Operator<?>, Integer> indexes = new HashMap<Operator<?>, Integer>();
Map<Operator<?>, Integer> lowLinks = new HashMap<Operator<?>, Integer>();
Stack<Operator<?>> nodes = new Stack<Operator<?>>();
- Set<Set<Operator<?>>> components = new HashSet<Set<Operator<?>>>();
+ Set<Set<Operator<?>>> components = new LinkedHashSet<Set<Operator<?>>>();
for (Operator<?> o : deque) {
if (!indexes.containsKey(o)) {
- connect(o, index, nodes, indexes, lowLinks, components);
+ connect(o, index, nodes, indexes, lowLinks, components, procCtx.parseContext);
}
}
@@ -218,7 +251,7 @@ public class TezCompiler extends TaskCompiler {
private void connect(Operator<?> o, AtomicInteger index, Stack<Operator<?>> nodes,
Map<Operator<?>, Integer> indexes, Map<Operator<?>, Integer> lowLinks,
- Set<Set<Operator<?>>> components) {
+ Set<Set<Operator<?>>> components, ParseContext parseContext) {
indexes.put(o, index.get());
lowLinks.put(o, index.get());
@@ -232,13 +265,22 @@ public class TezCompiler extends TaskCompiler {
TableScanOperator ts = ((DynamicPruningEventDesc) o.getConf()).getTableScan();
LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
children.add(ts);
+    } else if (o instanceof ReduceSinkOperator) {
+ // min/max case
+ children = new ArrayList<Operator<?>>();
+ children.addAll(o.getChildOperators());
+ TableScanOperator ts = parseContext.getRsOpToTsOpMap().get(o);
+ if (ts != null) {
+ LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
+ children.add(ts);
+ }
} else {
children = o.getChildOperators();
}
for (Operator<?> child : children) {
if (!indexes.containsKey(child)) {
- connect(child, index, nodes, indexes, lowLinks, components);
+ connect(child, index, nodes, indexes, lowLinks, components, parseContext);
lowLinks.put(o, Math.min(lowLinks.get(o), lowLinks.get(child)));
} else if (nodes.contains(child)) {
lowLinks.put(o, Math.min(lowLinks.get(o), indexes.get(child)));
@@ -246,7 +288,7 @@ public class TezCompiler extends TaskCompiler {
}
if (lowLinks.get(o).equals(indexes.get(o))) {
- Set<Operator<?>> component = new HashSet<Operator<?>>();
+ Set<Operator<?>> component = new LinkedHashSet<Operator<?>>();
components.add(component);
Operator<?> current;
do {
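
As an aside (not part of the diff), a self-contained sketch of Tarjan's strongly-connected-components algorithm over a plain string graph, for readers unfamiliar with the cycle detection above. The Hive version additionally injects the special edges (AppMasterEvent or semijoin ReduceSink operator to its target TableScan) and, per this patch, switches to LinkedHashSet, presumably so that component iteration, and hence victim selection, is deterministic:

import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.atomic.AtomicInteger;

public class TarjanSketch {
  // Collects one LinkedHashSet per strongly connected component; any
  // component of size > 1 marks a cycle in the graph.
  static void connect(String v, AtomicInteger index, Stack<String> nodes,
      Map<String, Integer> indexes, Map<String, Integer> lowLinks,
      Map<String, List<String>> edges, Set<Set<String>> components) {
    indexes.put(v, index.get());
    lowLinks.put(v, index.getAndIncrement());
    nodes.push(v);
    for (String w : edges.getOrDefault(v, Collections.<String>emptyList())) {
      if (!indexes.containsKey(w)) {
        // Tree edge: recurse, then propagate the child's low-link upward.
        connect(w, index, nodes, indexes, lowLinks, edges, components);
        lowLinks.put(v, Math.min(lowLinks.get(v), lowLinks.get(w)));
      } else if (nodes.contains(w)) {
        // Back edge to a node still on the stack.
        lowLinks.put(v, Math.min(lowLinks.get(v), indexes.get(w)));
      }
    }
    if (lowLinks.get(v).equals(indexes.get(v))) {
      // v is the root of a component: pop the stack down to v.
      Set<String> component = new LinkedHashSet<String>();
      components.add(component);
      String current;
      do {
        current = nodes.pop();
        component.add(current);
      } while (!current.equals(v));
    }
  }
}
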
@@ -315,14 +357,6 @@ public class TezCompiler extends TaskCompiler {
topNodes.addAll(procCtx.parseContext.getTopOps().values());
GraphWalker ogw = new ForwardWalker(disp);
ogw.startWalking(topNodes, null);
-
- // need a new run of the constant folding because we might have created lots
- // of "and true and true" conditions.
- // Rather than run the full constant folding just need to shortcut AND/OR expressions
- // involving constant true/false values.
- if(procCtx.conf.getBoolVar(ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
- new ConstantPropagate(ConstantPropagateOption.SHORTCUT).transform(procCtx.parseContext);
- }
}
@Override
@@ -401,6 +435,14 @@ public class TezCompiler extends TaskCompiler {
GenTezUtils.processFileSink(procCtx, fileSink);
}
+ // Connect any edges required for min/max pushdown
+ if (pCtx.getRsToRuntimeValuesInfoMap().size() > 0) {
+ for (ReduceSinkOperator rs : pCtx.getRsToRuntimeValuesInfoMap().keySet()) {
+ // Process min/max
+ GenTezUtils.processDynamicMinMaxPushDownOperator(
+ procCtx, pCtx.getRsToRuntimeValuesInfoMap().get(rs), rs);
+ }
+ }
// and finally we hook up any events that need to be sent to the tez AM
LOG.debug("There are " + procCtx.eventOperatorSet.size() + " app master events.");
for (AppMasterEventOperator event : procCtx.eventOperatorSet) {
@@ -528,4 +570,256 @@ public class TezCompiler extends TaskCompiler {
perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "optimizeTaskPlan");
return;
}
+
+ private static class SemijoinRemovalContext implements NodeProcessorCtx {
+ List<Operator<?>> parents = new ArrayList<Operator<?>>();
+ }
+
+ private static class SemijoinRemovalProc implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ SemijoinRemovalContext ctx = (SemijoinRemovalContext) procCtx;
+ Operator<?> parent = (Operator<?>) stack.get(stack.size() - 2);
+ ctx.parents.add(parent);
+ return null;
+ }
+ }
+
+ private static void collectSemijoinOps(Operator<?> ts, NodeProcessorCtx ctx) throws SemanticException {
+    // Create a walker which walks the operator tree in a DFS manner while
+    // maintaining the operator stack. The dispatcher fires the processor
+    // registered for matching rules.
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(new RuleRegExp("R1", SelectOperator.getOperatorName() + "%" +
+ TezDummyStoreOperator.getOperatorName() + "%"),
+ new SemijoinRemovalProc());
+ opRules.put(new RuleRegExp("R2", SelectOperator.getOperatorName() + "%" +
+ CommonMergeJoinOperator.getOperatorName() + "%"),
+ new SemijoinRemovalProc());
+ Dispatcher disp = new DefaultRuleDispatcher(null, opRules, ctx);
+ GraphWalker ogw = new PreOrderOnceWalker(disp);
+ List<Node> startNodes = new ArrayList<Node>();
+ startNodes.add(ts);
+
+ HashMap<Node, Object> outputMap = new HashMap<Node, Object>();
+ ogw.startWalking(startNodes, null);
+ }
+
+ private static class SMBJoinOpProc implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ List<TableScanOperator> tsOps = new ArrayList<TableScanOperator>();
+ // Get one top level TS Op directly from the stack
+ tsOps.add((TableScanOperator)stack.get(0));
+
+ // Get the other one by examining Join Op
+ List<Operator<?>> parents = ((CommonMergeJoinOperator) nd).getParentOperators();
+ for (Operator<?> parent : parents) {
+ if (parent instanceof TezDummyStoreOperator) {
+ // already accounted for
+ continue;
+ }
+
+ assert parent instanceof SelectOperator;
+        while (parent != null) {
+ if (parent instanceof TableScanOperator) {
+ tsOps.add((TableScanOperator) parent);
+ break;
+ }
+ parent = parent.getParentOperators().get(0);
+ }
+ }
+
+      // Now that the relevant TableScanOperators are known, find out whether
+      // a semijoin filter exists on any of them; if so, remove it.
+ ParseContext pctx = ((OptimizeTezProcContext) procCtx).parseContext;
+ for (TableScanOperator ts : tsOps) {
+ for (ReduceSinkOperator rs : pctx.getRsOpToTsOpMap().keySet()) {
+ if (ts == pctx.getRsOpToTsOpMap().get(rs)) {
+ // match!
+ GenTezUtils.removeBranch(rs);
+ GenTezUtils.removeSemiJoinOperator(pctx, rs, ts);
+ }
+ }
+ }
+ return null;
+ }
+ }
+
+ private static void removeSemijoinOptimizationFromSMBJoins(
+ OptimizeTezProcContext procCtx) throws SemanticException {
+ if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION) ||
+ procCtx.parseContext.getRsOpToTsOpMap().size() == 0) {
+ return;
+ }
+
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(
+ new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%" +
+ ".*" + TezDummyStoreOperator.getOperatorName() + "%" +
+ CommonMergeJoinOperator.getOperatorName() + "%"),
+ new SMBJoinOpProc());
+
+ // The dispatcher finds SMB and if there is semijoin optimization before it, removes it.
+ Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+ List<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(procCtx.parseContext.getTopOps().values());
+ GraphWalker ogw = new PreOrderOnceWalker(disp);
+ ogw.startWalking(topNodes, null);
+ }
+
+ private static class SemiJoinCycleRemovalDueToMapsideJoins implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ ParseContext pCtx = ((OptimizeTezProcContext) procCtx).parseContext;
+ Operator<?> childJoin = ((Operator<?>) nd);
+ Operator<?> parentJoin = ((Operator<?>) stack.get(stack.size() - 2));
+
+ if (parentJoin.getChildOperators().size() == 1) {
+ // Nothing to do here
+ return null;
+ }
+
+ for (Operator<?> child : parentJoin.getChildOperators()) {
+ if (!(child instanceof SelectOperator)) {
+ continue;
+ }
+
+        while (child.getChildOperators().size() > 0) {
+ child = child.getChildOperators().get(0);
+ }
+
+ if (!(child instanceof ReduceSinkOperator)) {
+ continue;
+ }
+
+ ReduceSinkOperator rs = ((ReduceSinkOperator) child);
+ TableScanOperator ts = pCtx.getRsOpToTsOpMap().get(rs);
+ if (ts == null) {
+ continue;
+ }
+ // This is a semijoin branch. Find if this is creating a potential
+ // cycle with childJoin.
+ for (Operator<?> parent : childJoin.getParentOperators()) {
+ if (parent == parentJoin) {
+ continue;
+ }
+
+ assert parent instanceof ReduceSinkOperator;
+ while (parent.getParentOperators().size() > 0) {
+ parent = parent.getParentOperators().get(0);
+ }
+
+ if (parent == ts) {
+ // We have a cycle!
+ GenTezUtils.removeBranch(rs);
+ GenTezUtils.removeSemiJoinOperator(pCtx, rs, ts);
+ }
+ }
+ }
+ return null;
+ }
+ }
+
+ private static void removeSemiJoinCyclesDueToMapsideJoins(
+ OptimizeTezProcContext procCtx) throws SemanticException {
+ if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION) ||
+ procCtx.parseContext.getRsOpToTsOpMap().size() == 0) {
+ return;
+ }
+
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(
+ new RuleRegExp("R1", MapJoinOperator.getOperatorName() + "%" +
+ MapJoinOperator.getOperatorName() + "%"),
+ new SemiJoinCycleRemovalDueToMapsideJoins());
+ opRules.put(
+ new RuleRegExp("R2", MapJoinOperator.getOperatorName() + "%" +
+ CommonMergeJoinOperator.getOperatorName() + "%"),
+ new SemiJoinCycleRemovalDueToMapsideJoins());
+ opRules.put(
+ new RuleRegExp("R3", CommonMergeJoinOperator.getOperatorName() + "%" +
+ MapJoinOperator.getOperatorName() + "%"),
+ new SemiJoinCycleRemovalDueToMapsideJoins());
+ opRules.put(
+ new RuleRegExp("R4", CommonMergeJoinOperator.getOperatorName() + "%" +
+ CommonMergeJoinOperator.getOperatorName() + "%"),
+ new SemiJoinCycleRemovalDueToMapsideJoins());
+
+ Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+ List<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(procCtx.parseContext.getTopOps().values());
+ GraphWalker ogw = new PreOrderOnceWalker(disp);
+ ogw.startWalking(topNodes, null);
+ }
+
+ private static class SemiJoinRemovalIfNoStatsProc implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ assert nd instanceof ReduceSinkOperator;
+ ReduceSinkOperator rs = (ReduceSinkOperator) nd;
+ ParseContext pCtx = ((OptimizeTezProcContext) procCtx).parseContext;
+ TableScanOperator ts = pCtx.getRsOpToTsOpMap().get(rs);
+ if (ts == null) {
+ // nothing to do here.
+ return null;
+ }
+
+      // This is a semijoin branch. The stack should look like:
+ // <Parent Ops>-SEL-GB1-RS1-GB2-RS2
+ GroupByOperator gbOp = (GroupByOperator) (stack.get(stack.size() - 2));
+ GroupByDesc gbDesc = gbOp.getConf();
+ ArrayList<AggregationDesc> aggregationDescs = gbDesc.getAggregators();
+ boolean removeSemiJoin = false;
+ for (AggregationDesc agg : aggregationDescs) {
+        if (!"bloom_filter".equals(agg.getGenericUDAFName())) {
+ continue;
+ }
+
+ GenericUDAFBloomFilterEvaluator udafBloomFilterEvaluator =
+ (GenericUDAFBloomFilterEvaluator) agg.getGenericUDAFEvaluator();
+ long expectedEntries = udafBloomFilterEvaluator.getExpectedEntries();
+ if (expectedEntries == -1 || expectedEntries >
+ pCtx.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES)) {
+ removeSemiJoin = true;
+ break;
+ }
+ }
+ if (removeSemiJoin) {
+        // Stats are missing or the expected entries exceed the limit; remove the semijoin branch
+ GenTezUtils.removeBranch(rs);
+ GenTezUtils.removeSemiJoinOperator(pCtx, rs, ts);
+ }
+ return null;
+ }
+ }
+
+ private void removeSemiJoinIfNoStats(OptimizeTezProcContext procCtx)
+ throws SemanticException {
+    if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION)) {
+ // Not needed without semi-join reduction
+ return;
+ }
+
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(
+ new RuleRegExp("R1", GroupByOperator.getOperatorName() + "%" +
+ ReduceSinkOperator.getOperatorName() + "%" +
+ GroupByOperator.getOperatorName() + "%" +
+ ReduceSinkOperator.getOperatorName() + "%"),
+ new SemiJoinRemovalIfNoStatsProc());
+ Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+ List<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(procCtx.parseContext.getTopOps().values());
+ GraphWalker ogw = new PreOrderOnceWalker(disp);
+ ogw.startWalking(topNodes, null);
+ }
}
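
As an aside (not part of the diff), the branch shape that SemiJoinRemovalIfNoStatsProc's rule (GBY-RS-GBY-RS, built from getOperatorName()) anchors on is roughly

  TS (small side) - SEL(keys) - GBY(min,max,bloom_filter) - RS - GBY - RS

and its guard condenses to the following sketch; the helper name is illustrative:

import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;

public class NoStatsGuardSketch {
  // Returns true when the semijoin branch should be removed: either stats
  // were never annotated (-1), or the bloom filter would be uselessly large.
  static boolean shouldRemove(GroupByOperator gb, long maxEntries) {
    for (AggregationDesc agg : gb.getConf().getAggregators()) {
      if (!"bloom_filter".equals(agg.getGenericUDAFName())) {
        continue;
      }
      long expected = ((GenericUDAFBloomFilterEvaluator)
          agg.getGenericUDAFEvaluator()).getExpectedEntries();
      if (expected == -1 || expected > maxEntries) {
        return true;
      }
    }
    return false;
  }
}
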
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java
index 1ecbaad..f0b062e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java
@@ -152,6 +152,13 @@ public class AggregationDesc implements java.io.Serializable {
}
sb.append(exp.getExprString());
}
+
+ String evaluatorExpr = getGenericUDAFEvaluator().getExprString();
+ if (evaluatorExpr != null && !evaluatorExpr.isEmpty()) {
+ sb.append(", ");
+ sb.append(evaluatorExpr);
+ }
+
sb.append(")");
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index 13a0811..8c341fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.plan;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.LinkedHashSet;
import java.util.List;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -76,6 +78,10 @@ public abstract class BaseWork extends AbstractOperatorDesc {
private int reservedMemoryMB = -1; // default to -1 means we leave it up to Tez to decide
+ // Used for value registry
+ private Map<String, RuntimeValuesInfo> inputSourceToRuntimeValuesInfo =
+ new HashMap<String, RuntimeValuesInfo>();
+
public void setGatheringStats(boolean gatherStats) {
this.gatheringStats = gatherStats;
}
@@ -251,4 +257,13 @@ public abstract class BaseWork extends AbstractOperatorDesc {
public List<String> getSortCols() {
return sortColNames;
}
+
+ public Map<String, RuntimeValuesInfo> getInputSourceToRuntimeValuesInfo() {
+ return inputSourceToRuntimeValuesInfo;
+ }
+
+ public void setInputSourceToRuntimeValuesInfo(
+ String workName, RuntimeValuesInfo runtimeValuesInfo) {
+ inputSourceToRuntimeValuesInfo.put(workName, runtimeValuesInfo);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicValue.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicValue.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicValue.java
new file mode 100644
index 0000000..874c62b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicValue.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.DynamicValueRegistry;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.sarg.LiteralDelegate;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+import java.io.Serializable;
+
+
+public class DynamicValue implements LiteralDelegate, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String DYNAMIC_VALUE_REGISTRY_CACHE_KEY = "DynamicValueRegistry";
+
+ protected transient Configuration conf;
+
+ protected String id;
+ TypeInfo typeInfo;
+ PrimitiveObjectInspector objectInspector;
+
+ transient protected Object val;
+ transient boolean initialized = false;
+
+ public DynamicValue(String id, TypeInfo typeInfo) {
+ this.id = id;
+ this.typeInfo = typeInfo;
+ this.objectInspector = (PrimitiveObjectInspector) TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(typeInfo);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public TypeInfo getTypeInfo() {
+ return typeInfo;
+ }
+
+ public void setTypeInfo(TypeInfo typeInfo) {
+ this.typeInfo = typeInfo;
+ }
+
+ public PrimitiveObjectInspector getObjectInspector() {
+ return objectInspector;
+ }
+
+ public void setObjectInspector(PrimitiveObjectInspector objectInspector) {
+ this.objectInspector = objectInspector;
+ }
+
+ @Override
+  public String getId() { return id; }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public Object getLiteral() {
+ return getJavaValue();
+ }
+
+ public Object getJavaValue() {
+ return objectInspector.getPrimitiveJavaObject(getValue());
+ }
+
+ public Object getWritableValue() {
+ return objectInspector.getPrimitiveWritableObject(getValue());
+ }
+
+ public Object getValue() {
+ if (initialized) {
+ return val;
+ }
+
+ if (conf == null) {
+ throw new IllegalStateException("Cannot retrieve dynamic value " + id + " - no conf set");
+ }
+
+ try {
+ // Get object cache
+ String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
+ ObjectCache cache = ObjectCacheFactory.getCache(conf, queryId, false);
+
+ // Get the registry
+ DynamicValueRegistry valueRegistry = cache.retrieve(DYNAMIC_VALUE_REGISTRY_CACHE_KEY);
+ if (valueRegistry == null) {
+ throw new IllegalStateException("DynamicValueRegistry not available");
+ }
+ val = valueRegistry.getValue(id);
+ initialized = true;
+ } catch (Exception err) {
+ throw new IllegalStateException("Failed to retrieve dynamic value for " + id, err);
+ }
+
+ return val;
+ }
+
+ @Override
+ public String toString() {
+ // If the id is a generated unique ID then this could affect .q file golden files for tests that run EXPLAIN queries.
+ return "DynamicValue(" + id + ")";
+ }
+}
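
As an aside (not part of the diff), a sketch of how a consumer resolves a DynamicValue at run time, assuming the Tez input has already populated the DynamicValueRegistry in the query-level ObjectCache; the id below is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.plan.DynamicValue;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class DynamicValueUsageSketch {
  // Resolves the runtime minimum for a hypothetical semijoin id. Throws
  // IllegalStateException if the DynamicValueRegistry has not been
  // registered in the query-level ObjectCache yet.
  static Object resolveMin(Configuration conf) {
    DynamicValue min = new DynamicValue("RS_12_key_min", TypeInfoFactory.longTypeInfo);
    min.setConf(conf); // needed to locate the per-query ObjectCache
    return min.getJavaValue(); // e.g. a Long for a bigint key
  }
}
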
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicValueDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicValueDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicValueDesc.java
new file mode 100644
index 0000000..c9e7b67
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicValueDesc.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+
+
+/**
+ * This expression represents a value that will be available at runtime.
+ *
+ */
+public class ExprNodeDynamicValueDesc extends ExprNodeDesc implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ protected DynamicValue dynamicValue;
+
+ public ExprNodeDynamicValueDesc() {
+ }
+
+ public ExprNodeDynamicValueDesc(DynamicValue value) {
+ super(value.getTypeInfo());
+ this.dynamicValue = value;
+ }
+
+ @Override
+ public ExprNodeDesc clone() {
+ return new ExprNodeDynamicValueDesc(dynamicValue);
+ }
+
+ @Override
+ public boolean isSame(Object o) {
+ if (o instanceof ExprNodeDynamicValueDesc) {
+ Object otherValue = ((ExprNodeDynamicValueDesc) o).getDynamicValue();
+ if (dynamicValue == null) {
+ return otherValue == null;
+ }
+ return dynamicValue.equals(otherValue);
+ }
+ return false;
+ }
+
+ public DynamicValue getDynamicValue() {
+ return dynamicValue;
+ }
+
+ public void setValue(DynamicValue value) {
+ this.dynamicValue = value;
+ }
+
+ @Override
+ public String getExprString() {
+ return dynamicValue != null ? dynamicValue.toString() : "null dynamic literal";
+ }
+
+ @Override
+ public String toString() {
+ return dynamicValue != null ? dynamicValue.toString() : "null dynamic literal";
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
new file mode 100644
index 0000000..fb9a140
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.common.util.BloomFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.sql.Timestamp;
+
+/**
+ * Generic UDAF to generate a Bloom filter
+ */
+public class GenericUDAFBloomFilter implements GenericUDAFResolver2 {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GenericUDAFBloomFilter.class);
+
+ @Override
+ public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
+ return new GenericUDAFBloomFilterEvaluator();
+ }
+
+ @Override
+ public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
+ return new GenericUDAFBloomFilterEvaluator();
+ }
+
+ /**
+ * GenericUDAFBloomFilterEvaluator - Evaluator class for BloomFilter
+ */
+ public static class GenericUDAFBloomFilterEvaluator extends GenericUDAFEvaluator {
+ // Source operator to get the number of entries
+ private Operator<?> sourceOperator;
+ private long maxEntries = 0;
+
+ // ObjectInspector for input data.
+ private PrimitiveObjectInspector inputOI;
+
+    // Bloom filter result
+ private ByteArrayOutputStream result = new ByteArrayOutputStream();
+
+ @Override
+ public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
+ super.init(m, parameters);
+
+ // Initialize input
+ if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {
+ inputOI = (PrimitiveObjectInspector) parameters[0];
+ } else {
+ // Do nothing for other modes
+ }
+
+ // The output is the same in both partial and full aggregation modes:
+ // a serialized BloomFilter wrapped in a BytesWritable.
+ return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+ }
+
+ /**
+ * Class for storing the BloomFilter
+ */
+ @AggregationType(estimable = true)
+ static class BloomFilterBuf extends AbstractAggregationBuffer {
+ BloomFilter bloomFilter;
+
+ public BloomFilterBuf(long expectedEntries, long maxEntries) {
+ if (expectedEntries > maxEntries) {
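+ // The estimate exceeds the configured cap: fall back to a minimal filter,
+ // which saturates quickly and effectively lets all values through
+ // (bloom filters never produce false negatives).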
+ bloomFilter = new BloomFilter(1);
+ } else {
+ bloomFilter = new BloomFilter(expectedEntries);
+ }
+ }
+
+ @Override
+ public int estimate() {
+ return (int) bloomFilter.sizeInBytes();
+ }
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ ((BloomFilterBuf)agg).bloomFilter.reset();
+ }
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ long expectedEntries = getExpectedEntries();
+ if (expectedEntries < 0) {
+ throw new IllegalStateException("BloomFilter expectedEntries not initialized");
+ }
+
+ BloomFilterBuf buf = new BloomFilterBuf(expectedEntries, maxEntries);
+ reset(buf);
+ return buf;
+ }
+
+ @Override
+ public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
+ if (parameters == null || parameters[0] == null) {
+ // The second condition occurs when the input has 0 rows (possible
+ // due to filtering, joins, etc.).
+ return;
+ }
+
+ BloomFilter bf = ((BloomFilterBuf)agg).bloomFilter;
+
+ // Add the value of the evaluated expression to the BloomFilter
+ switch (inputOI.getPrimitiveCategory()) {
+ case BOOLEAN:
+ boolean vBoolean = ((BooleanObjectInspector)inputOI).get(parameters[0]);
+ bf.addLong(vBoolean ? 1 : 0);
+ break;
+ case BYTE:
+ byte vByte = ((ByteObjectInspector)inputOI).get(parameters[0]);
+ bf.addLong(vByte);
+ break;
+ case SHORT:
+ short vShort = ((ShortObjectInspector)inputOI).get(parameters[0]);
+ bf.addLong(vShort);
+ break;
+ case INT:
+ int vInt = ((IntObjectInspector)inputOI).get(parameters[0]);
+ bf.addLong(vInt);
+ break;
+ case LONG:
+ long vLong = ((LongObjectInspector)inputOI).get(parameters[0]);
+ bf.addLong(vLong);
+ break;
+ case FLOAT:
+ float vFloat = ((FloatObjectInspector)inputOI).get(parameters[0]);
+ bf.addDouble(vFloat);
+ break;
+ case DOUBLE:
+ double vDouble = ((DoubleObjectInspector)inputOI).get(parameters[0]);
+ bf.addDouble(vDouble);
+ break;
+ case DECIMAL:
+ HiveDecimal vDecimal = ((HiveDecimalObjectInspector)inputOI).
+ getPrimitiveJavaObject(parameters[0]);
+ bf.addString(vDecimal.toString());
+ break;
+ case DATE:
+ DateWritable vDate = ((DateObjectInspector)inputOI).
+ getPrimitiveWritableObject(parameters[0]);
+ bf.addLong(vDate.getDays());
+ break;
+ case TIMESTAMP:
+ Timestamp vTimeStamp = ((TimestampObjectInspector)inputOI).
+ getPrimitiveJavaObject(parameters[0]);
+ bf.addLong(vTimeStamp.getTime());
+ break;
+ case CHAR:
+ Text vChar = ((HiveCharObjectInspector)inputOI).
+ getPrimitiveWritableObject(parameters[0]).getStrippedValue();
+ bf.addBytes(vChar.getBytes(), 0, vChar.getLength());
+ break;
+ case VARCHAR:
+ Text vVarChar = ((HiveVarcharObjectInspector)inputOI).
+ getPrimitiveWritableObject(parameters[0]).getTextValue();
+ bf.addBytes(vVarChar.getBytes(), 0, vVarChar.getLength());
+ break;
+ case STRING:
+ Text vString = ((StringObjectInspector)inputOI).
+ getPrimitiveWritableObject(parameters[0]);
+ bf.addBytes(vString.getBytes(), 0, vString.getLength());
+ break;
+ case BINARY:
+ BytesWritable vBytes = ((BinaryObjectInspector)inputOI).
+ getPrimitiveWritableObject(parameters[0]);
+ bf.addBytes(vBytes.getBytes(), 0, vBytes.getLength());
+ break;
+ default:
+ throw new UDFArgumentTypeException(0,
+ "Bad primitive category " + inputOI.getPrimitiveCategory());
+ }
+ }
+
+ @Override
+ public void merge(AggregationBuffer agg, Object partial) throws HiveException {
+ if (partial == null) {
+ return;
+ }
+
+ BytesWritable bytes = (BytesWritable) partial;
+ ByteArrayInputStream in = new ByteArrayInputStream(bytes.getBytes());
+ // Deserialize the bloom filter
+ try {
+ BloomFilter bf = BloomFilter.deserialize(in);
+ ((BloomFilterBuf)agg).bloomFilter.merge(bf);
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ }
+
+ @Override
+ public Object terminate(AggregationBuffer agg) throws HiveException {
+ result.reset();
+ try {
+ BloomFilter.serialize(result, ((BloomFilterBuf)agg).bloomFilter);
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ return new BytesWritable(result.toByteArray());
+ }
+
+ @Override
+ public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+ return terminate(agg);
+ }
+
+ public long getExpectedEntries() {
+ if (sourceOperator != null && sourceOperator.getStatistics() != null) {
+ return sourceOperator.getStatistics().getNumRows();
+ }
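+ // -1 signals that source statistics are unavailable;
+ // getNewAggregationBuffer() fails fast on it.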
+ return -1;
+ }
+
+ public Operator<?> getSourceOperator() {
+ return sourceOperator;
+ }
+
+ public void setSourceOperator(Operator<?> sourceOperator) {
+ this.sourceOperator = sourceOperator;
+ }
+
+ public void setMaxEntries(long maxEntries) {
+ this.maxEntries = maxEntries;
+ }
+
+ @Override
+ public String getExprString() {
+ return "expectedEntries=" + getExpectedEntries();
+ }
+ }
+}
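
For reference, a minimal standalone sketch (not part of this patch; the
class name is illustrative) of the serialize/merge round trip that
terminatePartial() and merge() above rely on. It uses only BloomFilter
calls that already appear in this file:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.hive.common.util.BloomFilter;

public class BloomFilterRoundTrip {
  public static void main(String[] args) throws Exception {
    // Two "partial" filters, as two upstream tasks would build in iterate().
    BloomFilter partialA = new BloomFilter(1000);   // expectedEntries
    partialA.addLong(42L);

    BloomFilter partialB = new BloomFilter(1000);
    byte[] key = "key".getBytes(StandardCharsets.UTF_8);
    partialB.addBytes(key, 0, key.length);

    // terminatePartial(): serialize one partial result to bytes.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BloomFilter.serialize(out, partialA);

    // merge(): deserialize the incoming partial and fold it in.
    BloomFilter incoming =
        BloomFilter.deserialize(new ByteArrayInputStream(out.toByteArray()));
    partialB.merge(incoming);

    System.out.println(partialB.testLong(42L));                  // true
    System.out.println(partialB.testBytes(key, 0, key.length));  // true
  }
}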
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
index 18d5285..3a98276 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
@@ -262,6 +262,13 @@ public abstract class GenericUDAFEvaluator implements Closeable {
return null;
}
+ /**
+ * Optional information to add to the expression string. Subclasses can override.
+ */
+ public String getExprString() {
+ return "";
+ }
+
protected BasePartitionEvaluator partitionEvaluator;
/**
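
A compact, hypothetical subclass (not part of this patch; all names are
illustrative) showing the new hook in isolation. The bloom filter
evaluator above uses the same hook to surface "expectedEntries=<n>" in
the plan's expression string:

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class ConstantZeroEvaluator extends GenericUDAFEvaluator {
  static class EmptyBuf extends AbstractAggregationBuffer { }

  @Override
  public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
    super.init(m, parameters);
    return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
  }
  @Override public AggregationBuffer getNewAggregationBuffer() { return new EmptyBuf(); }
  @Override public void reset(AggregationBuffer agg) { }
  @Override public void iterate(AggregationBuffer agg, Object[] parameters) { }
  @Override public Object terminatePartial(AggregationBuffer agg) { return 0L; }
  @Override public void merge(AggregationBuffer agg, Object partial) { }
  @Override public Object terminate(AggregationBuffer agg) { return 0L; }

  // The new hook: evaluator-specific detail for the expression string.
  @Override public String getExprString() {
    return "constant=0";
  }
}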
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
new file mode 100644
index 0000000..1b7de6c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.common.util.BloomFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.sql.Timestamp;
+
+/**
+ * GenericUDF to look up a value in a BloomFilter
+ */
+public class GenericUDFInBloomFilter extends GenericUDF {
+ private static final Logger LOG = LoggerFactory.getLogger(GenericUDFInBloomFilter.class);
+
+ private transient ObjectInspector valObjectInspector;
+ private transient ObjectInspector bloomFilterObjectInspector;
+ private transient BloomFilter bloomFilter;
+ private transient boolean initializedBloomFilter;
+
+ @Override
+ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+ if (arguments.length != 2) {
+ throw new UDFArgumentLengthException(
+ "InBloomFilter requires exactly 2 arguments but got " + arguments.length);
+ }
+
+ // Verify individual arguments
+ if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
+ throw new UDFArgumentTypeException(0, "The 1st argument must be a primitive type but "
+ + arguments[0].getTypeName() + " was passed");
+ }
+
+ if (((PrimitiveObjectInspector) arguments[1]).getPrimitiveCategory() !=
+ PrimitiveObjectInspector.PrimitiveCategory.BINARY) {
+ throw new UDFArgumentTypeException(1, "The 2nd argument must be a binary type but " +
+ arguments[1].getTypeName() + " was passed");
+ }
+
+ valObjectInspector = arguments[0];
+ bloomFilterObjectInspector = arguments[1];
+ assert bloomFilterObjectInspector instanceof WritableBinaryObjectInspector;
+
+ initializedBloomFilter = false;
+ return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
+ }
+
+ @Override
+ public String getDisplayString(String[] children) {
+ return getStandardDisplayString("in_bloom_filter", children);
+ }
+
+ @Override
+ public Object evaluate(DeferredObject[] arguments) throws HiveException {
+ // Return null if either argument is null
+ if (arguments[0].get() == null || arguments[1].get() == null) {
+ return null;
+ }
+
+ if (!initializedBloomFilter) {
+ // Set up the bloom filter once; the serialized filter argument is expected to be constant across rows
+ try {
+ BytesWritable bw = (BytesWritable) arguments[1].get();
+ byte[] bytes = new byte[bw.getLength()];
+ System.arraycopy(bw.getBytes(), 0, bytes, 0, bw.getLength());
+ bloomFilter = BloomFilter.deserialize(new ByteArrayInputStream(bytes));
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ initializedBloomFilter = true;
+ }
+
+ // Check whether the value is in the bloom filter
+ switch (((PrimitiveObjectInspector)valObjectInspector).
+ getTypeInfo().getPrimitiveCategory()) {
+ case BOOLEAN:
+ boolean vBoolean = ((BooleanObjectInspector)valObjectInspector).
+ get(arguments[0].get());
+ return bloomFilter.testLong(vBoolean ? 1 : 0);
+ case BYTE:
+ byte vByte = ((ByteObjectInspector) valObjectInspector).
+ get(arguments[0].get());
+ return bloomFilter.testLong(vByte);
+ case SHORT:
+ short vShort = ((ShortObjectInspector) valObjectInspector).
+ get(arguments[0].get());
+ return bloomFilter.testLong(vShort);
+ case INT:
+ int vInt = ((IntObjectInspector) valObjectInspector).
+ get(arguments[0].get());
+ return bloomFilter.testLong(vInt);
+ case LONG:
+ long vLong = ((LongObjectInspector) valObjectInspector).
+ get(arguments[0].get());
+ return bloomFilter.testLong(vLong);
+ case FLOAT:
+ float vFloat = ((FloatObjectInspector) valObjectInspector).
+ get(arguments[0].get());
+ return bloomFilter.testDouble(vFloat);
+ case DOUBLE:
+ double vDouble = ((DoubleObjectInspector) valObjectInspector).
+ get(arguments[0].get());
+ return bloomFilter.testDouble(vDouble);
+ case DECIMAL:
+ HiveDecimal vDecimal = ((HiveDecimalObjectInspector) valObjectInspector).
+ getPrimitiveJavaObject(arguments[0].get());
+ return bloomFilter.testString(vDecimal.toString());
+ case DATE:
+ DateWritable vDate = ((DateObjectInspector) valObjectInspector).
+ getPrimitiveWritableObject(arguments[0].get());
+ return bloomFilter.testLong(vDate.getDays());
+ case TIMESTAMP:
+ Timestamp vTimeStamp = ((TimestampObjectInspector) valObjectInspector).
+ getPrimitiveJavaObject(arguments[0].get());
+ return bloomFilter.testLong(vTimeStamp.getTime());
+ case CHAR:
+ Text vChar = ((HiveCharObjectInspector) valObjectInspector).
+ getPrimitiveWritableObject(arguments[0].get()).getStrippedValue();
+ return bloomFilter.testBytes(vChar.getBytes(), 0, vChar.getLength());
+ case VARCHAR:
+ Text vVarchar = ((HiveVarcharObjectInspector) valObjectInspector).
+ getPrimitiveWritableObject(arguments[0].get()).getTextValue();
+ return bloomFilter.testBytes(vVarchar.getBytes(), 0, vVarchar.getLength());
+ case STRING:
+ Text vString = ((StringObjectInspector) valObjectInspector).
+ getPrimitiveWritableObject(arguments[0].get());
+ return bloomFilter.testBytes(vString.getBytes(), 0, vString.getLength());
+ case BINARY:
+ BytesWritable vBytes = ((BinaryObjectInspector) valObjectInspector).
+ getPrimitiveWritableObject(arguments[0].get());
+ return bloomFilter.testBytes(vBytes.getBytes(), 0, vBytes.getLength());
+ default:
+ throw new UDFArgumentTypeException(0, "Bad primitive category " +
+ ((PrimitiveObjectInspector) valObjectInspector).getTypeInfo().getPrimitiveCategory());
+ }
+ }
+}
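
A hypothetical end-to-end probe (not part of this patch; class and
variable names are illustrative) that builds a filter, serializes it the
same way GenericUDAFBloomFilter does, and tests a value through the UDF:

import java.io.ByteArrayOutputStream;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hive.common.util.BloomFilter;

public class InBloomFilterProbe {
  public static void main(String[] args) throws Exception {
    // Build and serialize a filter containing the long value 7.
    BloomFilter bf = new BloomFilter(100);
    bf.addLong(7L);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BloomFilter.serialize(out, bf);
    BytesWritable serialized = new BytesWritable(out.toByteArray());

    // Initialize the UDF with (long value, binary filter) inspectors.
    GenericUDFInBloomFilter udf = new GenericUDFInBloomFilter();
    ObjectInspector[] ois = {
        PrimitiveObjectInspectorFactory.javaLongObjectInspector,
        PrimitiveObjectInspectorFactory.writableBinaryObjectInspector };
    udf.initialize(ois);

    // Probe: the filter was built with 7, so this prints true.
    DeferredObject[] row = {
        new DeferredJavaObject(7L), new DeferredJavaObject(serialized) };
    System.out.println(udf.evaluate(row));
  }
}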
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
index 93b50a6..6563290 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
@@ -28,6 +28,7 @@ import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -44,6 +45,8 @@ import com.google.common.collect.Sets;
*/
public class TestConvertAstToSearchArg {
+ private final Configuration conf = new Configuration();
+
private static void assertNoSharedNodes(ExpressionTree tree,
Set<ExpressionTree> seen
) throws Exception {
@@ -547,7 +550,7 @@ public class TestConvertAstToSearchArg {
"</java> \n";
SearchArgumentImpl sarg =
- (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+ (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
List<PredicateLeaf> leaves = sarg.getLeaves();
assertEquals(9, leaves.size());
@@ -836,7 +839,7 @@ public class TestConvertAstToSearchArg {
"</java> \n";
SearchArgumentImpl sarg =
- (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+ (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
List<PredicateLeaf> leaves = sarg.getLeaves();
assertEquals(4, leaves.size());
@@ -1269,7 +1272,7 @@ public class TestConvertAstToSearchArg {
"</java> \n";
SearchArgumentImpl sarg =
- (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+ (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
List<PredicateLeaf> leaves = sarg.getLeaves();
assertEquals(3, leaves.size());
@@ -1493,7 +1496,7 @@ public class TestConvertAstToSearchArg {
"\n";
SearchArgumentImpl sarg =
- (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+ (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
List<PredicateLeaf> leaves = sarg.getLeaves();
assertEquals(3, leaves.size());
@@ -1763,7 +1766,7 @@ public class TestConvertAstToSearchArg {
"</java> \n";
SearchArgumentImpl sarg =
- (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+ (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
List<PredicateLeaf> leaves = sarg.getLeaves();
assertEquals(1, leaves.size());
@@ -2246,7 +2249,7 @@ public class TestConvertAstToSearchArg {
"</java>";
SearchArgumentImpl sarg =
- (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+ (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
List<PredicateLeaf> leaves = sarg.getLeaves();
assertEquals(9, leaves.size());
@@ -2405,7 +2408,7 @@ public class TestConvertAstToSearchArg {
"</java> ";
SearchArgumentImpl sarg =
- (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+ (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
List<PredicateLeaf> leaves = sarg.getLeaves();
assertEquals(0, leaves.size());
@@ -2538,7 +2541,7 @@ public class TestConvertAstToSearchArg {
"</java> ";
SearchArgumentImpl sarg =
- (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+ (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
List<PredicateLeaf> leaves = sarg.getLeaves();
assertEquals(0, leaves.size());
@@ -2663,7 +2666,7 @@ public class TestConvertAstToSearchArg {
"</java>";
SearchArgumentImpl sarg =
- (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+ (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
List<PredicateLeaf> leaves = sarg.getLeaves();
assertEquals(1, leaves.size());
@@ -2712,7 +2715,7 @@ public class TestConvertAstToSearchArg {
"AAABgj0BRVFVQcwBBW9yZy5hcGFjaGUuaGFkb29wLmlvLkJvb2xlYW5Xcml0YWJs5Q" +
"EAAAECAQFib29sZWHu";
SearchArgument sarg =
- new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("leaf-0", sarg.getExpression().toString());
assertEquals(1, sarg.getLeaves().size());
@@ -2731,7 +2734,7 @@ public class TestConvertAstToSearchArg {
"Y2hlLmhhZG9vcC5oaXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUH" +
"MAQVvcmcuYXBhY2hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABAgEBYm9vbGVh7g==";
SearchArgument sarg =
- new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("leaf-0", sarg.getExpression().toString());
assertEquals(1, sarg.getLeaves().size());
@@ -2751,7 +2754,7 @@ public class TestConvertAstToSearchArg {
"oaXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUHMAQZvcmcuYXBhY2" +
"hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABBAEBYm9vbGVh7g==";
SearchArgument sarg =
- new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("leaf-0", sarg.getExpression().toString());
assertEquals(1, sarg.getLeaves().size());
@@ -2771,7 +2774,7 @@ public class TestConvertAstToSearchArg {
"vb3AuaGl2ZS5xbC51ZGYuZ2VuZXJpYy5HZW5lcmljVURGT1BFcXVh7AEAAAGCPQFFUVVBzAEGb3JnLm" +
"FwYWNoZS5oYWRvb3AuaW8uQm9vbGVhbldyaXRhYmzlAQAAAQQBAWJvb2xlYe4=";
SearchArgument sarg =
- new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("leaf-0", sarg.getExpression().toString());
assertEquals(1, sarg.getLeaves().size());
@@ -2791,7 +2794,7 @@ public class TestConvertAstToSearchArg {
"lLmhhZG9vcC5oaXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUHMAQ" +
"ZvcmcuYXBhY2hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABBAEBYm9vbGVh7g==";
SearchArgument sarg =
- new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("leaf-0", sarg.getExpression().toString());
assertEquals(1, sarg.getLeaves().size());
@@ -2810,7 +2813,7 @@ public class TestConvertAstToSearchArg {
"dmUucWwudWRmLmdlbmVyaWMuR2VuZXJpY1VERk9QRXF1YewBAAABgj0BRVFVQcwBBW9yZy5hcGFjaGU" +
"uaGFkb29wLmlvLkJvb2xlYW5Xcml0YWJs5QEAAAECAQFib29sZWHu";
SearchArgument sarg =
- new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("leaf-0", sarg.getExpression().toString());
assertEquals(1, sarg.getLeaves().size());
@@ -2831,7 +2834,7 @@ public class TestConvertAstToSearchArg {
"hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABAwkBAgEBYrIAAAgBAwkBB29yZy5hcGFjaGUua" +
"GFkb29wLmhpdmUucWwudWRmLmdlbmVyaWMuR2VuZXJpY1VERk9QQW7kAQEGAQAAAQMJ";
SearchArgument sarg =
- new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("(and leaf-0 leaf-1)", sarg.getExpression().toString());
assertEquals(2, sarg.getLeaves().size());
@@ -2853,7 +2856,7 @@ public class TestConvertAstToSearchArg {
"aXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUHMAQVvcmcuYXBhY2h" +
"lLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABAgEBYm9vbGVh7g==";
SearchArgument sarg =
- new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("leaf-0", sarg.getExpression().toString());
assertEquals(1, sarg.getLeaves().size());
@@ -2872,7 +2875,7 @@ public class TestConvertAstToSearchArg {
"b29wLmhpdmUucWwudWRmLmdlbmVyaWMuR2VuZXJpY1VERk9QRXF1YewBAAABgj0BRVFVQcwBBW9yZy5" +
"hcGFjaGUuaGFkb29wLmlvLkJvb2xlYW5Xcml0YWJs5QEAAAECAQFib29sZWHu";
SearchArgument sarg =
- new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+ new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
.buildSearchArgument();
assertEquals("leaf-0", sarg.getExpression().toString());
assertEquals(1, sarg.getLeaves().size());
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestSearchArgumentImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestSearchArgumentImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestSearchArgumentImpl.java
index 8cbc26d..df42058 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestSearchArgumentImpl.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestSearchArgumentImpl.java
@@ -79,7 +79,7 @@ public class TestSearchArgumentImpl {
Object literal,
List<Object> literalList) {
return new SearchArgumentImpl.PredicateLeafImpl(operator, type, columnName,
- literal, literalList);
+ literal, literalList, null);
}
@Test