You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2017/01/24 20:02:52 UTC

[5/6] hive git commit: HIVE-15269: Dynamic Min-Max/BloomFilter runtime-filtering for Tez (Deepak Jaiswal via Jason Dere)

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RedundantDynamicPruningConditionsRemoval.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RedundantDynamicPruningConditionsRemoval.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RedundantDynamicPruningConditionsRemoval.java
index d9ce017..b8a60f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RedundantDynamicPruningConditionsRemoval.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RedundantDynamicPruningConditionsRemoval.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Stack;
 
 import org.apache.calcite.util.Pair;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -75,16 +76,19 @@ public class RedundantDynamicPruningConditionsRemoval extends Transform {
    */
   @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
-    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%" +
-        FilterOperator.getOperatorName() + "%"), new FilterTransformer());
-
-    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
-    GraphWalker ogw = new DefaultGraphWalker(disp);
-
-    List<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pctx.getTopOps().values());
-    ogw.startWalking(topNodes, null);
+    // Make sure semijoin is not enabled. If it is, then do not remove the dynamic partition pruning predicates.
+    if (!pctx.getConf().getBoolVar(HiveConf.ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION)) {
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%" +
+              FilterOperator.getOperatorName() + "%"), new FilterTransformer());
+
+      Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+
+      List<Node> topNodes = new ArrayList<Node>();
+      topNodes.addAll(pctx.getTopOps().values());
+      ogw.startWalking(topNodes, null);
+    }
     return pctx;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
index aa1e509..61f1374 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
@@ -51,41 +51,10 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.AggregationDesc;
-import org.apache.hadoop.hive.ql.plan.ColStatistics;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnListDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.*;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.ql.plan.GroupByDesc;
-import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
-import org.apache.hadoop.hive.ql.plan.JoinDesc;
-import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualNS;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStruct;
+import org.apache.hadoop.hive.ql.udf.generic.*;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
@@ -494,6 +463,12 @@ public class StatsRulesProcFactory {
       final ExprNodeDesc leftExpression = fd.getChildren().get(2); // left expression
       final ExprNodeDesc rightExpression = fd.getChildren().get(3); // right expression
 
+      // Short circuit and return the current number of rows if this is a
+      // synthetic predicate with dynamic values
+      if (leftExpression instanceof ExprNodeDynamicValueDesc) {
+        return stats.getNumRows();
+      }
+
       // We transform the BETWEEN clause to AND clause (with NOT on top in invert is true).
       // This is more straightforward, as the evaluateExpression method will deal with
       // generating the final row count relying on the basic comparator evaluation methods
@@ -888,18 +863,29 @@ public class StatsRulesProcFactory {
         } else if (udf instanceof GenericUDFOPNotEqual) {
           return numRows;
         } else if (udf instanceof GenericUDFOPEqualOrGreaterThan
-            || udf instanceof GenericUDFOPEqualOrLessThan
-            || udf instanceof GenericUDFOPGreaterThan
-            || udf instanceof GenericUDFOPLessThan) {
+                || udf instanceof GenericUDFOPEqualOrLessThan
+                || udf instanceof GenericUDFOPGreaterThan
+                || udf instanceof GenericUDFOPLessThan) {
           return evaluateComparator(stats, genFunc);
         } else if (udf instanceof GenericUDFOPNotNull) {
-            return evaluateNotNullExpr(stats, genFunc);
+          return evaluateNotNullExpr(stats, genFunc);
         } else if (udf instanceof GenericUDFOPNull) {
           return evaluateColEqualsNullExpr(stats, genFunc);
         } else if (udf instanceof GenericUDFOPAnd || udf instanceof GenericUDFOPOr
-            || udf instanceof GenericUDFIn || udf instanceof GenericUDFBetween
-            || udf instanceof GenericUDFOPNot) {
+                || udf instanceof GenericUDFIn || udf instanceof GenericUDFBetween
+                || udf instanceof GenericUDFOPNot) {
           return evaluateExpression(stats, genFunc, aspCtx, neededCols, fop, evaluatedRowCount);
+        } else if (udf instanceof GenericUDFInBloomFilter) {
+          if (genFunc.getChildren().get(1) instanceof ExprNodeDynamicValueDesc) {
+            // Synthetic predicates from semijoin opt should not affect stats.
+            return numRows;
+          }
+        }
+      } else if (child instanceof ExprNodeConstantDesc) {
+        if (Boolean.FALSE.equals(((ExprNodeConstantDesc) child).getValue())) {
+          return 0;
+        } else {
+          return stats.getNumRows();
         }
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 3a111aa..6141391 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -20,13 +20,7 @@ package org.apache.hadoop.hive.ql.parse;
 
 import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
 
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -42,18 +36,13 @@ import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.*;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.*;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
-import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.plan.UnionWork;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -269,6 +258,15 @@ public class GenTezUtils {
               ((DynamicPruningEventDesc) event.getConf()).setTableScan((TableScanOperator) newRoot);
             }
           }
+          // This TableScanOperator could be part of semijoin optimization.
+          Map<ReduceSinkOperator, TableScanOperator> rsOpToTsOpMap =
+                  context.parseContext.getRsOpToTsOpMap();
+          for (ReduceSinkOperator rs : rsOpToTsOpMap.keySet()) {
+            if (rsOpToTsOpMap.get(rs) == orig) {
+              rsOpToTsOpMap.put(rs, (TableScanOperator) newRoot);
+              break;
+            }
+          }
         }
         context.rootToWorkMap.remove(orig);
         context.rootToWorkMap.put(newRoot, work);
@@ -479,7 +477,7 @@ public class GenTezUtils {
    * Remove an operator branch. When we see a fork, we know it's time to do the removal.
    * @param event the leaf node of which branch to be removed
    */
-  public static void removeBranch(AppMasterEventOperator event) {
+  public static void removeBranch(Operator<?> event) {
     Operator<?> child = event;
     Operator<?> curr = event;
 
@@ -511,4 +509,140 @@ public class GenTezUtils {
     }
     return EdgeType.SIMPLE_EDGE;
   }
+
+  /**
+   * Wires a semijoin ReduceSink to the work of its target TableScan: connects both works
+   * with a broadcast edge and registers the runtime values on the child work.
+   *
+   * @param procCtx supplies the RS-to-TS map, the work maps and the current Tez task
+   * @param runtimeValuesInfo table desc, dynamic value IDs and column exprs to hand over
+   * @param rs ReduceSink producing the runtime values
+   * @throws SemanticException if rs is not in exactly one work or the TableScan has no work
+   */
+  public static void processDynamicMinMaxPushDownOperator(
+          GenTezProcContext procCtx, RuntimeValuesInfo runtimeValuesInfo,
+          ReduceSinkOperator rs) throws SemanticException {
+    TableScanOperator ts = procCtx.parseContext.getRsOpToTsOpMap().get(rs);
+    List<BaseWork> rsWorkList = procCtx.childToWorkMap.get(rs);
+    if (ts == null || rsWorkList == null) {
+      // This happens when the ReduceSink's edge has been removed by cycle detection logic.
+      return;
+    }
+    LOG.debug("ReduceSink {} to TableScan {}", rs, ts);
+    if (rsWorkList.size() != 1) {
+      StringBuilder sb = new StringBuilder();
+      for (BaseWork curWork : rsWorkList) {
+        sb.append(sb.length() > 0 ? ", " : "").append(curWork.getName());
+      }
+      throw new SemanticException(rs + " belongs to multiple BaseWorks: " + sb);
+    }
+
+    BaseWork parentWork = rsWorkList.get(0);
+    BaseWork childWork = procCtx.rootToWorkMap.get(ts);
+    if (childWork == null) {
+      throw new SemanticException("No work found for TableScan " + ts + " fed by " + rs);
+    }
+
+    // Connect parent/child work with a broadcast edge.
+    LOG.debug("Connecting BaseWork - {} to {}", parentWork.getName(), childWork.getName());
+    TezWork tezWork = procCtx.currentTask.getWork();
+    tezWork.connect(parentWork, childWork, new TezEdgeProperty(EdgeType.BROADCAST_EDGE));
+    // Set output names in ReduceSink
+    rs.getConf().setOutputName(childWork.getName());
+    // Set up the dynamic values in the childWork.
+    RuntimeValuesInfo childRuntimeValuesInfo = new RuntimeValuesInfo();
+    childRuntimeValuesInfo.setTableDesc(runtimeValuesInfo.getTableDesc());
+    childRuntimeValuesInfo.setDynamicValueIDs(runtimeValuesInfo.getDynamicValueIDs());
+    childRuntimeValuesInfo.setColExprs(runtimeValuesInfo.getColExprs());
+    childWork.setInputSourceToRuntimeValuesInfo(parentWork.getName(), childRuntimeValuesInfo);
+  }
+
+  /**
+   * Undoes a semijoin optimization: each synthetic predicate in the TableScan filter that
+   * uses one of the ReduceSink's dynamic values is replaced by constant true (or the filter
+   * is cleared if the predicate has no parent), then rs is dropped from the RS-to-TS map.
+   *
+   * @param context parse context holding the RS-to-TS and RS-to-runtime-values maps
+   * @param rs ReduceSink of the semijoin branch being removed
+   * @param ts TableScan whose filter expression carries the synthetic predicates
+   * @throws SemanticException propagated from the walk that collects the predicates
+   */
+  public static void removeSemiJoinOperator(ParseContext context,
+                                     ReduceSinkOperator rs,
+                                     TableScanOperator ts) throws SemanticException{
+    LOG.debug("Removing ReduceSink {} and TableScan {}", rs, ts);
+    // Cleanup the synthetic predicate in the tablescan operator by
+    // replacing it with "true"
+    ExprNodeDesc constNode = new ExprNodeConstantDesc(
+            TypeInfoFactory.booleanTypeInfo, Boolean.TRUE);
+    DynamicValuePredicateContext predicates = new DynamicValuePredicateContext();
+    collectDynamicValuePredicates(ts.getConf().getFilterExpr(), predicates);
+    // Loop-invariant lookup; only dereferenced once a dynamic value is actually found.
+    RuntimeValuesInfo rsRuntimeValues = context.getRsToRuntimeValuesInfoMap().get(rs);
+
+    for (Map.Entry<ExprNodeDesc, ExprNodeDesc> entry : predicates.childParentMapping.entrySet()) {
+      ExprNodeDesc nodeToRemove = entry.getKey();
+      // Find out if this synthetic predicate belongs to the current cycle
+      boolean belongsToRs = false;
+      for (ExprNodeDesc expr : nodeToRemove.getChildren()) {
+        if (expr instanceof ExprNodeDynamicValueDesc) {
+          String id = ((ExprNodeDynamicValueDesc) expr).getDynamicValue().getId();
+          belongsToRs |= rsRuntimeValues.getDynamicValueIDs().contains(id);
+        }
+      }
+      if (!belongsToRs) {
+        continue;
+      }
+      ExprNodeDesc nodeParent = entry.getValue();
+      if (nodeParent == null) {
+        // This was the only predicate, set filter expression to null
+        ts.getConf().setFilterExpr(null);
+      } else {
+        // Swap in constant true at the same position so the sibling order is kept.
+        int i = nodeParent.getChildren().indexOf(nodeToRemove);
+        nodeParent.getChildren().set(i, constNode);
+      }
+    }
+
+    context.getRsOpToTsOpMap().remove(rs);
+  }
+
+  /** Walker context: maps each collected predicate to its parent expression (null if none). */
+  private static class DynamicValuePredicateContext implements NodeProcessorCtx {
+    final Map<ExprNodeDesc, ExprNodeDesc> childParentMapping = new HashMap<ExprNodeDesc, ExprNodeDesc>();
+  }
+
+  /** Records the visited node's parent when it is a GenericUDFBetween or GenericUDFInBloomFilter call. */
+  private static class DynamicValuePredicateProc implements NodeProcessor {
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+                          Object... nodeOutputs) throws SemanticException {
+      // A node visited as the root of the walk has no enclosing predicate to record.
+      Node parent = stack.size() >= 2 ? stack.get(stack.size() - 2) : null;
+      if (parent instanceof ExprNodeGenericFuncDesc) {
+        ExprNodeGenericFuncDesc parentFunc = (ExprNodeGenericFuncDesc) parent;
+        if (parentFunc.getGenericUDF() instanceof GenericUDFBetween ||
+                parentFunc.getGenericUDF() instanceof GenericUDFInBloomFilter) {
+          ExprNodeDesc grandParent = stack.size() >= 3 ?
+                  (ExprNodeDesc) stack.get(stack.size() - 3) : null;
+          ((DynamicValuePredicateContext) procCtx).childParentMapping.put(parentFunc, grandParent);
+        }
+      }
+      return null;
+    }
+  }
+
+  // Walks the expression tree rooted at pred, running DynamicValuePredicateProc on nodes that
+  // match the ExprNodeDynamicValueDesc rule; results are recorded in ctx. A null pred is a no-op.
+  private static void collectDynamicValuePredicates(ExprNodeDesc pred, NodeProcessorCtx ctx) throws SemanticException {
+    if (pred == null) {
+      return; // no filter expression to inspect (walker is assumed not to accept null nodes)
+    }
+    Map<Rule, NodeProcessor> exprRules = new LinkedHashMap<Rule, NodeProcessor>();
+    exprRules.put(new RuleRegExp("R1", ExprNodeDynamicValueDesc.class.getName() + "%"), new DynamicValuePredicateProc());
+    GraphWalker egw = new DefaultGraphWalker(new DefaultRuleDispatcher(null, exprRules, ctx));
+    List<Node> startNodes = new ArrayList<Node>();
+    startNodes.add(pred);
+    egw.startWalking(startNodes, null);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 35f34da..3f9f76c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
 import org.apache.hadoop.hive.ql.plan.CreateViewDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -125,6 +126,11 @@ public class ParseContext {
   private boolean needViewColumnAuthorization;
   private Set<FileSinkDesc> acidFileSinks = Collections.emptySet();
 
+  // Map to store mapping between reduce sink Operator and TS Operator for semijoin
+  private Map<ReduceSinkOperator, TableScanOperator> rsOpToTsOpMap =
+          new HashMap<ReduceSinkOperator, TableScanOperator>();
+  private Map<ReduceSinkOperator, RuntimeValuesInfo> rsToRuntimeValuesInfo =
+          new HashMap<ReduceSinkOperator, RuntimeValuesInfo>();
 
   public ParseContext() {
   }
@@ -652,4 +658,19 @@ public class ParseContext {
     }
   }
 
+  public void setRsToRuntimeValuesInfoMap(Map<ReduceSinkOperator, RuntimeValuesInfo> rsToRuntimeValuesInfo) {
+    this.rsToRuntimeValuesInfo = rsToRuntimeValuesInfo;
+  }
+
+  public Map<ReduceSinkOperator, RuntimeValuesInfo> getRsToRuntimeValuesInfoMap() {
+    return rsToRuntimeValuesInfo;
+  }
+
+  public void setRsOpToTsOpMap(Map<ReduceSinkOperator, TableScanOperator> rsOpToTsOpMap) {
+    this.rsOpToTsOpMap = rsOpToTsOpMap;
+  }
+
+  public Map<ReduceSinkOperator, TableScanOperator> getRsOpToTsOpMap() {
+    return rsOpToTsOpMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/parse/RuntimeValuesInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/RuntimeValuesInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/RuntimeValuesInfo.java
new file mode 100644
index 0000000..5865f1a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/RuntimeValuesInfo.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable holder for a table descriptor, dynamic value IDs and column expressions.
+ */
+public class RuntimeValuesInfo implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private TableDesc tableDesc;
+  private List<String> dynamicValueIDs;
+  private List<ExprNodeDesc> colExprs;
+
+  // Plain accessors; lists are stored and returned by reference, without copying.
+  public TableDesc getTableDesc() {
+    return tableDesc;
+  }
+
+  public void setTableDesc(TableDesc tableDesc) {
+    this.tableDesc = tableDesc;
+  }
+
+  public List<String> getDynamicValueIDs() {
+    return dynamicValueIDs;
+  }
+
+  public void setDynamicValueIDs(List<String> dynamicValueIDs) {
+    this.dynamicValueIDs = dynamicValueIDs;
+  }
+
+  public List<ExprNodeDesc> getColExprs() {
+    return colExprs;
+  }
+
+  public void setColExprs(List<ExprNodeDesc> colExprs) {
+    this.colExprs = colExprs;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index e8b003e..5f9ccc8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -530,6 +530,9 @@ public abstract class TaskCompiler {
     clone.setFetchTask(pCtx.getFetchTask());
     clone.setLineageInfo(pCtx.getLineageInfo());
     clone.setMapJoinOps(pCtx.getMapJoinOps());
+    clone.setRsToRuntimeValuesInfoMap(pCtx.getRsToRuntimeValuesInfoMap());
+    clone.setRsOpToTsOpMap(pCtx.getRsOpToTsOpMap());
+
     return clone;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index cdb9e1b..cf8e843 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -18,49 +18,22 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.lib.*;
+import org.apache.hadoop.hive.ql.plan.*;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
-import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.FilterOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
-import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
-import org.apache.hadoop.hive.ql.lib.Dispatcher;
-import org.apache.hadoop.hive.ql.lib.ForwardWalker;
-import org.apache.hadoop.hive.ql.lib.GraphWalker;
-import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.Rule;
-import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.optimizer.ConstantPropagate;
@@ -83,12 +56,6 @@ import org.apache.hadoop.hive.ql.optimizer.physical.SerializeFilter;
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
 import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 
@@ -97,7 +64,7 @@ import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
  */
 public class TezCompiler extends TaskCompiler {
 
-  protected final Logger LOG = LoggerFactory.getLogger(TezCompiler.class);
+  protected static final Logger LOG = LoggerFactory.getLogger(TezCompiler.class);
 
   public TezCompiler() {
   }
@@ -140,6 +107,29 @@ public class TezCompiler extends TaskCompiler {
     runCycleAnalysisForPartitionPruning(procCtx, inputs, outputs);
     perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Run cycle analysis for partition pruning");
 
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
+    // Remove semijoin optimization if it creates a cycle with mapside joins
+    removeSemiJoinCyclesDueToMapsideJoins(procCtx);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove semijoin optimizations if it creates a cycle with mapside join");
+
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
+    // Remove semijoin optimization if SMB join is created.
+    removeSemijoinOptimizationFromSMBJoins(procCtx);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove semijoin optimizations if needed");
+
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
+    // Remove bloomfilter if no stats generated
+    removeSemiJoinIfNoStats(procCtx);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove bloom filter optimizations if needed");
+
+    // need a new run of the constant folding because we might have created lots
+    // of "and true and true" conditions.
+    // Rather than run the full constant folding just need to shortcut AND/OR expressions
+    // involving constant true/false values.
+    if(procCtx.conf.getBoolVar(ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
+      new ConstantPropagate(ConstantPropagateOption.SHORTCUT).transform(procCtx.parseContext);
+    }
+
   }
 
   private void runCycleAnalysisForPartitionPruning(OptimizeTezProcContext procCtx,
@@ -163,7 +153,7 @@ public class TezCompiler extends TaskCompiler {
         if (component.size() != 1) {
           LOG.info("Found cycle in operator plan...");
           cycleFree = false;
-          removeEventOperator(component, procCtx);
+          removeCycleOperator(component, procCtx);
           break;
         }
       }
@@ -171,29 +161,72 @@ public class TezCompiler extends TaskCompiler {
     }
   }
 
-  private void removeEventOperator(Set<Operator<?>> component, OptimizeTezProcContext context) {
-    AppMasterEventOperator victim = null;
+  private void removeCycleOperator(Set<Operator<?>> component, OptimizeTezProcContext context) throws SemanticException {
+    AppMasterEventOperator victimAM = null;
+    TableScanOperator victimTS = null;
+    ReduceSinkOperator victimRS = null;
+
     for (Operator<?> o : component) {
+      // Track the smallest AppMasterEvent (DPP) and the semijoin ReduceSink with the smallest target TS
       if (o instanceof AppMasterEventOperator) {
-        if (victim == null
-            || o.getConf().getStatistics().getDataSize() < victim.getConf().getStatistics()
+        if (victimAM == null
+                || o.getStatistics().getDataSize() < victimAM.getStatistics()
                 .getDataSize()) {
-          victim = (AppMasterEventOperator) o;
+          victimAM = (AppMasterEventOperator) o;
         }
+      } else if (o instanceof ReduceSinkOperator) {
+        TableScanOperator ts = context.parseContext.getRsOpToTsOpMap().get(o);
+        if (ts == null) {
+          continue;
+        }
+        // Sanity check: the target TableScan must be part of the same component
+        assert component.contains(ts);
+
+        if (victimRS == null ||
+                ts.getStatistics().getDataSize() <
+                victimTS.getStatistics().getDataSize()) {
+            victimRS = (ReduceSinkOperator) o;
+            victimTS = ts;
+          }
+        }
+      }
+
+    // Default to the min/max (semijoin) branch as victim; the cases below may pick DPP instead.
+    Operator<?> victim = victimRS;
+
+    if (victimRS == null && victimAM != null ) {
+        victim = victimAM;
+    } else if (victimAM == null) {
+      // no DPP candidate: keep the default (victimRS, which may itself be null)
+    } else {
+      // Cycle consists of at least one dynamic partition pruning (DPP)
+      // optimization and at least one min/max optimization.
+      // DPP is a better optimization unless it ends up scanning the
+      // bigger table for keys instead of the smaller table.
+
+      // Get the parent TS of victimRS.
+      Operator<?> op = victimRS;
+      while(!(op instanceof TableScanOperator)) {
+        op = op.getParentOperators().get(0);
+      }
+      if ((2 * op.getStatistics().getDataSize()) <
+              victimAM.getStatistics().getDataSize()) {
+        victim = victimAM;
       }
     }
 
     if (victim == null ||
-        (!context.pruningOpsRemovedByPriorOpt.isEmpty() &&
-         context.pruningOpsRemovedByPriorOpt.contains(victim))) {
+            (!context.pruningOpsRemovedByPriorOpt.isEmpty() &&
+                    context.pruningOpsRemovedByPriorOpt.contains(victim))) {
       return;
     }
 
     GenTezUtils.removeBranch(victim);
-    // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
-    LOG.info("Disabling dynamic pruning for: "
-        + ((DynamicPruningEventDesc) victim.getConf()).getTableScan().toString()
-        + ". Needed to break cyclic dependency");
+
+    if (victim == victimRS) {
+      GenTezUtils.removeSemiJoinOperator(context.parseContext, victimRS, victimTS);
+    }
+    return;
   }
 
   // Tarjan's algo
@@ -205,11 +238,11 @@ public class TezCompiler extends TaskCompiler {
     Map<Operator<?>, Integer> indexes = new HashMap<Operator<?>, Integer>();
     Map<Operator<?>, Integer> lowLinks = new HashMap<Operator<?>, Integer>();
     Stack<Operator<?>> nodes = new Stack<Operator<?>>();
-    Set<Set<Operator<?>>> components = new HashSet<Set<Operator<?>>>();
+    Set<Set<Operator<?>>> components = new LinkedHashSet<Set<Operator<?>>>();
 
     for (Operator<?> o : deque) {
       if (!indexes.containsKey(o)) {
-        connect(o, index, nodes, indexes, lowLinks, components);
+        connect(o, index, nodes, indexes, lowLinks, components, procCtx.parseContext);
       }
     }
 
@@ -218,7 +251,7 @@ public class TezCompiler extends TaskCompiler {
 
   private void connect(Operator<?> o, AtomicInteger index, Stack<Operator<?>> nodes,
       Map<Operator<?>, Integer> indexes, Map<Operator<?>, Integer> lowLinks,
-      Set<Set<Operator<?>>> components) {
+      Set<Set<Operator<?>>> components, ParseContext parseContext) {
 
     indexes.put(o, index.get());
     lowLinks.put(o, index.get());
@@ -232,13 +265,22 @@ public class TezCompiler extends TaskCompiler {
       TableScanOperator ts = ((DynamicPruningEventDesc) o.getConf()).getTableScan();
       LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
       children.add(ts);
+    } else if (o instanceof ReduceSinkOperator){
+      // min/max case
+      children = new ArrayList<Operator<?>>();
+      children.addAll(o.getChildOperators());
+      TableScanOperator ts = parseContext.getRsOpToTsOpMap().get(o);
+      if (ts != null) {
+        LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
+        children.add(ts);
+      }
     } else {
       children = o.getChildOperators();
     }
 
     for (Operator<?> child : children) {
       if (!indexes.containsKey(child)) {
-        connect(child, index, nodes, indexes, lowLinks, components);
+        connect(child, index, nodes, indexes, lowLinks, components, parseContext);
         lowLinks.put(o, Math.min(lowLinks.get(o), lowLinks.get(child)));
       } else if (nodes.contains(child)) {
         lowLinks.put(o, Math.min(lowLinks.get(o), indexes.get(child)));
@@ -246,7 +288,7 @@ public class TezCompiler extends TaskCompiler {
     }
 
     if (lowLinks.get(o).equals(indexes.get(o))) {
-      Set<Operator<?>> component = new HashSet<Operator<?>>();
+      Set<Operator<?>> component = new LinkedHashSet<Operator<?>>();
       components.add(component);
       Operator<?> current;
       do {
@@ -315,14 +357,6 @@ public class TezCompiler extends TaskCompiler {
     topNodes.addAll(procCtx.parseContext.getTopOps().values());
     GraphWalker ogw = new ForwardWalker(disp);
     ogw.startWalking(topNodes, null);
-
-    // need a new run of the constant folding because we might have created lots
-    // of "and true and true" conditions.
-    // Rather than run the full constant folding just need to shortcut AND/OR expressions
-    // involving constant true/false values.
-    if(procCtx.conf.getBoolVar(ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
-      new ConstantPropagate(ConstantPropagateOption.SHORTCUT).transform(procCtx.parseContext);
-    }
   }
 
   @Override
@@ -401,6 +435,14 @@ public class TezCompiler extends TaskCompiler {
       GenTezUtils.processFileSink(procCtx, fileSink);
     }
 
+    // Connect any edges required for min/max pushdown
+    if (pCtx.getRsToRuntimeValuesInfoMap().size() > 0) {
+      for (ReduceSinkOperator rs : pCtx.getRsToRuntimeValuesInfoMap().keySet()) {
+        // Process min/max
+        GenTezUtils.processDynamicMinMaxPushDownOperator(
+                procCtx, pCtx.getRsToRuntimeValuesInfoMap().get(rs), rs);
+      }
+    }
     // and finally we hook up any events that need to be sent to the tez AM
     LOG.debug("There are " + procCtx.eventOperatorSet.size() + " app master events.");
     for (AppMasterEventOperator event : procCtx.eventOperatorSet) {
@@ -528,4 +570,256 @@ public class TezCompiler extends TaskCompiler {
     perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "optimizeTaskPlan");
     return;
   }
+
+  /**
+   * Walker context that accumulates the operators recorded by
+   * {@link SemijoinRemovalProc} while the operator graph is walked.
+   */
+  private static class SemijoinRemovalContext implements NodeProcessorCtx {
+    // Filled by SemijoinRemovalProc with the stack element just below the top
+    // of the walker stack at each rule match.
+    List<Operator<?>> parents = new ArrayList<Operator<?>>();
+  }
+
+  /**
+   * Node processor that remembers, for every rule match, the operator located
+   * directly beneath the matched node on the walker stack.
+   */
+  private static class SemijoinRemovalProc implements NodeProcessor {
+
+    /**
+     * Appends the second-from-top stack element to the parents list of the
+     * supplied {@link SemijoinRemovalContext}.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+                          Object... nodeOutputs) throws SemanticException {
+      SemijoinRemovalContext removalCtx = (SemijoinRemovalContext) procCtx;
+      int belowTopIdx = stack.size() - 2;
+      Operator<?> belowTop = (Operator<?>) stack.get(belowTopIdx);
+      removalCtx.parents.add(belowTop);
+      return null;
+    }
+  }
+
+  /**
+   * Walks the operator graph starting at {@code ts} and dispatches
+   * {@link SemijoinRemovalProc} wherever the operator stack matches a Select
+   * operator followed by either a TezDummyStore or a CommonMergeJoin operator.
+   *
+   * @param ts  operator at which the walk starts
+   * @param ctx context handed to the processor on every rule match
+   * @throws SemanticException if the graph walk fails
+   */
+  private static void collectSemijoinOps(Operator<?> ts, NodeProcessorCtx ctx) throws SemanticException {
+    // Rules are kept in insertion order; both use the same kind of processor.
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", SelectOperator.getOperatorName() + "%" +
+                    TezDummyStoreOperator.getOperatorName() + "%"),
+            new SemijoinRemovalProc());
+    opRules.put(new RuleRegExp("R2", SelectOperator.getOperatorName() + "%" +
+                    CommonMergeJoinOperator.getOperatorName() + "%"),
+            new SemijoinRemovalProc());
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, ctx);
+    GraphWalker ogw = new PreOrderOnceWalker(disp);
+    List<Node> startNodes = new ArrayList<Node>();
+    startNodes.add(ts);
+
+    // No per-node outputs are collected, hence the null output map.
+    ogw.startWalking(startNodes, null);
+  }
+
+  /**
+   * Node processor invoked on a CommonMergeJoin operator reached from a table
+   * scan through a TezDummyStore operator. It gathers the table scans feeding
+   * the join and removes every semijoin branch registered against them.
+   */
+  private static class SMBJoinOpProc implements NodeProcessor {
+
+    /**
+     * @param nd      the matched CommonMergeJoin operator
+     * @param stack   walker stack; element 0 is expected to be a table scan
+     * @param procCtx an {@code OptimizeTezProcContext}
+     * @return always {@code null}
+     */
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+                          Object... nodeOutputs) throws SemanticException {
+      List<TableScanOperator> tsOps = new ArrayList<TableScanOperator>();
+      // Get one top level TS Op directly from the stack
+      tsOps.add((TableScanOperator)stack.get(0));
+
+      // Get the other one by examining Join Op
+      List<Operator<?>> parents = ((CommonMergeJoinOperator) nd).getParentOperators();
+      for (Operator<?> parent : parents) {
+        if (parent instanceof TezDummyStoreOperator) {
+          // already accounted for
+          continue;
+        }
+
+        assert parent instanceof SelectOperator;
+        // Follow the first-parent chain upwards. Stop at the table scan, or
+        // when an operator without parents is reached, instead of failing on
+        // get(0) of an empty list.
+        while (parent != null) {
+          if (parent instanceof TableScanOperator) {
+            tsOps.add((TableScanOperator) parent);
+            break;
+          }
+          List<Operator<?>> grandParents = parent.getParentOperators();
+          parent = (grandParents == null || grandParents.isEmpty())
+                  ? null : grandParents.get(0);
+        }
+      }
+
+      // Now the relevant TableScanOperators are known, find if there exists
+      // a semijoin filter on any of them, if so, remove it.
+      ParseContext pctx = ((OptimizeTezProcContext) procCtx).parseContext;
+      // Iterate over a snapshot of the keys so that the removal calls below
+      // cannot invalidate the iteration if they update the RS -> TS map
+      // (NOTE(review): confirm against GenTezUtils.removeSemiJoinOperator).
+      List<ReduceSinkOperator> rsOps =
+              new ArrayList<ReduceSinkOperator>(pctx.getRsOpToTsOpMap().keySet());
+      for (TableScanOperator ts : tsOps) {
+        for (ReduceSinkOperator rs : rsOps) {
+          // Look the mapping up again: an entry removed earlier yields null
+          // and therefore never matches.
+          if (ts == pctx.getRsOpToTsOpMap().get(rs)) {
+            // match!
+            GenTezUtils.removeBranch(rs);
+            GenTezUtils.removeSemiJoinOperator(pctx, rs, ts);
+          }
+        }
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Runs {@link SMBJoinOpProc} over the plan, starting from the top operators.
+   * Returns immediately when dynamic semijoin reduction is switched off or no
+   * ReduceSink-to-TableScan mapping is registered in the parse context.
+   *
+   * @param procCtx optimization context providing the configuration and plan
+   * @throws SemanticException if the graph walk fails
+   */
+  private static void removeSemijoinOptimizationFromSMBJoins(
+          OptimizeTezProcContext procCtx) throws SemanticException {
+    if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION)) {
+      return;
+    }
+    if (procCtx.parseContext.getRsOpToTsOpMap().size() == 0) {
+      return;
+    }
+
+    // Table scan, anything in between, then dummy store directly before the merge join.
+    String smbPattern = TableScanOperator.getOperatorName() + "%" +
+            ".*" + TezDummyStoreOperator.getOperatorName() + "%" +
+            CommonMergeJoinOperator.getOperatorName() + "%";
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", smbPattern), new SMBJoinOpProc());
+
+    // The dispatcher finds SMB and if there is semijoin optimization before it, removes it.
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    List<Node> topNodes =
+            new ArrayList<Node>(procCtx.parseContext.getTopOps().values());
+    GraphWalker ogw = new PreOrderOnceWalker(disp);
+    ogw.startWalking(topNodes, null);
+  }
+
+  /**
+   * Node processor invoked on a join operator whose stack predecessor is
+   * another join. It looks for semijoin branches hanging off the parent join
+   * whose target table scan also feeds the child join, and removes them.
+   */
+  private static class SemiJoinCycleRemovalDueToMapsideJoins implements NodeProcessor {
+
+    /**
+     * @param nd      the child join operator
+     * @param stack   walker stack; the element below the top is the parent join
+     * @param procCtx an {@code OptimizeTezProcContext}
+     * @return always {@code null}
+     */
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+                          Object... nodeOutputs) throws SemanticException {
+      ParseContext pCtx = ((OptimizeTezProcContext) procCtx).parseContext;
+      Operator<?> childJoin = ((Operator<?>) nd);
+      Operator<?> parentJoin = ((Operator<?>) stack.get(stack.size() - 2));
+
+      if (parentJoin.getChildOperators().size() == 1) {
+        // Nothing to do here
+        return null;
+      }
+
+      // Branches to remove are only collected during the scan and removed
+      // afterwards, so the operator graph (in particular the child list being
+      // iterated) is not modified while it is traversed.
+      Map<ReduceSinkOperator, TableScanOperator> cyclicBranches =
+              new LinkedHashMap<ReduceSinkOperator, TableScanOperator>();
+
+      for (Operator<?> child : parentJoin.getChildOperators()) {
+        if (!(child instanceof SelectOperator)) {
+          continue;
+        }
+
+        // Descend along the first child until a leaf operator is reached.
+        while(child.getChildOperators().size() > 0) {
+          child = child.getChildOperators().get(0);
+        }
+
+        if (!(child instanceof ReduceSinkOperator)) {
+          continue;
+        }
+
+        ReduceSinkOperator rs = ((ReduceSinkOperator) child);
+        TableScanOperator ts = pCtx.getRsOpToTsOpMap().get(rs);
+        if (ts == null) {
+          continue;
+        }
+        // This is a semijoin branch. Find if this is creating a potential
+        // cycle with childJoin.
+        for (Operator<?> parent : childJoin.getParentOperators()) {
+          if (parent == parentJoin) {
+            continue;
+          }
+
+          assert parent instanceof ReduceSinkOperator;
+          // Climb along the first parent until a root operator is reached.
+          while (parent.getParentOperators().size() > 0) {
+            parent = parent.getParentOperators().get(0);
+          }
+
+          if (parent == ts) {
+            // We have a cycle!
+            cyclicBranches.put(rs, ts);
+          }
+        }
+      }
+
+      for (Map.Entry<ReduceSinkOperator, TableScanOperator> branch
+              : cyclicBranches.entrySet()) {
+        GenTezUtils.removeBranch(branch.getKey());
+        GenTezUtils.removeSemiJoinOperator(pCtx, branch.getKey(), branch.getValue());
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Runs {@link SemiJoinCycleRemovalDueToMapsideJoins} on every pair of
+   * directly connected join operators (MapJoin and/or CommonMergeJoin, in all
+   * four combinations). Returns immediately when dynamic semijoin reduction is
+   * switched off or no ReduceSink-to-TableScan mapping is registered.
+   *
+   * @param procCtx optimization context providing the configuration and plan
+   * @throws SemanticException if the graph walk fails
+   */
+  private static void removeSemiJoinCyclesDueToMapsideJoins(
+          OptimizeTezProcContext procCtx) throws SemanticException {
+    if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION)) {
+      return;
+    }
+    if (procCtx.parseContext.getRsOpToTsOpMap().size() == 0) {
+      return;
+    }
+
+    String mapJoin = MapJoinOperator.getOperatorName() + "%";
+    String mergeJoin = CommonMergeJoinOperator.getOperatorName() + "%";
+
+    // parent-join pattern followed by child-join pattern
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", mapJoin + mapJoin),
+            new SemiJoinCycleRemovalDueToMapsideJoins());
+    opRules.put(new RuleRegExp("R2", mapJoin + mergeJoin),
+            new SemiJoinCycleRemovalDueToMapsideJoins());
+    opRules.put(new RuleRegExp("R3", mergeJoin + mapJoin),
+            new SemiJoinCycleRemovalDueToMapsideJoins());
+    opRules.put(new RuleRegExp("R4", mergeJoin + mergeJoin),
+            new SemiJoinCycleRemovalDueToMapsideJoins());
+
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    List<Node> topNodes =
+            new ArrayList<Node>(procCtx.parseContext.getTopOps().values());
+    GraphWalker ogw = new PreOrderOnceWalker(disp);
+    ogw.startWalking(topNodes, null);
+  }
+
+  /**
+   * Node processor invoked on the final ReduceSink of a semijoin branch. It
+   * removes the branch when the bloom filter aggregation has no usable size
+   * estimate or the estimate exceeds the configured maximum.
+   */
+  private static class SemiJoinRemovalIfNoStatsProc implements NodeProcessor {
+
+    /**
+     * @param nd      the matched ReduceSink operator
+     * @param stack   walker stack; the element below the top must be the
+     *                GroupBy operator feeding {@code nd}
+     * @param procCtx an {@code OptimizeTezProcContext}
+     * @return always {@code null}
+     */
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+                          Object... nodeOutputs) throws SemanticException {
+      assert nd instanceof ReduceSinkOperator;
+      ReduceSinkOperator rs = (ReduceSinkOperator) nd;
+      ParseContext pCtx = ((OptimizeTezProcContext) procCtx).parseContext;
+      TableScanOperator ts = pCtx.getRsOpToTsOpMap().get(rs);
+      if (ts == null) {
+        // nothing to do here.
+        return null;
+      }
+
+      // This is a semijoin branch. The stack should look like,
+      // <Parent Ops>-SEL-GB1-RS1-GB2-RS2
+      GroupByOperator gbOp = (GroupByOperator) (stack.get(stack.size() - 2));
+      GroupByDesc gbDesc = gbOp.getConf();
+      ArrayList<AggregationDesc> aggregationDescs = gbDesc.getAggregators();
+      boolean removeSemiJoin = false;
+      for (AggregationDesc agg : aggregationDescs) {
+        // Compare the name by value; reference comparison of Strings only
+        // works when both sides happen to be the same interned instance.
+        if (!"bloom_filter".equals(agg.getGenericUDAFName())) {
+          continue;
+        }
+
+        GenericUDAFBloomFilterEvaluator udafBloomFilterEvaluator =
+                (GenericUDAFBloomFilterEvaluator) agg.getGenericUDAFEvaluator();
+        long expectedEntries = udafBloomFilterEvaluator.getExpectedEntries();
+        if (expectedEntries == -1 || expectedEntries >
+                pCtx.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES)) {
+          removeSemiJoin = true;
+          break;
+        }
+      }
+      if (removeSemiJoin) {
+        // Either no entry estimate is available (-1) or it is above the
+        // configured limit: remove the semijoin operator.
+        GenTezUtils.removeBranch(rs);
+        GenTezUtils.removeSemiJoinOperator(pCtx, rs, ts);
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Runs {@link SemiJoinRemovalIfNoStatsProc} wherever the operator stack ends
+   * in a GroupBy, ReduceSink, GroupBy, ReduceSink sequence. Does nothing when
+   * dynamic semijoin reduction is disabled.
+   *
+   * @param procCtx optimization context providing the configuration and plan
+   * @throws SemanticException if the graph walk fails
+   */
+  private void removeSemiJoinIfNoStats(OptimizeTezProcContext procCtx)
+          throws SemanticException {
+    boolean semijoinReductionOn =
+            procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION);
+    if (!semijoinReductionOn) {
+      // Not needed without semi-join reduction
+      return;
+    }
+
+    String groupBy = GroupByOperator.getOperatorName() + "%";
+    String reduceSink = ReduceSinkOperator.getOperatorName() + "%";
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", groupBy + reduceSink + groupBy + reduceSink),
+            new SemiJoinRemovalIfNoStatsProc());
+
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    List<Node> topNodes =
+            new ArrayList<Node>(procCtx.parseContext.getTopOps().values());
+    GraphWalker ogw = new PreOrderOnceWalker(disp);
+    ogw.startWalking(topNodes, null);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java
index 1ecbaad..f0b062e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java
@@ -152,6 +152,13 @@ public class AggregationDesc implements java.io.Serializable {
       }
       sb.append(exp.getExprString());
     }
+
+    String evaluatorExpr = getGenericUDAFEvaluator().getExprString();
+    if (evaluatorExpr != null && !evaluatorExpr.isEmpty()) {
+      sb.append(", ");
+      sb.append(evaluatorExpr);
+    }
+
     sb.append(")");
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index 13a0811..8c341fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -76,6 +78,10 @@ public abstract class BaseWork extends AbstractOperatorDesc {
 
   private int reservedMemoryMB = -1;  // default to -1 means we leave it up to Tez to decide
 
+  // Used for value registry
+  private Map<String, RuntimeValuesInfo> inputSourceToRuntimeValuesInfo =
+          new HashMap<String, RuntimeValuesInfo>();
+
   public void setGatheringStats(boolean gatherStats) {
     this.gatheringStats = gatherStats;
   }
@@ -251,4 +257,13 @@ public abstract class BaseWork extends AbstractOperatorDesc {
   public List<String> getSortCols() {
     return sortColNames;
   }
+
+  /**
+   * Returns the runtime values info registered so far, keyed by the name
+   * under which each entry was added. The internal map is returned directly,
+   * not a copy.
+   */
+  public Map<String, RuntimeValuesInfo> getInputSourceToRuntimeValuesInfo() {
+    return inputSourceToRuntimeValuesInfo;
+  }
+
+  /**
+   * Registers a single runtime values info entry. Despite the "set" prefix
+   * this does not replace the whole map; it puts one mapping, overwriting any
+   * entry previously stored under the same name.
+   *
+   * @param workName          key under which the info is stored
+   * @param runtimeValuesInfo info to associate with {@code workName}
+   */
+  public void setInputSourceToRuntimeValuesInfo(
+          String workName, RuntimeValuesInfo runtimeValuesInfo) {
+    inputSourceToRuntimeValuesInfo.put(workName, runtimeValuesInfo);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicValue.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicValue.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicValue.java
new file mode 100644
index 0000000..874c62b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicValue.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.DynamicValueRegistry;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.sarg.LiteralDelegate;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+import java.io.Serializable;
+
+
+public class DynamicValue implements LiteralDelegate, Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  public static final String DYNAMIC_VALUE_REGISTRY_CACHE_KEY = "DynamicValueRegistry";
+
+  protected transient Configuration conf;
+
+  protected String id;
+  TypeInfo typeInfo;
+  PrimitiveObjectInspector objectInspector;
+
+  transient protected Object val;
+  transient boolean initialized = false;
+
+  public DynamicValue(String id, TypeInfo typeInfo) {
+    this.id = id;
+    this.typeInfo = typeInfo;
+    this.objectInspector = (PrimitiveObjectInspector) TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(typeInfo);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public TypeInfo getTypeInfo() {
+    return typeInfo;
+  }
+
+  public void setTypeInfo(TypeInfo typeInfo) {
+    this.typeInfo = typeInfo;
+  }
+
+  public PrimitiveObjectInspector getObjectInspector() {
+    return objectInspector;
+  }
+
+  public void setObjectInspector(PrimitiveObjectInspector objectInspector) {
+    this.objectInspector = objectInspector;
+  }
+
+  @Override
+  public String getId() { return id;}
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  @Override
+  public Object getLiteral() {
+    return getJavaValue();
+  }
+
+  public Object getJavaValue() {
+    return objectInspector.getPrimitiveJavaObject(getValue());
+  }
+
+  public Object getWritableValue() {
+    return objectInspector.getPrimitiveWritableObject(getValue());
+  }
+
+  /**
+   * Returns the value registered under this object's id, looking it up in the
+   * {@link DynamicValueRegistry} on first use and caching it afterwards.
+   *
+   * @return the cached or freshly retrieved value
+   * @throws IllegalStateException if no conf has been set, or if the lookup
+   *         fails for any reason (including a missing registry)
+   */
+  public Object getValue() {
+    if (!initialized) {
+      if (conf == null) {
+        throw new IllegalStateException("Cannot retrieve dynamic value " + id + " - no conf set");
+      }
+
+      try {
+        // The registry lives in the object cache obtained for the current query id.
+        ObjectCache queryCache = ObjectCacheFactory.getCache(
+            conf, HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID), false);
+        DynamicValueRegistry registry = queryCache.retrieve(DYNAMIC_VALUE_REGISTRY_CACHE_KEY);
+        if (registry == null) {
+          throw new IllegalStateException("DynamicValueRegistry not available");
+        }
+        val = registry.getValue(id);
+        initialized = true;
+      } catch (Exception err) {
+        // Also wraps the IllegalStateException raised just above.
+        throw new IllegalStateException("Failed to retrieve dynamic value for " + id, err);
+      }
+    }
+
+    return val;
+  }
+
+  @Override
+  public String toString() {
+    // If the id is a generated unique ID then this could affect .q file golden files for tests that run EXPLAIN queries.
+    return "DynamicValue(" + id + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicValueDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicValueDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicValueDesc.java
new file mode 100644
index 0000000..c9e7b67
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicValueDesc.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+
+
+/**
+ * This expression represents a value that will be available at runtime.
+ * It wraps a {@link DynamicValue} and takes its type info from that value.
+ */
+public class ExprNodeDynamicValueDesc extends ExprNodeDesc implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  // The wrapped runtime value; stays null when the no-arg constructor is used
+  // until setValue() is called.
+  protected DynamicValue dynamicValue;
+
+  /** Creates a descriptor without a dynamic value. */
+  public ExprNodeDynamicValueDesc() {
+  }
+
+  /**
+   * Creates a descriptor for the given dynamic value.
+   *
+   * @param value the runtime value; must not be null since its type info is read here
+   */
+  public ExprNodeDynamicValueDesc(DynamicValue value) {
+    super(value.getTypeInfo());
+    this.dynamicValue = value;
+  }
+
+  /**
+   * Returns a new descriptor that refers to the same DynamicValue instance
+   * (the value itself is not copied).
+   * NOTE(review): goes through the one-arg constructor, which dereferences the
+   * value, so this fails with a NullPointerException when dynamicValue is null.
+   */
+  @Override
+  public ExprNodeDesc clone() {
+    return new ExprNodeDynamicValueDesc(dynamicValue);
+  }
+
+  /**
+   * Two descriptors are the same when both hold no dynamic value, or when this
+   * one's value equals() the other's.
+   * NOTE(review): DynamicValue as added in this patch declares no equals(), so
+   * this presumably falls back to reference equality - confirm this is intended.
+   */
+  @Override
+  public boolean isSame(Object o) {
+    if (o instanceof ExprNodeDynamicValueDesc) {
+      Object otherValue = ((ExprNodeDynamicValueDesc) o).getDynamicValue();
+      if (dynamicValue == null) {
+        return otherValue == null;
+      }
+      return dynamicValue.equals(otherValue);
+    }
+    return false;
+  }
+
+  public DynamicValue getDynamicValue() {
+    return dynamicValue;
+  }
+
+  // Replaces the wrapped value only; the type info passed to the superclass
+  // constructor is not touched here.
+  public void setValue(DynamicValue value) {
+    this.dynamicValue = value;
+  }
+
+  /** Returns the dynamic value's string form, or a placeholder when it is null. */
+  @Override
+  public String getExprString() {
+    return dynamicValue != null ? dynamicValue.toString() : "null dynamic literal";
+  }
+
+  /** Same text as {@link #getExprString()}. */
+  @Override
+  public String toString() {
+    return dynamicValue != null ? dynamicValue.toString() : "null dynamic literal";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
new file mode 100644
index 0000000..fb9a140
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.common.util.BloomFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.sql.Timestamp;
+
+/**
+ * Generic UDAF that builds a Bloom filter from the values of its input expression.
+ */
+public class GenericUDAFBloomFilter implements GenericUDAFResolver2 {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GenericUDAFBloomFilter.class);
+
+  @Override
+  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
+    return new GenericUDAFBloomFilterEvaluator();
+  }
+
+  @Override
+  public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
+    return new GenericUDAFBloomFilterEvaluator();
+  }
+
+  /**
+   * GenericUDAFBloomFilterEvaluator - Evaluator class for BloomFilter
+   */
+  public static class GenericUDAFBloomFilterEvaluator extends GenericUDAFEvaluator {
+    // Source operator to get the number of entries
+    private Operator<?> sourceOperator;
+    private long maxEntries = 0;
+
+    // ObjectInspector for input data.
+    private PrimitiveObjectInspector inputOI;
+
+    // Reusable buffer into which the Bloom filter is serialized
+    private ByteArrayOutputStream result = new ByteArrayOutputStream();
+
+    @Override
+    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
+      super.init(m, parameters);
+
+      // Initialize input
+      if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {
+        inputOI = (PrimitiveObjectInspector) parameters[0];
+      } else {
+        // Do nothing for other modes
+      }
+
+      // Output will be same in both partial or full aggregation modes.
+      // It will be a BloomFilter in ByteWritable
+      return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+    }
+
+    /**
+     * Aggregation buffer holding the BloomFilter that is being built.
+     */
+    @AggregationType(estimable = true)
+    static class BloomFilterBuf extends AbstractAggregationBuffer {
+      BloomFilter bloomFilter;
+
+      /**
+       * @param expectedEntries number of entries the filter is sized for
+       * @param maxEntries      upper bound; when expectedEntries exceeds it the
+       *                        filter is created with an expected size of 1
+       *                        instead of the requested size
+       */
+      public BloomFilterBuf(long expectedEntries, long maxEntries) {
+        if (expectedEntries > maxEntries) {
+          bloomFilter = new BloomFilter(1);
+        } else {
+          bloomFilter = new BloomFilter(expectedEntries);
+        }
+      }
+
+      /**
+       * Reports the filter's size in bytes as the buffer's memory estimate.
+       * The value is narrowed to int by a plain cast.
+       */
+      @Override
+      public int estimate() {
+        return (int) bloomFilter.sizeInBytes();
+      }
+    }
+
+    @Override
+    public void reset(AggregationBuffer agg) throws HiveException {
+      ((BloomFilterBuf)agg).bloomFilter.reset();
+    }
+
+    @Override
+    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+      long expectedEntries = getExpectedEntries();
+      if (expectedEntries < 0) {
+        throw new IllegalStateException("BloomFilter expectedEntries not initialized");
+      }
+
+      BloomFilterBuf buf = new BloomFilterBuf(expectedEntries, maxEntries);
+      reset(buf);
+      return buf;
+    }
+
+    @Override
+    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
+      if (parameters == null || parameters[0] == null) {
+        // 2nd condition occurs when the input has 0 rows (possible due to
+        // filtering, joins etc).
+        return;
+      }
+
+      BloomFilter bf = ((BloomFilterBuf)agg).bloomFilter;
+
+      // Add the expression into the BloomFilter
+      switch (inputOI.getPrimitiveCategory()) {
+        case BOOLEAN:
+          boolean vBoolean = ((BooleanObjectInspector)inputOI).get(parameters[0]);
+          bf.addLong(vBoolean ? 1 : 0);
+          break;
+        case BYTE:
+          byte vByte = ((ByteObjectInspector)inputOI).get(parameters[0]);
+          bf.addLong(vByte);
+          break;
+        case SHORT:
+          short vShort = ((ShortObjectInspector)inputOI).get(parameters[0]);
+          bf.addLong(vShort);
+          break;
+        case INT:
+          int vInt = ((IntObjectInspector)inputOI).get(parameters[0]);
+          bf.addLong(vInt);
+          break;
+        case LONG:
+          long vLong = ((LongObjectInspector)inputOI).get(parameters[0]);
+          bf.addLong(vLong);
+          break;
+        case FLOAT:
+          float vFloat = ((FloatObjectInspector)inputOI).get(parameters[0]);
+          bf.addDouble(vFloat);
+          break;
+        case DOUBLE:
+          double vDouble = ((DoubleObjectInspector)inputOI).get(parameters[0]);
+          bf.addDouble(vDouble);
+          break;
+        case DECIMAL:
+          HiveDecimal vDecimal = ((HiveDecimalObjectInspector)inputOI).
+                  getPrimitiveJavaObject(parameters[0]);
+          bf.addString(vDecimal.toString());
+          break;
+        case DATE:
+          DateWritable vDate = ((DateObjectInspector)inputOI).
+                  getPrimitiveWritableObject(parameters[0]);
+          bf.addLong(vDate.getDays());
+          break;
+        case TIMESTAMP:
+          Timestamp vTimeStamp = ((TimestampObjectInspector)inputOI).
+                  getPrimitiveJavaObject(parameters[0]);
+          bf.addLong(vTimeStamp.getTime());
+          break;
+        case CHAR:
+          Text vChar = ((HiveCharObjectInspector)inputOI).
+                  getPrimitiveWritableObject(parameters[0]).getStrippedValue();
+          bf.addBytes(vChar.getBytes(), 0, vChar.getLength());
+          break;
+        case VARCHAR:
+          Text vVarChar = ((HiveVarcharObjectInspector)inputOI).
+                  getPrimitiveWritableObject(parameters[0]).getTextValue();
+          bf.addBytes(vVarChar.getBytes(), 0, vVarChar.getLength());
+          break;
+        case STRING:
+          Text vString = ((StringObjectInspector)inputOI).
+                  getPrimitiveWritableObject(parameters[0]);
+          bf.addBytes(vString.getBytes(), 0, vString.getLength());
+          break;
+        case BINARY:
+          BytesWritable vBytes = ((BinaryObjectInspector)inputOI).
+                  getPrimitiveWritableObject(parameters[0]);
+          bf.addBytes(vBytes.getBytes(), 0, vBytes.getLength());
+          break;
+          default:
+            throw new UDFArgumentTypeException(0,
+                    "Bad primitive category " + inputOI.getPrimitiveCategory());
+      }
+    }
+
+    /**
+     * Merges a serialized partial Bloom filter into the aggregation buffer.
+     *
+     * @param agg     the {@code BloomFilterBuf} to merge into
+     * @param partial a {@code BytesWritable} holding a serialized BloomFilter,
+     *                or null, in which case nothing happens
+     * @throws HiveException if the partial filter cannot be deserialized
+     */
+    @Override
+    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
+      if (partial == null) {
+        return;
+      }
+
+      BytesWritable bytes = (BytesWritable) partial;
+      // getBytes() hands out the backing array, which can be longer than the
+      // payload; only the first getLength() bytes are valid data.
+      ByteArrayInputStream in =
+              new ByteArrayInputStream(bytes.getBytes(), 0, bytes.getLength());
+      // Deserialize the bloom filter
+      try {
+        BloomFilter bf = BloomFilter.deserialize(in);
+        ((BloomFilterBuf)agg).bloomFilter.merge(bf);
+      } catch (IOException e) {
+        throw new HiveException(e);
+      }
+    }
+
+    @Override
+    public Object terminate(AggregationBuffer agg) throws HiveException {
+      result.reset();
+      try {
+        BloomFilter.serialize(result, ((BloomFilterBuf)agg).bloomFilter);
+      } catch (IOException e) {
+        throw new HiveException(e);
+      }
+      return new BytesWritable(result.toByteArray());
+    }
+
+    @Override
+    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+      return terminate(agg);
+    }
+
+    /**
+     * Returns the row count taken from the source operator's statistics, or -1
+     * when no source operator is set or it carries no statistics.
+     */
+    public long getExpectedEntries() {
+      if (sourceOperator == null || sourceOperator.getStatistics() == null) {
+        return -1;
+      }
+      return sourceOperator.getStatistics().getNumRows();
+    }
+
+    public Operator<?> getSourceOperator() {
+      return sourceOperator;
+    }
+
+    public void setSourceOperator(Operator<?> sourceOperator) {
+      this.sourceOperator = sourceOperator;
+    }
+
+    public void setMaxEntries(long maxEntries) {
+      this.maxEntries = maxEntries;
+    }
+
+    @Override
+    public String getExprString() {
+      return "expectedEntries=" + getExpectedEntries();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
index 18d5285..3a98276 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
@@ -262,6 +262,13 @@ public abstract class GenericUDAFEvaluator implements Closeable {
     return null;
   }
 
+  /**
+   * Optional information to add to expression string. Subclasses can override.
+   */
+  public String getExprString() {
+    return "";
+  }
+
   protected BasePartitionEvaluator partitionEvaluator;
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
new file mode 100644
index 0000000..1b7de6c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.common.util.BloomFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.sql.Timestamp;
+
+/**
+ * GenericUDF to lookup a value in BloomFilter.
+ * <p>
+ * Takes a primitive value and a binary holding a serialized {@link BloomFilter}. The filter is
+ * deserialized on the first call that has two non-null arguments and reused by later calls.
+ */
+public class GenericUDFInBloomFilter extends GenericUDF {
+  private static final Logger LOG = LoggerFactory.getLogger(GenericUDFInBloomFilter.class);
+
+  private transient ObjectInspector valObjectInspector;
+  private transient ObjectInspector bloomFilterObjectInspector;
+  private transient BloomFilter bloomFilter;
+  private transient boolean initializedBloomFilter;
+
+  /**
+   * Validates the argument types and records their object inspectors.
+   *
+   * @param arguments object inspectors for the value and the serialized bloom filter
+   * @return the Java boolean object inspector
+   * @throws UDFArgumentException unless exactly a primitive and a binary argument are given
+   */
+  @Override
+  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+    if (arguments.length != 2) {
+      throw new UDFArgumentLengthException(
+          "InBloomFilter requires exactly 2 arguments but got " + arguments.length);
+    }
+
+    // Verify individual arguments
+    if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
+      throw new UDFArgumentTypeException(0, "The 1st argument must be a primitive type but "
+          + arguments[0].getTypeName() + " was passed");
+    }
+
+    // Check the category before casting so a non-primitive 2nd argument gets a type error
+    if (arguments[1].getCategory() != ObjectInspector.Category.PRIMITIVE
+        || ((PrimitiveObjectInspector) arguments[1]).getPrimitiveCategory()
+            != PrimitiveObjectInspector.PrimitiveCategory.BINARY) {
+      throw new UDFArgumentTypeException(1, "The 2nd argument must be a binary type but "
+          + arguments[1].getTypeName() + " was passed");
+    }
+
+    valObjectInspector = arguments[0];
+    bloomFilterObjectInspector = arguments[1];
+    assert bloomFilterObjectInspector instanceof WritableBinaryObjectInspector;
+
+    initializedBloomFilter = false;
+    return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
+  }
+
+  @Override
+  public String getDisplayString(String[] children) {
+    return getStandardDisplayString("in_bloom_filter", children);
+  }
+
+  /**
+   * Tests whether the 1st argument may be present in the bloom filter passed as 2nd argument.
+   *
+   * @param arguments the value to look up and the serialized bloom filter
+   * @return {@code null} if either argument is null, otherwise the result of the filter test
+   * @throws HiveException if the filter cannot be deserialized or the value type is unsupported
+   */
+  @Override
+  public Object evaluate(DeferredObject[] arguments) throws HiveException {
+    // Fetch the value once; return if either of the arguments is null
+    Object val = arguments[0].get();
+    if (val == null || arguments[1].get() == null) {
+      return null;
+    }
+
+    if (!initializedBloomFilter) {
+      // Setup the bloom filter once; only the first getLength() bytes of the array are valid
+      BytesWritable bw = (BytesWritable) arguments[1].get();
+      try {
+        bloomFilter = BloomFilter.deserialize(
+            new ByteArrayInputStream(bw.getBytes(), 0, bw.getLength()));
+      } catch (IOException e) {
+        throw new HiveException(e);
+      }
+      initializedBloomFilter = true;
+    }
+
+    // Check if the value is in bloom filter
+    PrimitiveObjectInspector valOI = (PrimitiveObjectInspector) valObjectInspector;
+    switch (valOI.getTypeInfo().getPrimitiveCategory()) {
+      case BOOLEAN:
+        return bloomFilter.testLong(((BooleanObjectInspector) valOI).get(val) ? 1 : 0);
+      case BYTE:
+        return bloomFilter.testLong(((ByteObjectInspector) valOI).get(val));
+      case SHORT:
+        return bloomFilter.testLong(((ShortObjectInspector) valOI).get(val));
+      case INT:
+        return bloomFilter.testLong(((IntObjectInspector) valOI).get(val));
+      case LONG:
+        return bloomFilter.testLong(((LongObjectInspector) valOI).get(val));
+      case FLOAT:
+        return bloomFilter.testDouble(((FloatObjectInspector) valOI).get(val));
+      case DOUBLE:
+        return bloomFilter.testDouble(((DoubleObjectInspector) valOI).get(val));
+      case DECIMAL:
+        HiveDecimal vDecimal = ((HiveDecimalObjectInspector) valOI).getPrimitiveJavaObject(val);
+        return bloomFilter.testString(vDecimal.toString());
+      case DATE:
+        DateWritable vDate = ((DateObjectInspector) valOI).getPrimitiveWritableObject(val);
+        return bloomFilter.testLong(vDate.getDays());
+      case TIMESTAMP:
+        Timestamp vTimeStamp = ((TimestampObjectInspector) valOI).getPrimitiveJavaObject(val);
+        return bloomFilter.testLong(vTimeStamp.getTime());
+      case CHAR:
+        Text vChar = ((HiveCharObjectInspector) valOI)
+            .getPrimitiveWritableObject(val).getStrippedValue();
+        return bloomFilter.testBytes(vChar.getBytes(), 0, vChar.getLength());
+      case VARCHAR:
+        Text vVarchar = ((HiveVarcharObjectInspector) valOI)
+            .getPrimitiveWritableObject(val).getTextValue();
+        return bloomFilter.testBytes(vVarchar.getBytes(), 0, vVarchar.getLength());
+      case STRING:
+        Text vString = ((StringObjectInspector) valOI).getPrimitiveWritableObject(val);
+        return bloomFilter.testBytes(vString.getBytes(), 0, vString.getLength());
+      case BINARY:
+        BytesWritable vBytes = ((BinaryObjectInspector) valOI).getPrimitiveWritableObject(val);
+        return bloomFilter.testBytes(vBytes.getBytes(), 0, vBytes.getLength());
+      default:
+        throw new UDFArgumentTypeException(0,
+            "Bad primitive category " + valOI.getPrimitiveCategory());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
index 93b50a6..6563290 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
@@ -28,6 +28,7 @@ import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -44,6 +45,8 @@ import com.google.common.collect.Sets;
  */
 public class TestConvertAstToSearchArg {
 
+  private final Configuration conf = new Configuration();
+
   private static void assertNoSharedNodes(ExpressionTree tree,
                                           Set<ExpressionTree> seen
                                           ) throws Exception {
@@ -547,7 +550,7 @@ public class TestConvertAstToSearchArg {
         "</java> \n";
 
     SearchArgumentImpl sarg =
-        (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+        (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
     List<PredicateLeaf> leaves = sarg.getLeaves();
     assertEquals(9, leaves.size());
 
@@ -836,7 +839,7 @@ public class TestConvertAstToSearchArg {
         "</java> \n";
 
     SearchArgumentImpl sarg =
-        (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+        (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
     List<PredicateLeaf> leaves = sarg.getLeaves();
     assertEquals(4, leaves.size());
 
@@ -1269,7 +1272,7 @@ public class TestConvertAstToSearchArg {
         "</java> \n";
 
     SearchArgumentImpl sarg =
-        (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+        (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
     List<PredicateLeaf> leaves = sarg.getLeaves();
     assertEquals(3, leaves.size());
 
@@ -1493,7 +1496,7 @@ public class TestConvertAstToSearchArg {
         "\n";
 
     SearchArgumentImpl sarg =
-        (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+        (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
     List<PredicateLeaf> leaves = sarg.getLeaves();
     assertEquals(3, leaves.size());
 
@@ -1763,7 +1766,7 @@ public class TestConvertAstToSearchArg {
         "</java> \n";
 
     SearchArgumentImpl sarg =
-        (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+        (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
     List<PredicateLeaf> leaves = sarg.getLeaves();
     assertEquals(1, leaves.size());
 
@@ -2246,7 +2249,7 @@ public class TestConvertAstToSearchArg {
         "</java>";
 
     SearchArgumentImpl sarg =
-        (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+        (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
     List<PredicateLeaf> leaves = sarg.getLeaves();
     assertEquals(9, leaves.size());
 
@@ -2405,7 +2408,7 @@ public class TestConvertAstToSearchArg {
         "</java> ";
 
     SearchArgumentImpl sarg =
-        (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+        (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
     List<PredicateLeaf> leaves = sarg.getLeaves();
     assertEquals(0, leaves.size());
 
@@ -2538,7 +2541,7 @@ public class TestConvertAstToSearchArg {
         "</java> ";
 
     SearchArgumentImpl sarg =
-        (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+        (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
     List<PredicateLeaf> leaves = sarg.getLeaves();
     assertEquals(0, leaves.size());
 
@@ -2663,7 +2666,7 @@ public class TestConvertAstToSearchArg {
         "</java>";
 
     SearchArgumentImpl sarg =
-        (SearchArgumentImpl) ConvertAstToSearchArg.create(getFuncDesc(exprStr));
+        (SearchArgumentImpl) ConvertAstToSearchArg.create(conf, getFuncDesc(exprStr));
     List<PredicateLeaf> leaves = sarg.getLeaves();
     assertEquals(1, leaves.size());
 
@@ -2712,7 +2715,7 @@ public class TestConvertAstToSearchArg {
           "AAABgj0BRVFVQcwBBW9yZy5hcGFjaGUuaGFkb29wLmlvLkJvb2xlYW5Xcml0YWJs5Q" +
           "EAAAECAQFib29sZWHu";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("leaf-0", sarg.getExpression().toString());
     assertEquals(1, sarg.getLeaves().size());
@@ -2731,7 +2734,7 @@ public class TestConvertAstToSearchArg {
             "Y2hlLmhhZG9vcC5oaXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUH" +
             "MAQVvcmcuYXBhY2hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABAgEBYm9vbGVh7g==";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("leaf-0", sarg.getExpression().toString());
     assertEquals(1, sarg.getLeaves().size());
@@ -2751,7 +2754,7 @@ public class TestConvertAstToSearchArg {
             "oaXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUHMAQZvcmcuYXBhY2" +
             "hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABBAEBYm9vbGVh7g==";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("leaf-0", sarg.getExpression().toString());
     assertEquals(1, sarg.getLeaves().size());
@@ -2771,7 +2774,7 @@ public class TestConvertAstToSearchArg {
             "vb3AuaGl2ZS5xbC51ZGYuZ2VuZXJpYy5HZW5lcmljVURGT1BFcXVh7AEAAAGCPQFFUVVBzAEGb3JnLm" +
             "FwYWNoZS5oYWRvb3AuaW8uQm9vbGVhbldyaXRhYmzlAQAAAQQBAWJvb2xlYe4=";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("leaf-0", sarg.getExpression().toString());
     assertEquals(1, sarg.getLeaves().size());
@@ -2791,7 +2794,7 @@ public class TestConvertAstToSearchArg {
             "lLmhhZG9vcC5oaXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUHMAQ" +
             "ZvcmcuYXBhY2hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABBAEBYm9vbGVh7g==";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("leaf-0", sarg.getExpression().toString());
     assertEquals(1, sarg.getLeaves().size());
@@ -2810,7 +2813,7 @@ public class TestConvertAstToSearchArg {
             "dmUucWwudWRmLmdlbmVyaWMuR2VuZXJpY1VERk9QRXF1YewBAAABgj0BRVFVQcwBBW9yZy5hcGFjaGU" +
             "uaGFkb29wLmlvLkJvb2xlYW5Xcml0YWJs5QEAAAECAQFib29sZWHu";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("leaf-0", sarg.getExpression().toString());
     assertEquals(1, sarg.getLeaves().size());
@@ -2831,7 +2834,7 @@ public class TestConvertAstToSearchArg {
             "hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABAwkBAgEBYrIAAAgBAwkBB29yZy5hcGFjaGUua" +
             "GFkb29wLmhpdmUucWwudWRmLmdlbmVyaWMuR2VuZXJpY1VERk9QQW7kAQEGAQAAAQMJ";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("(and leaf-0 leaf-1)", sarg.getExpression().toString());
     assertEquals(2, sarg.getLeaves().size());
@@ -2853,7 +2856,7 @@ public class TestConvertAstToSearchArg {
             "aXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUHMAQVvcmcuYXBhY2h" +
             "lLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABAgEBYm9vbGVh7g==";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("leaf-0", sarg.getExpression().toString());
     assertEquals(1, sarg.getLeaves().size());
@@ -2872,7 +2875,7 @@ public class TestConvertAstToSearchArg {
             "b29wLmhpdmUucWwudWRmLmdlbmVyaWMuR2VuZXJpY1VERk9QRXF1YewBAAABgj0BRVFVQcwBBW9yZy5" +
             "hcGFjaGUuaGFkb29wLmlvLkJvb2xlYW5Xcml0YWJs5QEAAAECAQFib29sZWHu";
     SearchArgument sarg =
-        new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst))
+        new ConvertAstToSearchArg(conf, SerializationUtilities.deserializeExpression(serialAst))
             .buildSearchArgument();
     assertEquals("leaf-0", sarg.getExpression().toString());
     assertEquals(1, sarg.getLeaves().size());

http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestSearchArgumentImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestSearchArgumentImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestSearchArgumentImpl.java
index 8cbc26d..df42058 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestSearchArgumentImpl.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestSearchArgumentImpl.java
@@ -79,7 +79,7 @@ public class TestSearchArgumentImpl {
                                                   Object literal,
                                                   List<Object> literalList) {
     return new SearchArgumentImpl.PredicateLeafImpl(operator, type, columnName,
-        literal, literalList);
+        literal, literalList, null);
   }
 
   @Test