You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2017/01/24 20:02:53 UTC
[6/6] hive git commit: HIVE-15269: Dynamic Min-Max/BloomFilter runtime-filtering for Tez (Deepak Jaiswal via Jason Dere)
HIVE-15269: Dynamic Min-Max/BloomFilter runtime-filtering for Tez (Deepak Jaiswal via Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cc3fd84e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cc3fd84e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cc3fd84e
Branch: refs/heads/master
Commit: cc3fd84ee3ac2f855e257a94bad894909bf628f4
Parents: 3040f6e
Author: Jason Dere <jd...@hortonworks.com>
Authored: Tue Jan 24 12:01:41 2017 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Tue Jan 24 12:01:41 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 +
.../test/resources/testconfiguration.properties | 2 +
.../apache/orc/impl/TestRecordReaderImpl.java | 2 +-
.../hive/ql/exec/AbstractMapJoinOperator.java | 2 +-
.../hadoop/hive/ql/exec/CommonJoinOperator.java | 4 +-
.../hive/ql/exec/DynamicValueRegistry.java | 30 +
.../hive/ql/exec/ExprNodeColumnEvaluator.java | 5 +-
.../exec/ExprNodeConstantDefaultEvaluator.java | 7 +-
.../hive/ql/exec/ExprNodeConstantEvaluator.java | 7 +-
.../ql/exec/ExprNodeDynamicValueEvaluator.java | 54 +
.../hadoop/hive/ql/exec/ExprNodeEvaluator.java | 13 +-
.../hive/ql/exec/ExprNodeEvaluatorFactory.java | 18 +-
.../hive/ql/exec/ExprNodeEvaluatorHead.java | 2 +-
.../hive/ql/exec/ExprNodeEvaluatorRef.java | 2 +-
.../hive/ql/exec/ExprNodeFieldEvaluator.java | 7 +-
.../ql/exec/ExprNodeGenericFuncEvaluator.java | 7 +-
.../hadoop/hive/ql/exec/FilterOperator.java | 2 +-
.../hadoop/hive/ql/exec/FunctionRegistry.java | 4 +-
.../hadoop/hive/ql/exec/GroupByOperator.java | 36 +-
.../hive/ql/exec/HashTableSinkOperator.java | 6 +-
.../apache/hadoop/hive/ql/exec/JoinUtil.java | 8 +-
.../apache/hadoop/hive/ql/exec/ObjectCache.java | 10 +
.../hadoop/hive/ql/exec/ObjectCacheWrapper.java | 5 +
.../hadoop/hive/ql/exec/SelectOperator.java | 2 +-
.../hadoop/hive/ql/exec/mr/ObjectCache.java | 5 +
.../ql/exec/tez/DynamicValueRegistryTez.java | 131 ++
.../hive/ql/exec/tez/LlapObjectCache.java | 18 +
.../hive/ql/exec/tez/MapRecordProcessor.java | 29 +-
.../hadoop/hive/ql/exec/tez/ObjectCache.java | 16 +
.../hive/ql/exec/tez/ReduceRecordProcessor.java | 29 +-
.../ql/exec/vector/VectorMapJoinOperator.java | 2 +-
.../exec/vector/VectorSMBMapJoinOperator.java | 2 +-
.../hive/ql/io/sarg/ConvertAstToSearchArg.java | 56 +-
.../DynamicPartitionPruningOptimization.java | 342 +++-
.../optimizer/FixedBucketPruningOptimizer.java | 2 +-
...edundantDynamicPruningConditionsRemoval.java | 24 +-
.../stats/annotation/StatsRulesProcFactory.java | 64 +-
.../hadoop/hive/ql/parse/GenTezUtils.java | 170 +-
.../hadoop/hive/ql/parse/ParseContext.java | 21 +
.../hadoop/hive/ql/parse/RuntimeValuesInfo.java | 62 +
.../hadoop/hive/ql/parse/TaskCompiler.java | 3 +
.../hadoop/hive/ql/parse/TezCompiler.java | 422 ++++-
.../hadoop/hive/ql/plan/AggregationDesc.java | 7 +
.../apache/hadoop/hive/ql/plan/BaseWork.java | 15 +
.../hadoop/hive/ql/plan/DynamicValue.java | 137 ++
.../hive/ql/plan/ExprNodeDynamicValueDesc.java | 76 +
.../ql/udf/generic/GenericUDAFBloomFilter.java | 267 +++
.../ql/udf/generic/GenericUDAFEvaluator.java | 7 +
.../ql/udf/generic/GenericUDFInBloomFilter.java | 168 ++
.../ql/io/sarg/TestConvertAstToSearchArg.java | 39 +-
.../hive/ql/io/sarg/TestSearchArgumentImpl.java | 2 +-
.../ql/optimizer/physical/TestVectorizer.java | 26 +-
.../clientpositive/dynamic_semijoin_reduction.q | 68 +
.../llap/dynamic_partition_pruning.q.out | 69 +-
.../llap/dynamic_semijoin_reduction.q.out | 1535 ++++++++++++++++++
.../clientpositive/llap/join32_lessSize.q.out | 2 +-
.../clientpositive/llap/llap_partitioned.q.out | 2 +-
.../results/clientpositive/llap/mergejoin.q.out | 561 ++++++-
.../results/clientpositive/llap/orc_llap.q.out | 78 +-
.../clientpositive/llap/subquery_scalar.q.out | 16 +-
.../vectorized_dynamic_partition_pruning.q.out | 71 +-
.../results/clientpositive/perf/query16.q.out | 14 +-
.../results/clientpositive/perf/query6.q.out | 8 +-
.../results/clientpositive/perf/query83.q.out | 16 +-
.../results/clientpositive/show_functions.q.out | 2 +
.../clientpositive/tez/explainanalyze_3.q.out | 127 +-
.../clientpositive/tez/explainuser_3.q.out | 131 +-
.../hadoop/hive/ql/io/sarg/LiteralDelegate.java | 31 +
.../hive/ql/io/sarg/SearchArgumentFactory.java | 9 +-
.../hive/ql/io/sarg/SearchArgumentImpl.java | 65 +-
.../apache/hive/common/util/BloomFilter.java | 51 +
71 files changed, 4741 insertions(+), 497 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index fc9734b..8e319c6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2851,6 +2851,11 @@ public class HiveConf extends Configuration {
TEZ_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE("hive.tez.dynamic.partition.pruning.max.data.size", 100*1024*1024L,
"Maximum total data size of events in dynamic pruning."),
+ TEZ_DYNAMIC_SEMIJOIN_REDUCTION("hive.tez.dynamic.semijoin.reduction", true,
+ "When dynamic semijoin is enabled, shuffle joins will perform a leaky semijoin before shuffle. This " +
+ "requires hive.tez.dynamic.partition.pruning to be enabled."),
+ TEZ_MAX_BLOOM_FILTER_ENTRIES("hive.tez.max.bloom.filter.entries", 100000000L,
+ "Bloom filter should be of at max certain size to be effective"),
TEZ_SMB_NUMBER_WAVES(
"hive.tez.smb.number.waves",
(float) 0.5,
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 792de2e..bd76b7d 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -154,6 +154,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
delete_whole_partition.q,\
disable_merge_for_bucketing.q,\
dynamic_partition_pruning.q,\
+ dynamic_semijoin_reduction.q,\
dynpart_sort_opt_vectorization.q,\
dynpart_sort_optimization.q,\
dynpart_sort_optimization2.q,\
@@ -480,6 +481,7 @@ minillaplocal.query.files=acid_globallimit.q,\
correlationoptimizer6.q,\
disable_merge_for_bucketing.q,\
dynamic_partition_pruning.q,\
+ dynamic_semijoin_reduction.q,\
dynpart_sort_opt_vectorization.q,\
dynpart_sort_optimization.q,\
dynpart_sort_optimization_acid.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/orc/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestRecordReaderImpl.java b/orc/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
index cdd62ac..30b42ee 100644
--- a/orc/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
+++ b/orc/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
@@ -76,7 +76,7 @@ public class TestRecordReaderImpl {
Object literal,
List<Object> literalList) {
return new SearchArgumentImpl.PredicateLeafImpl(operator, type, columnName,
- literal, literalList);
+ literal, literalList, null);
}
// can add .verboseLogging() to cause Mockito to log invocations
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
index 69ba4a2..669e23e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
@@ -70,7 +70,7 @@ public abstract class AbstractMapJoinOperator <T extends MapJoinDesc> extends Co
if (conf.getGenJoinKeys()) {
int tagLen = conf.getTagLength();
joinKeys = new List[tagLen];
- JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), NOTSKIPBIGTABLE);
+ JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), NOTSKIPBIGTABLE, hconf);
joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys,
inputObjInspectors,NOTSKIPBIGTABLE, tagLen);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
index 7e9007c..df1898e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
@@ -254,11 +254,11 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends
noOuterJoin = conf.isNoOuterJoin();
totalSz = JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(),
- order,NOTSKIPBIGTABLE);
+ order,NOTSKIPBIGTABLE, hconf);
//process join filters
joinFilters = new List[tagLen];
- JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(),order,NOTSKIPBIGTABLE);
+ JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(),order,NOTSKIPBIGTABLE, hconf);
joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinValues,
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/DynamicValueRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DynamicValueRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DynamicValueRegistry.java
new file mode 100644
index 0000000..63336bd
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DynamicValueRegistry.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+public interface DynamicValueRegistry {
+
+ // Abstract class to hold info required for the implementation
+ public static abstract class RegistryConf {
+ }
+
+ Object getValue(String key) throws Exception;
+
+ void init(RegistryConf conf) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
index 24c8281..b0384df 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -38,8 +39,8 @@ public class ExprNodeColumnEvaluator extends ExprNodeEvaluator<ExprNodeColumnDes
private transient StructField[] fields;
private transient boolean[] unionField;
- public ExprNodeColumnEvaluator(ExprNodeColumnDesc expr) {
- super(expr);
+ public ExprNodeColumnEvaluator(ExprNodeColumnDesc expr, Configuration conf) {
+ super(expr, conf);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantDefaultEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantDefaultEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantDefaultEvaluator.java
index 89a75eb..f53c3e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantDefaultEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantDefaultEvaluator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDefaultDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -33,7 +34,11 @@ public class ExprNodeConstantDefaultEvaluator extends ExprNodeEvaluator<ExprNode
transient ObjectInspector writableObjectInspector;
public ExprNodeConstantDefaultEvaluator(ExprNodeConstantDefaultDesc expr) {
- super(expr);
+ this(expr, null);
+ }
+
+ public ExprNodeConstantDefaultEvaluator(ExprNodeConstantDefaultDesc expr, Configuration conf) {
+ super(expr, conf);
writableObjectInspector = expr.getWritableObjectInspector();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java
index 4fe72a0..ca39e21 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
@@ -32,7 +33,11 @@ public class ExprNodeConstantEvaluator extends ExprNodeEvaluator<ExprNodeConstan
transient ConstantObjectInspector writableObjectInspector;
public ExprNodeConstantEvaluator(ExprNodeConstantDesc expr) {
- super(expr);
+ this(expr, null);
+ }
+
+ public ExprNodeConstantEvaluator(ExprNodeConstantDesc expr, Configuration conf) {
+ super(expr, conf);
writableObjectInspector = expr.getWritableObjectInspector();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeDynamicValueEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeDynamicValueEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeDynamicValueEvaluator.java
new file mode 100644
index 0000000..6c68215
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeDynamicValueEvaluator.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+
+/**
+ * ExprNodeDynamicValueEvaluator.
+ *
+ */
+public class ExprNodeDynamicValueEvaluator extends ExprNodeEvaluator<ExprNodeDynamicValueDesc> {
+
+ transient ObjectInspector oi;
+
+ public ExprNodeDynamicValueEvaluator(ExprNodeDynamicValueDesc expr, Configuration conf) {
+ super(expr, conf);
+ oi = ObjectInspectorUtils.getStandardObjectInspector(expr.getWritableObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
+ }
+
+ @Override
+ public ObjectInspector initialize(ObjectInspector rowInspector) throws HiveException {
+ return oi;
+ }
+
+ @Override
+ protected Object _evaluate(Object row, int version) throws HiveException {
+ DynamicValue dynamicValue = expr.getDynamicValue();
+ dynamicValue.setConf(conf);
+ return dynamicValue.getWritableValue();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java
index b8d6ab7..375d65f 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -30,9 +31,11 @@ public abstract class ExprNodeEvaluator<T extends ExprNodeDesc> {
protected final T expr;
protected ObjectInspector outputOI;
+ protected Configuration conf;
- public ExprNodeEvaluator(T expr) {
+ public ExprNodeEvaluator(T expr, Configuration conf) {
this.expr = expr;
+ this.conf = conf;
}
/**
@@ -109,4 +112,12 @@ public abstract class ExprNodeEvaluator<T extends ExprNodeDesc> {
public String toString() {
return "ExprNodeEvaluator[" + expr + "]";
}
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
index 0d03d8f..34aec55 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
@@ -21,11 +21,13 @@ package org.apache.hadoop.hive.ql.exec;
import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDefaultDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -39,9 +41,13 @@ public final class ExprNodeEvaluatorFactory {
}
public static ExprNodeEvaluator get(ExprNodeDesc desc) throws HiveException {
+ return get(desc, null);
+ }
+
+ public static ExprNodeEvaluator get(ExprNodeDesc desc, Configuration conf) throws HiveException {
// Constant node
if (desc instanceof ExprNodeConstantDesc) {
- return new ExprNodeConstantEvaluator((ExprNodeConstantDesc) desc);
+ return new ExprNodeConstantEvaluator((ExprNodeConstantDesc) desc, conf);
}
// Special 'default' constant node
@@ -51,15 +57,19 @@ public final class ExprNodeEvaluatorFactory {
// Column-reference node, e.g. a column in the input row
if (desc instanceof ExprNodeColumnDesc) {
- return new ExprNodeColumnEvaluator((ExprNodeColumnDesc) desc);
+ return new ExprNodeColumnEvaluator((ExprNodeColumnDesc) desc, conf);
}
// Generic Function node, e.g. CASE, an operator or a UDF node
if (desc instanceof ExprNodeGenericFuncDesc) {
- return new ExprNodeGenericFuncEvaluator((ExprNodeGenericFuncDesc) desc);
+ return new ExprNodeGenericFuncEvaluator((ExprNodeGenericFuncDesc) desc, conf);
}
// Field node, e.g. get a.myfield1 from a
if (desc instanceof ExprNodeFieldDesc) {
- return new ExprNodeFieldEvaluator((ExprNodeFieldDesc) desc);
+ return new ExprNodeFieldEvaluator((ExprNodeFieldDesc) desc, conf);
+ }
+ // Dynamic value which will be determined during query runtime
+ if (desc instanceof ExprNodeDynamicValueDesc) {
+ return new ExprNodeDynamicValueEvaluator((ExprNodeDynamicValueDesc) desc, conf);
}
throw new RuntimeException(
"Cannot find ExprNodeEvaluator for the exprNodeDesc = " + desc);
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorHead.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorHead.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorHead.java
index 42685fb..991bc13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorHead.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorHead.java
@@ -30,7 +30,7 @@ public class ExprNodeEvaluatorHead extends ExprNodeEvaluator {
private final ExprNodeEvaluator referencing;
public ExprNodeEvaluatorHead(ExprNodeEvaluator referencing) {
- super(referencing.getExpr());
+ super(referencing.getExpr(), referencing.getConf());
this.referencing = referencing;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorRef.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorRef.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorRef.java
index 0a6b66a..625d486 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorRef.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorRef.java
@@ -30,7 +30,7 @@ public class ExprNodeEvaluatorRef extends ExprNodeEvaluator {
private final ExprNodeEvaluator referencing;
public ExprNodeEvaluatorRef(ExprNodeEvaluator referencing) {
- super(referencing.getExpr());
+ super(referencing.getExpr(), referencing.getConf());
this.referencing = referencing;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java
index ff32626..1241343 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -43,9 +44,9 @@ public class ExprNodeFieldEvaluator extends ExprNodeEvaluator<ExprNodeFieldDesc>
transient ObjectInspector structFieldObjectInspector;
transient ObjectInspector resultObjectInspector;
- public ExprNodeFieldEvaluator(ExprNodeFieldDesc desc) throws HiveException {
- super(desc);
- leftEvaluator = ExprNodeEvaluatorFactory.get(desc.getDesc());
+ public ExprNodeFieldEvaluator(ExprNodeFieldDesc desc, Configuration conf) throws HiveException {
+ super(desc, conf);
+ leftEvaluator = ExprNodeEvaluatorFactory.get(desc.getDesc(), conf);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java
index 221abd9..8b9baa6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -91,13 +92,13 @@ public class ExprNodeGenericFuncEvaluator extends ExprNodeEvaluator<ExprNodeGene
}
}
- public ExprNodeGenericFuncEvaluator(ExprNodeGenericFuncDesc expr) throws HiveException {
- super(expr);
+ public ExprNodeGenericFuncEvaluator(ExprNodeGenericFuncDesc expr, Configuration conf) throws HiveException {
+ super(expr, conf);
children = new ExprNodeEvaluator[expr.getChildren().size()];
isEager = false;
for (int i = 0; i < children.length; i++) {
ExprNodeDesc child = expr.getChildren().get(i);
- ExprNodeEvaluator nodeEvaluator = ExprNodeEvaluatorFactory.get(child);
+ ExprNodeEvaluator nodeEvaluator = ExprNodeEvaluatorFactory.get(child, conf);
children[i] = nodeEvaluator;
// If we have eager evaluators anywhere below us, then we are eager too.
if (nodeEvaluator instanceof ExprNodeGenericFuncEvaluator) {
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
index bd0d28c..df30ab2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
@@ -60,7 +60,7 @@ public class FilterOperator extends Operator<FilterDesc> implements
try {
heartbeatInterval = HiveConf.getIntVar(hconf,
HiveConf.ConfVars.HIVESENDHEARTBEAT);
- conditionEvaluator = ExprNodeEvaluatorFactory.get(conf.getPredicate());
+ conditionEvaluator = ExprNodeEvaluatorFactory.get(conf.getPredicate(), hconf);
if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEEXPREVALUATIONCACHE)) {
conditionEvaluator = ExprNodeEvaluatorFactory.toCachedEval(conditionEvaluator);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index 4fce1ac..e166eee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -370,6 +370,7 @@ public final class FunctionRegistry {
system.registerGenericUDF("not", GenericUDFOPNot.class);
system.registerGenericUDF("!", GenericUDFOPNot.class);
system.registerGenericUDF("between", GenericUDFBetween.class);
+ system.registerGenericUDF("in_bloom_filter", GenericUDFInBloomFilter.class);
system.registerGenericUDF("ewah_bitmap_and", GenericUDFEWAHBitmapAnd.class);
system.registerGenericUDF("ewah_bitmap_or", GenericUDFEWAHBitmapOr.class);
@@ -427,7 +428,7 @@ public final class FunctionRegistry {
system.registerGenericUDAF("ewah_bitmap", new GenericUDAFEWAHBitmap());
system.registerGenericUDAF("compute_stats", new GenericUDAFComputeStats());
-
+ system.registerGenericUDAF("bloom_filter", new GenericUDAFBloomFilter());
system.registerUDAF("percentile", UDAFPercentile.class);
@@ -472,7 +473,6 @@ public final class FunctionRegistry {
system.registerGenericUDF("to_unix_timestamp", GenericUDFToUnixTimeStamp.class);
system.registerGenericUDF("internal_interval", GenericUDFInternalInterval.class);
-
// Generic UDTF's
system.registerGenericUDTF("explode", GenericUDTFExplode.class);
system.registerGenericUDTF("replicate_rows", GenericUDTFReplicateRows.class);
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 073147f..be561ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -212,7 +212,7 @@ public class GroupByOperator extends Operator<GroupByDesc> {
keyObjectInspectors = new ObjectInspector[numKeys];
currentKeyObjectInspectors = new ObjectInspector[numKeys];
for (int i = 0; i < numKeys; i++) {
- keyFields[i] = ExprNodeEvaluatorFactory.get(conf.getKeys().get(i));
+ keyFields[i] = ExprNodeEvaluatorFactory.get(conf.getKeys().get(i), hconf);
keyObjectInspectors[i] = keyFields[i].initialize(rowInspector);
currentKeyObjectInspectors[i] = ObjectInspectorUtils
.getStandardObjectInspector(keyObjectInspectors[i],
@@ -258,7 +258,7 @@ public class GroupByOperator extends Operator<GroupByDesc> {
new ExprNodeColumnDesc(TypeInfoUtils.getTypeInfoFromObjectInspector(
sf.getFieldObjectInspector()),
keyField.getFieldName() + "." + sf.getFieldName(), null,
- false));
+ false), hconf);
unionExprEval.initialize(rowInspector);
}
}
@@ -283,7 +283,7 @@ public class GroupByOperator extends Operator<GroupByDesc> {
aggregationParameterObjects[i] = new Object[parameters.size()];
for (int j = 0; j < parameters.size(); j++) {
aggregationParameterFields[i][j] = ExprNodeEvaluatorFactory
- .get(parameters.get(j));
+ .get(parameters.get(j), hconf);
aggregationParameterObjectInspectors[i][j] = aggregationParameterFields[i][j]
.initialize(rowInspector);
if (unionExprEval != null) {
@@ -352,6 +352,21 @@ public class GroupByOperator extends Operator<GroupByDesc> {
}
}
+ // grouping id should be pruned, which is the last of key columns
+ // see ColumnPrunerGroupByProc
+ outputKeyLength = conf.pruneGroupingSetId() ? keyFields.length - 1 : keyFields.length;
+
+ // init objectInspectors
+ ObjectInspector[] objectInspectors =
+ new ObjectInspector[outputKeyLength + aggregationEvaluators.length];
+ for (int i = 0; i < outputKeyLength; i++) {
+ objectInspectors[i] = currentKeyObjectInspectors[i];
+ }
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ objectInspectors[outputKeyLength + i] = aggregationEvaluators[i].init(conf.getAggregators()
+ .get(i).getMode(), aggregationParameterObjectInspectors[i]);
+ }
+
aggregationsParametersLastInvoke = new Object[conf.getAggregators().size()][];
if ((conf.getMode() != GroupByDesc.Mode.HASH || conf.getBucketGroup()) &&
(!groupingSetsPresent)) {
@@ -374,21 +389,6 @@ public class GroupByOperator extends Operator<GroupByDesc> {
List<String> fieldNames = new ArrayList<String>(conf.getOutputColumnNames());
- // grouping id should be pruned, which is the last of key columns
- // see ColumnPrunerGroupByProc
- outputKeyLength = conf.pruneGroupingSetId() ? keyFields.length - 1 : keyFields.length;
-
- // init objectInspectors
- ObjectInspector[] objectInspectors =
- new ObjectInspector[outputKeyLength + aggregationEvaluators.length];
- for (int i = 0; i < outputKeyLength; i++) {
- objectInspectors[i] = currentKeyObjectInspectors[i];
- }
- for (int i = 0; i < aggregationEvaluators.length; i++) {
- objectInspectors[outputKeyLength + i] = aggregationEvaluators[i].init(conf.getAggregators()
- .get(i).getMode(), aggregationParameterObjectInspectors[i]);
- }
-
outputObjInspector = ObjectInspectorFactory
.getStandardStructObjectInspector(fieldNames, Arrays.asList(objectInspectors));
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
index ac5331e..3a366f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -143,19 +143,19 @@ public class HashTableSinkOperator extends TerminalOperator<HashTableSinkDesc> i
// process join keys
joinKeys = new List[tagLen];
- JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), posBigTableAlias);
+ JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), posBigTableAlias, hconf);
joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys,
inputObjInspectors, posBigTableAlias, tagLen);
// process join values
joinValues = new List[tagLen];
- JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(), posBigTableAlias);
+ JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(), posBigTableAlias, hconf);
joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinValues,
inputObjInspectors, posBigTableAlias, tagLen);
// process join filters
joinFilters = new List[tagLen];
- JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(), posBigTableAlias);
+ JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(), posBigTableAlias, hconf);
joinFilterObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinFilters,
inputObjInspectors, posBigTableAlias, tagLen);
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
index 9718c48..07a3dc6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
@@ -121,14 +121,14 @@ public class JoinUtil {
}
public static int populateJoinKeyValue(List<ExprNodeEvaluator>[] outMap,
- Map<Byte, List<ExprNodeDesc>> inputMap, int posBigTableAlias) throws HiveException {
- return populateJoinKeyValue(outMap, inputMap, null, posBigTableAlias);
+ Map<Byte, List<ExprNodeDesc>> inputMap, int posBigTableAlias, Configuration conf) throws HiveException {
+ return populateJoinKeyValue(outMap, inputMap, null, posBigTableAlias, conf);
}
public static int populateJoinKeyValue(List<ExprNodeEvaluator>[] outMap,
Map<Byte, List<ExprNodeDesc>> inputMap,
Byte[] order,
- int posBigTableAlias) throws HiveException {
+ int posBigTableAlias, Configuration conf) throws HiveException {
int total = 0;
for (Entry<Byte, List<ExprNodeDesc>> e : inputMap.entrySet()) {
if (e.getValue() == null) {
@@ -140,7 +140,7 @@ public class JoinUtil {
if (key == (byte) posBigTableAlias) {
valueFields.add(null);
} else {
- valueFields.add(ExprNodeEvaluatorFactory.get(expr));
+ valueFields.add(ExprNodeEvaluatorFactory.get(expr, conf));
}
}
outMap[key] = valueFields;
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
index 440e0a1..b931c95 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
@@ -44,6 +44,16 @@ public interface ObjectCache {
public <T> T retrieve(String key, Callable<T> fn) throws HiveException;
/**
+ * Retrieve object from cache.
+ *
+ * @param <T>
+ * @param key
+ * the key identifying the cached object to look up
+ * @return the last cached object with the key, null if none.
+ */
+ public <T> T retrieve(String key) throws HiveException;
+
+ /**
* Retrieve object from cache asynchronously.
*
* @param <T>
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java
index 9768efa..71bcd98 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java
@@ -36,6 +36,11 @@ public class ObjectCacheWrapper implements ObjectCache {
}
@Override
+ public <T> T retrieve(String key) throws HiveException {
+ return globalCache.retrieve(makeKey(key));
+ }
+
+ @Override
public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
return globalCache.retrieve(makeKey(key), fn);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
index 9049ddd..a30c771 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
@@ -63,7 +63,7 @@ public class SelectOperator extends Operator<SelectDesc> implements Serializable
eval = new ExprNodeEvaluator[colList.size()];
for (int i = 0; i < colList.size(); i++) {
assert (colList.get(i) != null);
- eval[i] = ExprNodeEvaluatorFactory.get(colList.get(i));
+ eval[i] = ExprNodeEvaluatorFactory.get(colList.get(i), hconf);
}
if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEEXPREVALUATIONCACHE)) {
eval = ExprNodeEvaluatorFactory.toCachedEvals(eval);
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
index 008f8a4..cfe1750 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
@@ -47,6 +47,11 @@ public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
}
@Override
+ public <T> T retrieve(String key) throws HiveException {
+ return retrieve(key, null);
+ }
+
+ @Override
public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
try {
if (isDebugEnabled) {
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
new file mode 100644
index 0000000..7bbedf6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.exec.DynamicValueRegistry;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DynamicValueRegistryTez implements DynamicValueRegistry {
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicValueRegistryTez.class);
+
+ public static class RegistryConfTez extends RegistryConf {
+ public Configuration conf;
+ public BaseWork baseWork;
+ public ProcessorContext processorContext;
+ public Map<String, LogicalInput> inputs;
+
+ public RegistryConfTez(Configuration conf, BaseWork baseWork,
+ ProcessorContext processorContext, Map<String, LogicalInput> inputs) {
+ super();
+ this.conf = conf;
+ this.baseWork = baseWork;
+ this.processorContext = processorContext;
+ this.inputs = inputs;
+ }
+ }
+
+ protected Map<String, Object> values = Collections.synchronizedMap(new HashMap<String, Object>());
+
+ public DynamicValueRegistryTez() {
+ }
+
+ @Override
+ public Object getValue(String key) {
+ if (!values.containsKey(key)) {
+ throw new IllegalStateException("Value does not exist in registry: " + key);
+ }
+ return values.get(key);
+ }
+
+ protected void setValue(String key, Object value) {
+ values.put(key, value);
+ }
+
+ @Override
+ public void init(RegistryConf conf) throws Exception {
+ RegistryConfTez rct = (RegistryConfTez) conf;
+
+ for (String inputSourceName : rct.baseWork.getInputSourceToRuntimeValuesInfo().keySet()) {
+ LOG.info("Runtime value source: " + inputSourceName);
+
+ LogicalInput runtimeValueInput = rct.inputs.get(inputSourceName);
+ RuntimeValuesInfo runtimeValuesInfo = rct.baseWork.getInputSourceToRuntimeValuesInfo().get(inputSourceName);
+
+ // Setup deserializer/obj inspectors for the incoming data source
+ Deserializer deserializer = ReflectionUtils.newInstance(runtimeValuesInfo.getTableDesc().getDeserializerClass(), null);
+ deserializer.initialize(rct.conf, runtimeValuesInfo.getTableDesc().getProperties());
+ ObjectInspector inspector = deserializer.getObjectInspector();
+
+ // Set up col expressions for the dynamic values using this input
+ List<ExprNodeEvaluator> colExprEvaluators = new ArrayList<ExprNodeEvaluator>();
+ for (ExprNodeDesc expr : runtimeValuesInfo.getColExprs()) {
+ ExprNodeEvaluator exprEval = ExprNodeEvaluatorFactory.get(expr, null);
+ exprEval.initialize(inspector);
+ colExprEvaluators.add(exprEval);
+ }
+
+ runtimeValueInput.start();
+ List<Input> inputList = new ArrayList<Input>();
+ inputList.add(runtimeValueInput);
+ rct.processorContext.waitForAllInputsReady(inputList);
+
+ KeyValueReader kvReader = (KeyValueReader) runtimeValueInput.getReader();
+ long rowCount = 0;
+ while (kvReader.next()) {
+ Object row = deserializer.deserialize((Writable) kvReader.getCurrentValue());
+ rowCount++;
+ for (int colIdx = 0; colIdx < colExprEvaluators.size(); ++colIdx) {
+ // Read each expression and save it to the value registry
+ ExprNodeEvaluator eval = colExprEvaluators.get(colIdx);
+ Object val = eval.evaluate(row);
+ setValue(runtimeValuesInfo.getDynamicValueIDs().get(colIdx), val);
+ }
+ }
+ // For now, expecting a single row (min/max, aggregated bloom filter)
+ if (rowCount != 1) {
+ throw new IllegalStateException("Expected 1 row from " + inputSourceName + ", got " + rowCount);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
index 0141230..1ce8ee9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
@@ -60,6 +60,24 @@ public class LlapObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCac
@SuppressWarnings("unchecked")
@Override
+ public <T> T retrieve(String key) throws HiveException {
+
+ T value = null;
+
+ lock.lock();
+ try {
+ value = (T) registry.getIfPresent(key);
+ if (value != null && isLogDebugEnabled) {
+ LOG.debug("Found " + key + " in cache");
+ }
+ return value;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
T value = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 955fa80..790c9d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -51,11 +51,13 @@ import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.tez.DynamicValueRegistryTez.RegistryConfTez;
import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.serde2.Deserializer;
@@ -88,8 +90,8 @@ public class MapRecordProcessor extends RecordProcessor {
private final ExecMapperContext execContext;
private MapWork mapWork;
List<MapWork> mergeWorkList;
- List<String> cacheKeys;
- ObjectCache cache;
+ List<String> cacheKeys, dynamicValueCacheKeys;
+ ObjectCache cache, dynamicValueCache;
private int nRows;
public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
@@ -99,9 +101,11 @@ public class MapRecordProcessor extends RecordProcessor {
setLlapOfFragmentId(context);
}
cache = ObjectCacheFactory.getCache(jconf, queryId, true);
+ dynamicValueCache = ObjectCacheFactory.getCache(jconf, queryId, false);
execContext = new ExecMapperContext(jconf);
execContext.setJc(jconf);
cacheKeys = new ArrayList<String>();
+ dynamicValueCacheKeys = new ArrayList<String>();
nRows = 0;
}
@@ -295,6 +299,21 @@ public class MapRecordProcessor extends RecordProcessor {
mapOp.initializeLocalWork(jconf);
+ // Setup values registry
+ checkAbortCondition();
+ String valueRegistryKey = DynamicValue.DYNAMIC_VALUE_REGISTRY_CACHE_KEY;
+ // On LLAP, the dynamic value registry might already be cached.
+ final DynamicValueRegistryTez registryTez = dynamicValueCache.retrieve(valueRegistryKey,
+ new Callable<DynamicValueRegistryTez>() {
+ @Override
+ public DynamicValueRegistryTez call() {
+ return new DynamicValueRegistryTez();
+ }
+ });
+ dynamicValueCacheKeys.add(valueRegistryKey);
+ RegistryConfTez registryConf = new RegistryConfTez(jconf, mapWork, processorContext, inputs);
+ registryTez.init(registryConf);
+
checkAbortCondition();
initializeMapRecordSources();
mapOp.initializeMapOperator(jconf);
@@ -435,6 +454,12 @@ public class MapRecordProcessor extends RecordProcessor {
}
}
+ if (dynamicValueCache != null && dynamicValueCacheKeys != null) {
+ for (String k: dynamicValueCacheKeys) {
+ dynamicValueCache.release(k);
+ }
+ }
+
// detecting failed executions by exceptions thrown by the operator tree
try {
if (mapOp == null || mapWork == null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
index 06dca00..72dcdd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
@@ -65,6 +65,22 @@ public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
LOG.info("Releasing key: " + key);
}
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T retrieve(String key) throws HiveException {
+ T value = null;
+ try {
+ value = (T) registry.get(key);
+ if ( value != null) {
+ LOG.info("Found " + key + " in cache with value: " + value);
+ }
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ return value;
+ }
+
@SuppressWarnings("unchecked")
@Override
public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index d80f201..2d06545 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -40,9 +40,11 @@ import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
+import org.apache.hadoop.hive.ql.exec.tez.DynamicValueRegistryTez.RegistryConfTez;
import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -64,14 +66,14 @@ public class ReduceRecordProcessor extends RecordProcessor{
private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__";
- private ObjectCache cache;
+ private ObjectCache cache, dynamicValueCache;
public static final Logger l4j = LoggerFactory.getLogger(ReduceRecordProcessor.class);
private ReduceWork reduceWork;
List<BaseWork> mergeWorkList = null;
- List<String> cacheKeys;
+ List<String> cacheKeys, dynamicValueCacheKeys;
private final Map<Integer, DummyStoreOperator> connectOps =
new TreeMap<Integer, DummyStoreOperator>();
@@ -91,9 +93,11 @@ public class ReduceRecordProcessor extends RecordProcessor{
String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
cache = ObjectCacheFactory.getCache(jconf, queryId, true);
+ dynamicValueCache = ObjectCacheFactory.getCache(jconf, queryId, false);
String cacheKey = processorContext.getTaskVertexName() + REDUCE_PLAN_KEY;
cacheKeys = Lists.newArrayList(cacheKey);
+ dynamicValueCacheKeys = new ArrayList<String>();
reduceWork = (ReduceWork) cache.retrieve(cacheKey, new Callable<Object>() {
@Override
public Object call() {
@@ -169,6 +173,21 @@ public class ReduceRecordProcessor extends RecordProcessor{
l4j.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask));
}
OperatorUtils.setMemoryAvailable(reducer.getChildOperators(), memoryAvailableToTask);
+
+ // Setup values registry
+ String valueRegistryKey = DynamicValue.DYNAMIC_VALUE_REGISTRY_CACHE_KEY;
+ DynamicValueRegistryTez registryTez = dynamicValueCache.retrieve(valueRegistryKey,
+ new Callable<DynamicValueRegistryTez>() {
+ @Override
+ public DynamicValueRegistryTez call() {
+ return new DynamicValueRegistryTez();
+ }
+ });
+ dynamicValueCacheKeys.add(valueRegistryKey);
+ RegistryConfTez registryConf = new RegistryConfTez(jconf, reduceWork, processorContext, inputs);
+ registryTez.init(registryConf);
+ checkAbortCondition();
+
if (numTags > 1) {
sources = new ReduceRecordSource[numTags];
mainWorkOIs = new ObjectInspector[numTags];
@@ -348,6 +367,12 @@ public class ReduceRecordProcessor extends RecordProcessor{
}
}
+ if (dynamicValueCache != null && dynamicValueCacheKeys != null) {
+ for (String k: dynamicValueCacheKeys) {
+ dynamicValueCache.release(k);
+ }
+ }
+
try {
for (ReduceRecordSource rs: sources) {
abort = abort && rs.close();
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index 0cb6c8a..848fc8e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -153,7 +153,7 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator {
VectorExpression vectorExpr = bigTableValueExpressions[i];
// This is a vectorized aware evaluator
- ExprNodeEvaluator eval = new ExprNodeEvaluator<ExprNodeDesc>(desc) {
+ ExprNodeEvaluator eval = new ExprNodeEvaluator<ExprNodeDesc>(desc, hconf) {
int columnIndex;
int writerIndex;
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
index 80b0a14..ac3363e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
@@ -207,7 +207,7 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
VectorExpression vectorExpr = bigTableValueExpressions[i];
// This is a vectorized aware evaluator
- ExprNodeEvaluator eval = new ExprNodeEvaluator<ExprNodeDesc>(desc) {
+ ExprNodeEvaluator eval = new ExprNodeEvaluator<ExprNodeDesc>(desc, hconf) {
int columnIndex;;
int writerIndex;
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java b/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java
index 9d900e4..997334b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java
@@ -27,9 +27,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.io.sarg.LiteralDelegate;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
@@ -58,14 +60,16 @@ import com.esotericsoftware.kryo.io.Input;
public class ConvertAstToSearchArg {
private static final Logger LOG = LoggerFactory.getLogger(ConvertAstToSearchArg.class);
- private final SearchArgument.Builder builder =
- SearchArgumentFactory.newBuilder();
+ private final SearchArgument.Builder builder;
+ private final Configuration conf;
/**
* Builds the expression and leaf list from the original predicate.
* @param expression the expression to translate.
*/
- ConvertAstToSearchArg(ExprNodeGenericFuncDesc expression) {
+ ConvertAstToSearchArg(Configuration conf, ExprNodeGenericFuncDesc expression) {
+ this.conf = conf;
+ builder = SearchArgumentFactory.newBuilder(conf);
parse(expression);
}
@@ -182,7 +186,7 @@ public class ConvertAstToSearchArg {
* @param type the type of the expression
* @return the literal boxed if found or null
*/
- private static Object findLiteral(ExprNodeGenericFuncDesc expr,
+ private static Object findLiteral(Configuration conf, ExprNodeGenericFuncDesc expr,
PredicateLeaf.Type type) {
List<ExprNodeDesc> children = expr.getChildren();
if (children.size() != 2) {
@@ -190,16 +194,29 @@ public class ConvertAstToSearchArg {
}
Object result = null;
for(ExprNodeDesc child: children) {
- if (child instanceof ExprNodeConstantDesc) {
+ Object currentResult = getLiteral(conf, child, type);
+ if (currentResult != null) {
+ // At most one child of the expression may be a literal; bail out if both are
if (result != null) {
return null;
}
- result = boxLiteral((ExprNodeConstantDesc) child, type);
+ result = currentResult;
}
}
return result;
}
+ private static Object getLiteral(Configuration conf, ExprNodeDesc child, PredicateLeaf.Type type) {
+ if (child instanceof ExprNodeConstantDesc) {
+ return boxLiteral((ExprNodeConstantDesc) child, type);
+ } else if (child instanceof ExprNodeDynamicValueDesc) {
+ LiteralDelegate value = ((ExprNodeDynamicValueDesc) child).getDynamicValue();
+ value.setConf(conf);
+ return value;
+ }
+ return null;
+ }
+
/**
* Return the boxed literal at the given position
* @param expr the parent node
@@ -207,15 +224,12 @@ public class ConvertAstToSearchArg {
* @param position the child position to check
* @return the boxed literal if found otherwise null
*/
- private static Object getLiteral(ExprNodeGenericFuncDesc expr,
+ private static Object getLiteral(Configuration conf, ExprNodeGenericFuncDesc expr,
PredicateLeaf.Type type,
int position) {
List<ExprNodeDesc> children = expr.getChildren();
- Object child = children.get(position);
- if (child instanceof ExprNodeConstantDesc) {
- return boxLiteral((ExprNodeConstantDesc) child, type);
- }
- return null;
+ ExprNodeDesc child = children.get(position);
+ return getLiteral(conf, child, type);
}
private static Object[] getLiteralList(ExprNodeGenericFuncDesc expr,
@@ -272,16 +286,16 @@ public class ConvertAstToSearchArg {
builder.isNull(columnName, type);
break;
case EQUALS:
- builder.equals(columnName, type, findLiteral(expression, type));
+ builder.equals(columnName, type, findLiteral(conf, expression, type));
break;
case NULL_SAFE_EQUALS:
- builder.nullSafeEquals(columnName, type, findLiteral(expression, type));
+ builder.nullSafeEquals(columnName, type, findLiteral(conf, expression, type));
break;
case LESS_THAN:
- builder.lessThan(columnName, type, findLiteral(expression, type));
+ builder.lessThan(columnName, type, findLiteral(conf, expression, type));
break;
case LESS_THAN_EQUALS:
- builder.lessThanEquals(columnName, type, findLiteral(expression, type));
+ builder.lessThanEquals(columnName, type, findLiteral(conf, expression, type));
break;
case IN:
builder.in(columnName, type,
@@ -289,8 +303,8 @@ public class ConvertAstToSearchArg {
break;
case BETWEEN:
builder.between(columnName, type,
- getLiteral(expression, type, variable + 1),
- getLiteral(expression, type, variable + 2));
+ getLiteral(conf, expression, type, variable + 1),
+ getLiteral(conf, expression, type, variable + 2));
break;
}
} catch (Exception e) {
@@ -425,8 +439,8 @@ public class ConvertAstToSearchArg {
public static final String SARG_PUSHDOWN = "sarg.pushdown";
- public static SearchArgument create(ExprNodeGenericFuncDesc expression) {
- return new ConvertAstToSearchArg(expression).buildSearchArgument();
+ public static SearchArgument create(Configuration conf, ExprNodeGenericFuncDesc expression) {
+ return new ConvertAstToSearchArg(conf, expression).buildSearchArgument();
}
@@ -445,7 +459,7 @@ public class ConvertAstToSearchArg {
public static SearchArgument createFromConf(Configuration conf) {
String sargString;
if ((sargString = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR)) != null) {
- return create(SerializationUtilities.deserializeExpression(sargString));
+ return create(conf, SerializationUtilities.deserializeExpression(sargString));
} else if ((sargString = conf.get(SARG_PUSHDOWN)) != null) {
return create(sargString);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index 26fcc45..c8691e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -28,13 +28,8 @@ import java.util.Stack;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.exec.FilterOperator;
-import org.apache.hadoop.hive.ql.exec.GroupByOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -50,20 +45,19 @@ import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
-import org.apache.hadoop.hive.ql.plan.AggregationDesc;
-import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc;
-import org.apache.hadoop.hive.ql.plan.FilterDesc;
-import org.apache.hadoop.hive.ql.plan.GroupByDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.*;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -148,15 +142,13 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
FilterOperator filter = (FilterOperator) nd;
FilterDesc desc = filter.getConf();
- TableScanOperator ts = null;
-
if (!parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING) &&
!parseContext.getConf().getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
// nothing to do when the optimization is off
return null;
}
- DynamicPartitionPrunerContext removerContext = new DynamicPartitionPrunerContext();
+ TableScanOperator ts = null;
if (filter.getParentOperators().size() == 1
&& filter.getParentOperators().get(0) instanceof TableScanOperator) {
@@ -169,14 +161,32 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
LOG.debug("TableScan: " + ts);
}
+ DynamicPartitionPrunerContext removerContext = new DynamicPartitionPrunerContext();
+
// collect the dynamic pruning conditions
removerContext.dynLists.clear();
collectDynamicPruningConditions(desc.getPredicate(), removerContext);
+ if (ts == null) {
+ // Replace the synthetic predicate with true and bail out
+ for (DynamicListContext ctx : removerContext) {
+ ExprNodeDesc constNode =
+ new ExprNodeConstantDesc(ctx.parent.getTypeInfo(), true);
+ replaceExprNode(ctx, desc, constNode);
+ }
+ return false;
+ }
+
+ final boolean semiJoin = parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION);
+
for (DynamicListContext ctx : removerContext) {
String column = ExprNodeDescUtils.extractColName(ctx.parent);
+ boolean semiJoinAttempted = false;
+
+ if (column != null) {
+ // Need unique IDs to refer to each min/max key value in the DynamicValueRegistry
+ String keyBaseAlias = "";
- if (ts != null && column != null) {
Table table = ts.getConf().getTableMetadata();
if (table != null && table.isPartitionKey(column)) {
@@ -203,20 +213,56 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
}
} else {
LOG.debug("Column " + column + " is not a partition column");
+ if (semiJoin && ts.getConf().getFilterExpr() != null) {
+ LOG.debug("Initiate semijoin reduction for " + column);
+ // Get the table name from which the min-max values will come.
+ Operator<?> op = ctx.generator;
+ while (!(op == null || op instanceof TableScanOperator)) {
+ op = op.getParentOperators().get(0);
+ }
+ String tableAlias = (op == null ? "" : ((TableScanOperator) op).getConf().getAlias());
+ keyBaseAlias = ctx.generator.getOperatorId() + "_" + tableAlias + "_" + column;
+
+ semiJoinAttempted = generateSemiJoinOperatorPlan(ctx, parseContext, ts, keyBaseAlias);
+ }
}
- }
- // we always remove the condition by replacing it with "true"
- ExprNodeDesc constNode = new ExprNodeConstantDesc(ctx.parent.getTypeInfo(), true);
- if (ctx.grandParent == null) {
- desc.setPredicate(constNode);
+ // If semijoin reduction was attempted, replace the condition with a min-max
+ // (BETWEEN) filter and add a bloom filter check;
+ // otherwise remove the condition by replacing it with "true".
+ if (semiJoinAttempted) {
+ List<ExprNodeDesc> betweenArgs = new ArrayList<ExprNodeDesc>();
+ betweenArgs.add(new ExprNodeConstantDesc(Boolean.FALSE)); // Do not invert between result
+ // add column expression here
+ betweenArgs.add(ctx.parent.getChildren().get(0));
+ betweenArgs.add(new ExprNodeDynamicValueDesc(new DynamicValue(keyBaseAlias + "_min", ctx.desc.getTypeInfo())));
+ betweenArgs.add(new ExprNodeDynamicValueDesc(new DynamicValue(keyBaseAlias + "_max", ctx.desc.getTypeInfo())));
+ ExprNodeDesc betweenNode = ExprNodeGenericFuncDesc.newInstance(
+ FunctionRegistry.getFunctionInfo("between").getGenericUDF(), betweenArgs);
+ replaceExprNode(ctx, desc, betweenNode);
+ // add column expression for bloom filter
+ List<ExprNodeDesc> bloomFilterArgs = new ArrayList<ExprNodeDesc>();
+ bloomFilterArgs.add(ctx.parent.getChildren().get(0));
+ bloomFilterArgs.add(new ExprNodeDynamicValueDesc(
+ new DynamicValue(keyBaseAlias + "_bloom_filter",
+ TypeInfoFactory.binaryTypeInfo)));
+ ExprNodeDesc bloomFilterNode = ExprNodeGenericFuncDesc.newInstance(
+ FunctionRegistry.getFunctionInfo("in_bloom_filter").
+ getGenericUDF(), bloomFilterArgs);
+ // ctx may not have the grandparent but it is set in filterDesc by now.
+ ExprNodeDesc grandParent = ctx.grandParent == null ?
+ desc.getPredicate() : ctx.grandParent;
+ grandParent.getChildren().add(bloomFilterNode);
+ } else {
+ ExprNodeDesc replaceNode = new ExprNodeConstantDesc(ctx.parent.getTypeInfo(), true);
+ replaceExprNode(ctx, desc, replaceNode);
+ }
} else {
- int i = ctx.grandParent.getChildren().indexOf(ctx.parent);
- ctx.grandParent.getChildren().remove(i);
- ctx.grandParent.getChildren().add(i, constNode);
+ ExprNodeDesc constNode =
+ new ExprNodeConstantDesc(ctx.parent.getTypeInfo(), true);
+ replaceExprNode(ctx, desc, constNode);
}
}
-
// if we pushed the predicate into the table scan we need to remove the
// synthetic conditions there.
cleanTableScanFilters(ts);
@@ -224,6 +270,16 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
return false;
}
+ private void replaceExprNode(DynamicListContext ctx, FilterDesc desc, ExprNodeDesc node) {
+ if (ctx.grandParent == null) {
+ desc.setPredicate(node);
+ } else {
+ int i = ctx.grandParent.getChildren().indexOf(ctx.parent);
+ ctx.grandParent.getChildren().remove(i);
+ ctx.grandParent.getChildren().add(i, node);
+ }
+ }
+
private void cleanTableScanFilters(TableScanOperator ts) throws SemanticException {
if (ts == null || ts.getConf() == null || ts.getConf().getFilterExpr() == null) {
@@ -327,6 +383,228 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
}
}
+ // Generates plan for min/max when dynamic partition pruning is ruled out.
+ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContext parseContext,
+ TableScanOperator ts, String keyBaseAlias) throws SemanticException {
+
+ // we will put a fork in the plan at the source of the reduce sink
+ Operator<? extends OperatorDesc> parentOfRS = ctx.generator.getParentOperators().get(0);
+
+ // we need the expr that generated the key of the reduce sink
+ ExprNodeDesc key = ctx.generator.getConf().getKeyCols().get(ctx.desc.getKeyIndex());
+
+ if (parentOfRS instanceof SelectOperator) {
+ // Make sure the semijoin branch is not on a partition column.
+ String internalColName = null;
+ ExprNodeDesc exprNodeDesc = key;
+ // Find the ExprNodeColumnDesc
+ while (!(exprNodeDesc instanceof ExprNodeColumnDesc)) {
+ exprNodeDesc = exprNodeDesc.getChildren().get(0);
+ }
+ internalColName = ((ExprNodeColumnDesc) exprNodeDesc).getColumn();
+
+ ExprNodeColumnDesc colExpr = ((ExprNodeColumnDesc)(parentOfRS.
+ getColumnExprMap().get(internalColName)));
+ String colName = ExprNodeDescUtils.extractColName(colExpr);
+
+ // Fetch the TableScan Operator.
+ Operator<?> op = parentOfRS.getParentOperators().get(0);
+ while (op != null && !(op instanceof TableScanOperator)) {
+ op = op.getParentOperators().get(0);
+ }
+ assert op != null;
+
+ Table table = ((TableScanOperator) op).getConf().getTableMetadata();
+ if (table.isPartitionKey(colName)) {
+ // The column is partition column, skip the optimization.
+ return false;
+ }
+ }
+ List<ExprNodeDesc> keyExprs = new ArrayList<ExprNodeDesc>();
+ keyExprs.add(key);
+
+ // group by requires "ArrayList", don't ask.
+ ArrayList<String> outputNames = new ArrayList<String>();
+ outputNames.add(HiveConf.getColumnInternalName(0));
+
+ // project the relevant key column
+ SelectDesc select = new SelectDesc(keyExprs, outputNames);
+ SelectOperator selectOp =
+ (SelectOperator) OperatorFactory.getAndMakeChild(select,
+ new RowSchema(parentOfRS.getSchema()), parentOfRS);
+
+ // do a group by to aggregate min,max and bloom filter.
+ float groupByMemoryUsage =
+ HiveConf.getFloatVar(parseContext.getConf(), HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
+ float memoryThreshold =
+ HiveConf.getFloatVar(parseContext.getConf(),
+ HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+
+ ArrayList<ExprNodeDesc> groupByExprs = new ArrayList<ExprNodeDesc>();
+
+ // Add min/max and bloom filter aggregations
+ List<ObjectInspector> aggFnOIs = new ArrayList<ObjectInspector>();
+ aggFnOIs.add(key.getWritableObjectInspector());
+ ArrayList<ExprNodeDesc> params = new ArrayList<ExprNodeDesc>();
+ params.add(
+ new ExprNodeColumnDesc(key.getTypeInfo(), outputNames.get(0),
+ "", false));
+
+ ArrayList<AggregationDesc> aggs = new ArrayList<AggregationDesc>();
+ try {
+ AggregationDesc min = new AggregationDesc("min",
+ FunctionRegistry.getGenericUDAFEvaluator("min", aggFnOIs, false, false),
+ params, false, Mode.PARTIAL1);
+ AggregationDesc max = new AggregationDesc("max",
+ FunctionRegistry.getGenericUDAFEvaluator("max", aggFnOIs, false, false),
+ params, false, Mode.PARTIAL1);
+ AggregationDesc bloomFilter = new AggregationDesc("bloom_filter",
+ FunctionRegistry.getGenericUDAFEvaluator("bloom_filter", aggFnOIs, false, false),
+ params, false, Mode.PARTIAL1);
+ GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator();
+ bloomFilterEval.setSourceOperator(selectOp);
+ bloomFilterEval.setMaxEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES));
+ bloomFilter.setGenericUDAFWritableEvaluator(bloomFilterEval);
+ aggs.add(min);
+ aggs.add(max);
+ aggs.add(bloomFilter);
+ } catch (SemanticException e) {
+ LOG.error("Error creating min/max aggregations on key", e);
+ throw new IllegalStateException("Error creating min/max aggregations on key", e);
+ }
+
+ // Create the Group by Operator
+ ArrayList<String> gbOutputNames = new ArrayList<String>();
+ gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(0));
+ gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(1));
+ gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(2));
+ GroupByDesc groupBy = new GroupByDesc(GroupByDesc.Mode.HASH,
+ gbOutputNames, new ArrayList<ExprNodeDesc>(), aggs, false,
+ groupByMemoryUsage, memoryThreshold, null, false, 0, false);
+
+ ArrayList<ColumnInfo> groupbyColInfos = new ArrayList<ColumnInfo>();
+ groupbyColInfos.add(new ColumnInfo(gbOutputNames.get(0), key.getTypeInfo(), "", false));
+ groupbyColInfos.add(new ColumnInfo(gbOutputNames.get(1), key.getTypeInfo(), "", false));
+ groupbyColInfos.add(new ColumnInfo(gbOutputNames.get(2), key.getTypeInfo(), "", false));
+
+ GroupByOperator groupByOp = (GroupByOperator)OperatorFactory.getAndMakeChild(
+ groupBy, new RowSchema(groupbyColInfos), selectOp);
+
+ groupByOp.setColumnExprMap(new HashMap<String, ExprNodeDesc>());
+
+ // Get the column names of the aggregations for reduce sink
+ int colPos = 0;
+ ArrayList<ExprNodeDesc> rsValueCols = new ArrayList<ExprNodeDesc>();
+ for (int i = 0; i < aggs.size() - 1; i++) {
+ ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(key.getTypeInfo(),
+ gbOutputNames.get(colPos++), "", false);
+ rsValueCols.add(colExpr);
+ }
+
+ // Bloom Filter uses binary
+ ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(TypeInfoFactory.binaryTypeInfo,
+ gbOutputNames.get(colPos++), "", false);
+ rsValueCols.add(colExpr);
+
+ // Create the reduce sink operator
+ ReduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc(
+ new ArrayList<ExprNodeDesc>(), rsValueCols, gbOutputNames, false,
+ -1, 0, 1, Operation.NOT_ACID);
+ ReduceSinkOperator rsOp = (ReduceSinkOperator)OperatorFactory.getAndMakeChild(
+ rsDesc, new RowSchema(groupByOp.getSchema()), groupByOp);
+ Map<String, ExprNodeDesc> columnExprMap = new HashMap<String, ExprNodeDesc>();
+ rsOp.setColumnExprMap(columnExprMap);
+
+ // Create the final Group By Operator
+ ArrayList<AggregationDesc> aggsFinal = new ArrayList<AggregationDesc>();
+ try {
+ List<ObjectInspector> minFinalFnOIs = new ArrayList<ObjectInspector>();
+ List<ObjectInspector> maxFinalFnOIs = new ArrayList<ObjectInspector>();
+ List<ObjectInspector> bloomFilterFinalFnOIs = new ArrayList<ObjectInspector>();
+ ArrayList<ExprNodeDesc> minFinalParams = new ArrayList<ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> maxFinalParams = new ArrayList<ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> bloomFilterFinalParams = new ArrayList<ExprNodeDesc>();
+ // Use the expressions from Reduce Sink.
+ minFinalFnOIs.add(rsValueCols.get(0).getWritableObjectInspector());
+ maxFinalFnOIs.add(rsValueCols.get(1).getWritableObjectInspector());
+ bloomFilterFinalFnOIs.add(rsValueCols.get(2).getWritableObjectInspector());
+ // Coming from a ReduceSink the aggregations would be in the form VALUE._col0, VALUE._col1
+ minFinalParams.add(
+ new ExprNodeColumnDesc(
+ rsValueCols.get(0).getTypeInfo(),
+ Utilities.ReduceField.VALUE + "." +
+ gbOutputNames.get(0), "", false));
+ maxFinalParams.add(
+ new ExprNodeColumnDesc(
+ rsValueCols.get(1).getTypeInfo(),
+ Utilities.ReduceField.VALUE + "." +
+ gbOutputNames.get(1), "", false));
+ bloomFilterFinalParams.add(
+ new ExprNodeColumnDesc(
+ rsValueCols.get(2).getTypeInfo(),
+ Utilities.ReduceField.VALUE + "." +
+ gbOutputNames.get(2), "", false));
+
+ AggregationDesc min = new AggregationDesc("min",
+ FunctionRegistry.getGenericUDAFEvaluator("min", minFinalFnOIs,
+ false, false),
+ minFinalParams, false, Mode.FINAL);
+ AggregationDesc max = new AggregationDesc("max",
+ FunctionRegistry.getGenericUDAFEvaluator("max", maxFinalFnOIs,
+ false, false),
+ maxFinalParams, false, Mode.FINAL);
+ AggregationDesc bloomFilter = new AggregationDesc("bloom_filter",
+ FunctionRegistry.getGenericUDAFEvaluator("bloom_filter", bloomFilterFinalFnOIs,
+ false, false),
+ bloomFilterFinalParams, false, Mode.FINAL);
+ GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator();
+ bloomFilterEval.setSourceOperator(selectOp);
+ bloomFilterEval.setMaxEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES));
+ bloomFilter.setGenericUDAFWritableEvaluator(bloomFilterEval);
+
+ aggsFinal.add(min);
+ aggsFinal.add(max);
+ aggsFinal.add(bloomFilter);
+ } catch (SemanticException e) {
+ LOG.error("Error creating min/max aggregations on key", e);
+ throw new IllegalStateException("Error creating min/max aggregations on key", e);
+ }
+
+ GroupByDesc groupByDescFinal = new GroupByDesc(GroupByDesc.Mode.FINAL,
+ gbOutputNames, new ArrayList<ExprNodeDesc>(), aggsFinal, false,
+ groupByMemoryUsage, memoryThreshold, null, false, 0, false);
+ GroupByOperator groupByOpFinal = (GroupByOperator)OperatorFactory.getAndMakeChild(
+ groupByDescFinal, new RowSchema(rsOp.getSchema()), rsOp);
+ groupByOpFinal.setColumnExprMap(new HashMap<String, ExprNodeDesc>());
+
+ // Create the final Reduce Sink Operator
+ ReduceSinkDesc rsDescFinal = PlanUtils.getReduceSinkDesc(
+ new ArrayList<ExprNodeDesc>(), rsValueCols, gbOutputNames, false,
+ -1, 0, 1, Operation.NOT_ACID);
+ ReduceSinkOperator rsOpFinal = (ReduceSinkOperator)OperatorFactory.getAndMakeChild(
+ rsDescFinal, new RowSchema(groupByOpFinal.getSchema()), groupByOpFinal);
+ rsOpFinal.setColumnExprMap(columnExprMap);
+
+ LOG.debug("DynamicMinMaxPushdown: Saving RS to TS mapping: " + rsOpFinal + ": " + ts);
+ parseContext.getRsOpToTsOpMap().put(rsOpFinal, ts);
+
+ // Save the info that is required at query time to resolve dynamic/runtime values.
+ RuntimeValuesInfo runtimeValuesInfo = new RuntimeValuesInfo();
+ TableDesc rsFinalTableDesc = PlanUtils.getReduceValueTableDesc(
+ PlanUtils.getFieldSchemasFromColumnList(rsValueCols, "_col"));
+ List<String> dynamicValueIDs = new ArrayList<String>();
+ dynamicValueIDs.add(keyBaseAlias + "_min");
+ dynamicValueIDs.add(keyBaseAlias + "_max");
+ dynamicValueIDs.add(keyBaseAlias + "_bloom_filter");
+
+ runtimeValuesInfo.setTableDesc(rsFinalTableDesc);
+ runtimeValuesInfo.setDynamicValueIDs(dynamicValueIDs);
+ runtimeValuesInfo.setColExprs(rsValueCols);
+ parseContext.getRsToRuntimeValuesInfoMap().put(rsOpFinal, runtimeValuesInfo);
+
+ return true;
+ }
+
private Map<Node, Object> collectDynamicPruningConditions(ExprNodeDesc pred, NodeProcessorCtx ctx)
throws SemanticException {
http://git-wip-us.apache.org/repos/asf/hive/blob/cc3fd84e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java
index 9e9beb0..b853a06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java
@@ -135,7 +135,7 @@ public class FixedBucketPruningOptimizer extends Transform {
return;
}
// the sargs are closely tied to hive.optimize.index.filter
- SearchArgument sarg = ConvertAstToSearchArg.create(filter);
+ SearchArgument sarg = ConvertAstToSearchArg.create(ctxt.pctx.getConf(), filter);
if (sarg == null) {
return;
}