Posted to commits@hive.apache.org by he...@apache.org on 2010/09/18 02:22:43 UTC
svn commit: r998374 - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/
hbase-handler/src/java/org/apache/hadoop/hive/hbase/
hbase-handler/src/test/queries/ hbase-handler/src/test/results/
ql/src/java/org/apache/hadoop/hive/q...
Author: heyongqiang
Date: Sat Sep 18 00:22:42 2010
New Revision: 998374
URL: http://svn.apache.org/viewvc?rev=998374&view=rev
Log:
HIVE-1226. support filter pushdown against non-native tables. (jvs via He Yongqiang)
Added:
hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_pushdown.q
hadoop/hive/trunk/hbase-handler/src/test/results/hbase_pushdown.q.out
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/conf/hive-default.xml
hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Sat Sep 18 00:22:42 2010
@@ -132,6 +132,9 @@ Trunk - Unreleased
HIVE-1645. ability to specify parent directory for zookeeper lock manager
(namit via He Yongqiang)
+ HIVE-1226. support filter pushdown against non-native tables
+ (jvs via He Yongqiang)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Sep 18 00:22:42 2010
@@ -262,6 +262,8 @@ public class HiveConf extends Configurat
// Optimizer
HIVEOPTCP("hive.optimize.cp", true), // column pruner
HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown
+ // push predicates down to storage handlers
+ HIVEOPTPPD_STORAGE("hive.optimize.ppd.storage", true),
HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by
HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Sat Sep 18 00:22:42 2010
@@ -273,6 +273,12 @@
</property>
<property>
+ <name>hive.optimize.ppd.storage</name>
+ <value>true</value>
+ <description>Whether to push predicates down into storage handlers. Ignored when hive.optimize.ppd is false.</description>
+</property>
+
+<property>
<name>hive.optimize.pruner</name>
<value>true</value>
<description>Whether to enable the new partition pruner which depends on predicate pushdown. If this is disabled,
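For readers wiring this up programmatically, a minimal sketch (not part of this patch) of checking the flags from Java, using the HIVEOPTPPD and HIVEOPTPPD_STORAGE constants defined in HiveConf above; the helper class name is hypothetical:

import org.apache.hadoop.hive.conf.HiveConf;

// Hypothetical helper: storage-handler pushdown is effective only when
// both hive.optimize.ppd and hive.optimize.ppd.storage are enabled,
// since hive.optimize.ppd gates the storage-level flag.
public class PpdStorageCheck {
  public static boolean isStoragePushdownEnabled(HiveConf conf) {
    return conf.getBoolVar(HiveConf.ConfVars.HIVEOPTPPD)
        && conf.getBoolVar(HiveConf.ConfVars.HIVEOPTPPD_STORAGE);
  }
}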
Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Sat Sep 18 00:22:42 2010
@@ -582,4 +582,11 @@ public class HBaseSerDe implements SerDe
public void setUseJSONSerialize(boolean useJSONSerialize) {
this.useJSONSerialize = useJSONSerialize;
}
+
+ /**
+ * @return 0-based offset of the key column within the table
+ */
+ int getKeyColumnOffset() {
+ return iKey;
+ }
}
Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Sat Sep 18 00:22:42 2010
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.hbase;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -39,11 +40,17 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.Constants;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
+import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
+import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.util.StringUtils;
@@ -51,7 +58,8 @@ import org.apache.hadoop.util.StringUtil
* HBaseStorageHandler provides a HiveStorageHandler implementation for
* HBase.
*/
-public class HBaseStorageHandler implements HiveStorageHandler, HiveMetaHook {
+public class HBaseStorageHandler extends DefaultStorageHandler
+ implements HiveMetaHook, HiveStoragePredicateHandler {
private HBaseConfiguration hbaseConf;
private HBaseAdmin admin;
@@ -260,4 +268,38 @@ public class HBaseStorageHandler impleme
}
jobProperties.put(HBaseSerDe.HBASE_TABLE_NAME, tableName);
}
+
+ @Override
+ public DecomposedPredicate decomposePredicate(
+ JobConf jobConf,
+ Deserializer deserializer,
+ ExprNodeDesc predicate)
+ {
+ String columnNameProperty = jobConf.get(
+ org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS);
+ List<String> columnNames =
+ Arrays.asList(columnNameProperty.split(","));
+ HBaseSerDe hbaseSerde = (HBaseSerDe) deserializer;
+ IndexPredicateAnalyzer analyzer =
+ HiveHBaseTableInputFormat.newIndexPredicateAnalyzer(
+ columnNames.get(hbaseSerde.getKeyColumnOffset()));
+ List<IndexSearchCondition> searchConditions =
+ new ArrayList<IndexSearchCondition>();
+ ExprNodeDesc residualPredicate =
+ analyzer.analyzePredicate(predicate, searchConditions);
+ if (searchConditions.size() != 1) {
+ // Either there was nothing which could be pushed down (size = 0),
+ // or more than one predicate (size > 1); in the latter case,
+ // we bail out for now since multiple lookups on the key are
+ // either contradictory or redundant. We'll need to handle
+ // this better later when we support more interesting predicates.
+ return null;
+ }
+
+ DecomposedPredicate decomposedPredicate = new DecomposedPredicate();
+ decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(
+ searchConditions);
+ decomposedPredicate.residualPredicate = residualPredicate;
+ return decomposedPredicate;
+ }
}
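For context, a sketch (not part of this patch) of how a caller such as the optimizer might negotiate with decomposePredicate, using the HiveStoragePredicateHandler interface added later in this commit; handler, jobConf, deserializer, and predicate are assumed to come from the planning context:

HiveStoragePredicateHandler.DecomposedPredicate decomposed =
    handler.decomposePredicate(jobConf, deserializer, predicate);
if (decomposed == null) {
  // Handler declined pushdown; Hive evaluates the entire predicate itself.
} else {
  // decomposed.pushedPredicate is serialized into the job conf for the
  // handler's input format; decomposed.residualPredicate stays in Hive's
  // FilterOperator for post-evaluation.
}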
Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java Sat Sep 18 00:22:42 2010
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.hbase;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -29,13 +30,31 @@ import org.apache.hadoop.hbase.HBaseConf
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
+import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -50,7 +69,7 @@ import org.apache.hadoop.mapreduce.lib.i
/**
* HiveHBaseTableInputFormat implements InputFormat for HBase storage handler
* tables, decorating an underlying HBase TableInputFormat with extra Hive logic
- * such as column pruning.
+ * such as column pruning and filter pushdown.
*/
public class HiveHBaseTableInputFormat extends TableInputFormatBase
implements InputFormat<ImmutableBytesWritable, Result> {
@@ -64,6 +83,7 @@ public class HiveHBaseTableInputFormat e
final Reporter reporter) throws IOException {
HBaseSplit hbaseSplit = (HBaseSplit) split;
+ TableSplit tableSplit = hbaseSplit.getSplit();
String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
setHTable(new HTable(new HBaseConfiguration(jobConf), Bytes.toBytes(hbaseTableName)));
String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
@@ -105,7 +125,7 @@ public class HiveHBaseTableInputFormat e
}
}
- // The HBase table's row key maps to an Hive table column. In the corner case when only the
+ // The HBase table's row key maps to a Hive table column. In the corner case when only the
// row key column is selected in Hive, the HBase Scan will be empty i.e. no column family/
// column qualifier will have been added to the scan. We arbitrarily add at least one column
// to the HBase scan so that we can retrieve all of the row keys and return them as the Hive
@@ -128,8 +148,11 @@ public class HiveHBaseTableInputFormat e
}
}
+ // If Hive's optimizer gave us a filter to process, convert it to the
+ // HBase scan form now.
+ tableSplit = convertFilter(jobConf, scan, tableSplit, iKey);
+
setScan(scan);
- org.apache.hadoop.hbase.mapreduce.TableSplit tableSplit = hbaseSplit.getSplit();
Job job = new Job(jobConf);
TaskAttemptContext tac =
@@ -200,6 +223,137 @@ public class HiveHBaseTableInputFormat e
};
}
+ /**
+ * Converts a filter (which has been pushed down from Hive's optimizer)
+ * into corresponding restrictions on the HBase scan. The
+ * filter should already be in a form which can be fully converted.
+ *
+ * @param jobConf configuration for the scan
+ *
+ * @param scan the HBase scan object to restrict
+ *
+ * @param tableSplit the HBase table split to restrict, or null
+ * if calculating splits
+ *
+ * @param iKey 0-based offset of key column within Hive table
+ *
+ * @return converted table split if any
+ */
+ private TableSplit convertFilter(
+ JobConf jobConf,
+ Scan scan,
+ TableSplit tableSplit,
+ int iKey)
+ throws IOException {
+
+ String filterExprSerialized =
+ jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+ if (filterExprSerialized == null) {
+ return tableSplit;
+ }
+ ExprNodeDesc filterExpr =
+ Utilities.deserializeExpression(filterExprSerialized, jobConf);
+
+ String columnNameProperty = jobConf.get(Constants.LIST_COLUMNS);
+ List<String> columnNames =
+ Arrays.asList(columnNameProperty.split(","));
+
+ IndexPredicateAnalyzer analyzer =
+ newIndexPredicateAnalyzer(columnNames.get(iKey));
+
+ List<IndexSearchCondition> searchConditions =
+ new ArrayList<IndexSearchCondition>();
+ ExprNodeDesc residualPredicate =
+ analyzer.analyzePredicate(filterExpr, searchConditions);
+
+ // There should be no residual since we already negotiated
+ // that earlier in HBaseStorageHandler.decomposePredicate.
+ if (residualPredicate != null) {
+ throw new RuntimeException(
+ "Unexpected residual predicate " + residualPredicate.getExprString());
+ }
+
+ // There should be exactly one predicate since we already
+ // negotiated that also.
+ if (searchConditions.size() != 1) {
+ throw new RuntimeException(
+ "Exactly one search condition expected in push down");
+ }
+
+ // Convert the search condition into a restriction on the HBase scan
+ IndexSearchCondition sc = searchConditions.get(0);
+ ExprNodeConstantEvaluator eval =
+ new ExprNodeConstantEvaluator(sc.getConstantDesc());
+ byte [] startRow;
+ try {
+ ObjectInspector objInspector = eval.initialize(null);
+ Object writable = eval.evaluate(null);
+ ByteStream.Output serializeStream = new ByteStream.Output();
+ LazyUtils.writePrimitiveUTF8(
+ serializeStream,
+ writable,
+ (PrimitiveObjectInspector) objInspector,
+ false,
+ (byte) 0,
+ null);
+ startRow = new byte[serializeStream.getCount()];
+ System.arraycopy(
+ serializeStream.getData(), 0,
+ startRow, 0, serializeStream.getCount());
+ } catch (HiveException ex) {
+ throw new IOException(ex);
+ }
+
+ // stopRow is exclusive, so pad it with a trailing 0 byte to
+ // make it compare as the very next value after startRow
+ byte [] stopRow = new byte[startRow.length + 1];
+ System.arraycopy(startRow, 0, stopRow, 0, startRow.length);
+
+ if (tableSplit != null) {
+ tableSplit = new TableSplit(
+ tableSplit.getTableName(),
+ startRow,
+ stopRow,
+ tableSplit.getRegionLocation());
+ }
+ scan.setStartRow(startRow);
+ scan.setStopRow(stopRow);
+
+ // Add a WhileMatchFilter to make the scan terminate as soon
+ // as we see a non-matching key. This is probably redundant
+ // since the stopRow above should already take care of it for us.
+ scan.setFilter(
+ new WhileMatchFilter(
+ new RowFilter(
+ CompareFilter.CompareOp.EQUAL,
+ new BinaryComparator(startRow))));
+ return tableSplit;
+ }
+
+ /**
+ * Instantiates a new predicate analyzer suitable for
+ * determining how to push a filter down into the HBase scan,
+ * based on the rules for what kinds of pushdown we currently support.
+ *
+ * @param keyColumnName name of the Hive column mapped to the HBase row key
+ *
+ * @return preconfigured predicate analyzer
+ */
+ static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
+ String keyColumnName) {
+
+ IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
+
+ // for now, we only support equality comparisons
+ analyzer.addComparisonOp("=");
+
+ // and only on the key column
+ analyzer.clearAllowedColumnNames();
+ analyzer.allowColumnName(keyColumnName);
+
+ return analyzer;
+ }
+
@Override
public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
@@ -226,6 +380,14 @@ public class HiveHBaseTableInputFormat e
Scan scan = new Scan();
+ // Take filter pushdown into account while calculating splits; this
+ // allows us to prune off regions immediately. Note that although
+ // the Javadoc for the superclass getSplits says that it returns one
+ // split per region, the implementation actually takes the scan
+ // definition into account and excludes regions which don't satisfy
+ // the start/stop row conditions (HBASE-1829).
+ convertFilter(jobConf, scan, null, iKey);
+
// REVIEW: are we supposed to be applying the getReadColumnIDs
// same as in getRecordReader?
for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
@@ -244,7 +406,9 @@ public class HiveHBaseTableInputFormat e
Job job = new Job(jobConf);
JobContext jobContext = new JobContext(job.getConfiguration(), job.getJobID());
Path [] tablePaths = FileInputFormat.getInputPaths(jobContext);
- List<org.apache.hadoop.mapreduce.InputSplit> splits = getSplits(jobContext);
+
+ List<org.apache.hadoop.mapreduce.InputSplit> splits =
+ super.getSplits(jobContext);
InputSplit [] results = new InputSplit[splits.size()];
for (int i = 0; i < splits.size(); i++) {
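To make convertFilter concrete: for a pushed predicate like key = 90 on a string-mapped row key, the resulting scan restriction is roughly the following sketch (illustrative only; the actual start-row bytes come from LazyUtils.writePrimitiveUTF8):

byte[] startRow = Bytes.toBytes("90");          // UTF-8 form of the key constant
byte[] stopRow = new byte[startRow.length + 1]; // startRow plus a trailing 0x00
System.arraycopy(startRow, 0, stopRow, 0, startRow.length);

Scan scan = new Scan();
scan.setStartRow(startRow);  // inclusive
scan.setStopRow(stopRow);    // exclusive, so exactly one key value matches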
Added: hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_pushdown.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_pushdown.q?rev=998374&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_pushdown.q (added)
+++ hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_pushdown.q Sat Sep 18 00:22:42 2010
@@ -0,0 +1,53 @@
+CREATE TABLE hbase_pushdown(key int, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string");
+
+INSERT OVERWRITE TABLE hbase_pushdown
+SELECT *
+FROM src;
+
+-- with full pushdown
+explain select * from hbase_pushdown where key=90;
+
+select * from hbase_pushdown where key=90;
+
+-- with partial pushdown
+
+explain select * from hbase_pushdown where key=90 and value like '%90%';
+
+select * from hbase_pushdown where key=90 and value like '%90%';
+
+-- with two residuals
+
+explain select * from hbase_pushdown
+where key=90 and value like '%90%' and key=cast(value as int);
+
+-- with contradictory pushdowns
+
+explain select * from hbase_pushdown
+where key=80 and key=90 and value like '%90%';
+
+select * from hbase_pushdown
+where key=80 and key=90 and value like '%90%';
+
+-- with nothing to push down
+
+explain select * from hbase_pushdown;
+
+-- with a predicate which is not actually part of the filter, so
+-- it should be ignored by pushdown
+
+explain select * from hbase_pushdown
+where (case when key=90 then 2 else 4 end) > 3;
+
+-- with a predicate which is under an OR, so it should
+-- be ignored by pushdown
+
+explain select * from hbase_pushdown
+where key=80 or value like '%90%';
+
+set hive.optimize.ppd.storage=false;
+
+-- with pushdown disabled
+
+explain select * from hbase_pushdown where key=90;
Added: hadoop/hive/trunk/hbase-handler/src/test/results/hbase_pushdown.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/results/hbase_pushdown.q.out?rev=998374&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/results/hbase_pushdown.q.out (added)
+++ hadoop/hive/trunk/hbase-handler/src/test/results/hbase_pushdown.q.out Sat Sep 18 00:22:42 2010
@@ -0,0 +1,427 @@
+PREHOOK: query: CREATE TABLE hbase_pushdown(key int, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string")
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE hbase_pushdown(key int, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@hbase_pushdown
+PREHOOK: query: INSERT OVERWRITE TABLE hbase_pushdown
+SELECT *
+FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@hbase_pushdown
+POSTHOOK: query: INSERT OVERWRITE TABLE hbase_pushdown
+SELECT *
+FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@hbase_pushdown
+PREHOOK: query: -- with full pushdown
+explain select * from hbase_pushdown where key=90
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with full pushdown
+explain select * from hbase_pushdown where key=90
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (TOK_TABLE_OR_COL key) 90))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ hbase_pushdown
+ TableScan
+ alias: hbase_pushdown
+ filterExpr:
+ expr: (key = 90)
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: (key = 90)
+ type: boolean
+ Select Operator
+ expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: select * from hbase_pushdown where key=90
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_pushdown
+PREHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-05-54_049_1244315391309244934/-mr-10000
+POSTHOOK: query: select * from hbase_pushdown where key=90
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_pushdown
+POSTHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-05-54_049_1244315391309244934/-mr-10000
+90 val_90
+PREHOOK: query: -- with partial pushdown
+
+explain select * from hbase_pushdown where key=90 and value like '%90%'
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with partial pushdown
+
+explain select * from hbase_pushdown where key=90 and value like '%90%'
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (and (= (TOK_TABLE_OR_COL key) 90) (like (TOK_TABLE_OR_COL value) '%90%')))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ hbase_pushdown
+ TableScan
+ alias: hbase_pushdown
+ filterExpr:
+ expr: (key = 90)
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: (value like '%90%')
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: ((key = 90) and (value like '%90%'))
+ type: boolean
+ Select Operator
+ expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: select * from hbase_pushdown where key=90 and value like '%90%'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_pushdown
+PREHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-06-00_089_9169062048458581014/-mr-10000
+POSTHOOK: query: select * from hbase_pushdown where key=90 and value like '%90%'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_pushdown
+POSTHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-06-00_089_9169062048458581014/-mr-10000
+90 val_90
+PREHOOK: query: -- with two residuals
+
+explain select * from hbase_pushdown
+where key=90 and value like '%90%' and key=cast(value as int)
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with two residuals
+
+explain select * from hbase_pushdown
+where key=90 and value like '%90%' and key=cast(value as int)
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (and (and (= (TOK_TABLE_OR_COL key) 90) (like (TOK_TABLE_OR_COL value) '%90%')) (= (TOK_TABLE_OR_COL key) (TOK_FUNCTION TOK_INT (TOK_TABLE_OR_COL value)))))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ hbase_pushdown
+ TableScan
+ alias: hbase_pushdown
+ filterExpr:
+ expr: (key = 90)
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: ((value like '%90%') and (key = UDFToInteger(value)))
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: (((key = 90) and (value like '%90%')) and (key = UDFToInteger(value)))
+ type: boolean
+ Select Operator
+ expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: -- with contradictory pushdowns
+
+explain select * from hbase_pushdown
+where key=80 and key=90 and value like '%90%'
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with contradictory pushdowns
+
+explain select * from hbase_pushdown
+where key=80 and key=90 and value like '%90%'
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (and (and (= (TOK_TABLE_OR_COL key) 80) (= (TOK_TABLE_OR_COL key) 90)) (like (TOK_TABLE_OR_COL value) '%90%')))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ hbase_pushdown
+ TableScan
+ alias: hbase_pushdown
+ Filter Operator
+ predicate:
+ expr: (((key = 80) and (key = 90)) and (value like '%90%'))
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: (((key = 80) and (key = 90)) and (value like '%90%'))
+ type: boolean
+ Select Operator
+ expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: select * from hbase_pushdown
+where key=80 and key=90 and value like '%90%'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_pushdown
+PREHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-06-05_982_8346366828445837832/-mr-10000
+POSTHOOK: query: select * from hbase_pushdown
+where key=80 and key=90 and value like '%90%'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_pushdown
+POSTHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-06-05_982_8346366828445837832/-mr-10000
+PREHOOK: query: -- with nothing to push down
+
+explain select * from hbase_pushdown
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with nothing to push down
+
+explain select * from hbase_pushdown
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))))
+
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: -- with a predicate which is not actually part of the filter, so
+-- it should be ignored by pushdown
+
+explain select * from hbase_pushdown
+where (case when key=90 then 2 else 4 end) > 3
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with a predicate which is not actually part of the filter, so
+-- it should be ignored by pushdown
+
+explain select * from hbase_pushdown
+where (case when key=90 then 2 else 4 end) > 3
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (> (TOK_FUNCTION when (= (TOK_TABLE_OR_COL key) 90) 2 4) 3))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ hbase_pushdown
+ TableScan
+ alias: hbase_pushdown
+ Filter Operator
+ predicate:
+ expr: (CASE WHEN ((key = 90)) THEN (2) ELSE (4) END > 3)
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: (CASE WHEN ((key = 90)) THEN (2) ELSE (4) END > 3)
+ type: boolean
+ Select Operator
+ expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: -- with a predicate which is under an OR, so it should
+-- be ignored by pushdown
+
+explain select * from hbase_pushdown
+where key=80 or value like '%90%'
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with a predicate which is under an OR, so it should
+-- be ignored by pushdown
+
+explain select * from hbase_pushdown
+where key=80 or value like '%90%'
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (or (= (TOK_TABLE_OR_COL key) 80) (like (TOK_TABLE_OR_COL value) '%90%')))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ hbase_pushdown
+ TableScan
+ alias: hbase_pushdown
+ Filter Operator
+ predicate:
+ expr: ((key = 80) or (value like '%90%'))
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: ((key = 80) or (value like '%90%'))
+ type: boolean
+ Select Operator
+ expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: -- with pushdown disabled
+
+explain select * from hbase_pushdown where key=90
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with pushdown disabled
+
+explain select * from hbase_pushdown where key=90
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (TOK_TABLE_OR_COL key) 90))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ hbase_pushdown
+ TableScan
+ alias: hbase_pushdown
+ Filter Operator
+ predicate:
+ expr: (key = 90)
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: (key = 90)
+ type: boolean
+ Select Operator
+ expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Sat Sep 18 00:22:42 2010
@@ -26,6 +26,8 @@ import java.beans.Statement;
import java.beans.XMLDecoder;
import java.beans.XMLEncoder;
import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.EOFException;
import java.io.File;
@@ -37,6 +39,7 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
@@ -81,6 +84,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.parse.ErrorMsg;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -317,6 +321,40 @@ public final class Utilities {
return null;
}
+ public static String serializeExpression(ExprNodeDesc expr) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ XMLEncoder encoder = new XMLEncoder(baos);
+ try {
+ encoder.writeObject(expr);
+ } finally {
+ encoder.close();
+ }
+ try {
+ return baos.toString("UTF-8");
+ } catch (UnsupportedEncodingException ex) {
+ throw new RuntimeException("UTF-8 support required", ex);
+ }
+ }
+
+ public static ExprNodeDesc deserializeExpression(
+ String s, Configuration conf) {
+ byte [] bytes;
+ try {
+ bytes = s.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException ex) {
+ throw new RuntimeException("UTF-8 support required", ex);
+ }
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ XMLDecoder decoder = new XMLDecoder(
+ bais, null, null, conf.getClassLoader());
+ try {
+ ExprNodeDesc expr = (ExprNodeDesc) decoder.readObject();
+ return expr;
+ } finally {
+ decoder.close();
+ }
+ }
+
/**
* Serialize a single Task.
*/
@@ -1425,4 +1463,22 @@ public final class Utilities {
public static boolean supportCombineFileInputFormat() {
return ShimLoader.getHadoopShims().getCombineFileInputFormat() != null;
}
+
+ public static void setColumnNameList(JobConf jobConf, Operator op) {
+ RowSchema rowSchema = op.getSchema();
+ if (rowSchema == null) {
+ return;
+ }
+ StringBuilder columnNames = new StringBuilder();
+ for (ColumnInfo colInfo : rowSchema.getSignature()) {
+ if (columnNames.length() > 0) {
+ columnNames.append(",");
+ }
+ columnNames.append(colInfo.getInternalName());
+ }
+ String columnNamesString = columnNames.toString();
+ jobConf.set(
+ Constants.LIST_COLUMNS,
+ columnNamesString);
+ }
}
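A round-trip sketch of the two new helpers, mirroring how HiveInputFormat.pushFilters (below) writes the expression and HiveHBaseTableInputFormat.convertFilter reads it back; filterExpr and jobConf are assumed from context:

// Write side (during input-format setup):
String serialized = Utilities.serializeExpression(filterExpr);
jobConf.set(TableScanDesc.FILTER_EXPR_CONF_STR, serialized);

// Read side (inside the storage handler's input format):
ExprNodeDesc restored = Utilities.deserializeExpression(
    jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR), jobConf);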
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java?rev=998374&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java Sat Sep 18 00:22:42 2010
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
+
+/**
+ * IndexPredicateAnalyzer decomposes predicates, separating the parts
+ * which can be satisfied by an index from the parts which cannot.
+ * Currently, it only supports pure conjunctions over binary expressions
+ * comparing a column reference with a constant value. It is assumed
+ * that all column aliases encountered refer to the same table.
+ */
+public class IndexPredicateAnalyzer
+{
+ private Set<String> udfNames;
+
+ private Set<String> allowedColumnNames;
+
+ public IndexPredicateAnalyzer() {
+ udfNames = new HashSet<String>();
+ }
+
+ /**
+ * Registers a comparison operator as one which can be satisfied
+ * by an index search. Unless this is called, analyzePredicate
+ * will never find any indexable conditions.
+ *
+ * @param udfName name of comparison operator as returned
+ * by either {@link GenericUDFBridge#getUdfName} (for simple UDFs)
+ * or udf.getClass().getName() (for generic UDFs).
+ */
+ public void addComparisonOp(String udfName) {
+ udfNames.add(udfName);
+ }
+
+ /**
+ * Clears the set of column names allowed in comparisons. (Initially, all
+ * column names are allowed.)
+ */
+ public void clearAllowedColumnNames() {
+ allowedColumnNames = new HashSet<String>();
+ }
+
+ /**
+ * Adds a column name to the set of column names allowed.
+ *
+ * @param columnName name of column to be allowed
+ */
+ public void allowColumnName(String columnName) {
+ if (allowedColumnNames == null) {
+ clearAllowedColumnNames();
+ }
+ allowedColumnNames.add(columnName);
+ }
+
+ /**
+ * Analyzes a predicate.
+ *
+ * @param predicate predicate to be analyzed
+ *
+ * @param searchConditions receives conditions produced by analysis
+ *
+ * @return residual predicate which could not be translated to
+ * searchConditions
+ */
+ public ExprNodeDesc analyzePredicate(
+ ExprNodeDesc predicate,
+ final List<IndexSearchCondition> searchConditions) {
+
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ NodeProcessor nodeProcessor = new NodeProcessor() {
+ @Override
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procCtx, Object... nodeOutputs)
+ throws SemanticException {
+
+ // We can only push down stuff which appears as part of
+ // a pure conjunction: reject OR, CASE, etc.
+ for (Node ancestor : stack) {
+ if (nd == ancestor) {
+ break;
+ }
+ if (!FunctionRegistry.isOpAnd((ExprNodeDesc) ancestor)) {
+ return nd;
+ }
+ }
+
+ return analyzeExpr((ExprNodeDesc) nd, searchConditions, nodeOutputs);
+ }
+ };
+
+ Dispatcher disp = new DefaultRuleDispatcher(
+ nodeProcessor, opRules, null);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.add(predicate);
+ HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
+ try {
+ ogw.startWalking(topNodes, nodeOutput);
+ } catch (SemanticException ex) {
+ throw new RuntimeException(ex);
+ }
+ ExprNodeDesc residualPredicate = (ExprNodeDesc) nodeOutput.get(predicate);
+ return residualPredicate;
+ }
+
+ private ExprNodeDesc analyzeExpr(
+ ExprNodeDesc expr,
+ List<IndexSearchCondition> searchConditions,
+ Object... nodeOutputs) {
+
+ if (!(expr instanceof ExprNodeGenericFuncDesc)) {
+ return expr;
+ }
+ if (FunctionRegistry.isOpAnd(expr)) {
+ assert(nodeOutputs.length == 2);
+ ExprNodeDesc residual1 = (ExprNodeDesc) nodeOutputs[0];
+ ExprNodeDesc residual2 = (ExprNodeDesc) nodeOutputs[1];
+ if (residual1 == null) {
+ return residual2;
+ }
+ if (residual2 == null) {
+ return residual1;
+ }
+ List<ExprNodeDesc> residuals = new ArrayList<ExprNodeDesc>();
+ residuals.add(residual1);
+ residuals.add(residual2);
+ return new ExprNodeGenericFuncDesc(
+ TypeInfoFactory.booleanTypeInfo,
+ FunctionRegistry.getGenericUDFForAnd(),
+ residuals);
+ }
+
+ String udfName;
+ ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) expr;
+ if (funcDesc.getGenericUDF() instanceof GenericUDFBridge) {
+ GenericUDFBridge func = (GenericUDFBridge) funcDesc.getGenericUDF();
+ udfName = func.getUdfName();
+ } else {
+ udfName = funcDesc.getGenericUDF().getClass().getName();
+ }
+ if (!udfNames.contains(udfName)) {
+ return expr;
+ }
+
+ ExprNodeDesc child1 = (ExprNodeDesc) nodeOutputs[0];
+ ExprNodeDesc child2 = (ExprNodeDesc) nodeOutputs[1];
+ ExprNodeColumnDesc columnDesc = null;
+ ExprNodeConstantDesc constantDesc = null;
+ if ((child1 instanceof ExprNodeColumnDesc)
+ && (child2 instanceof ExprNodeConstantDesc)) {
+ // COL <op> CONSTANT
+ columnDesc = (ExprNodeColumnDesc) child1;
+ constantDesc = (ExprNodeConstantDesc) child2;
+ } else if ((child2 instanceof ExprNodeColumnDesc)
+ && (child1 instanceof ExprNodeConstantDesc)) {
+ // CONSTANT <op> COL
+ columnDesc = (ExprNodeColumnDesc) child2;
+ constantDesc = (ExprNodeConstantDesc) child1;
+ }
+ if (columnDesc == null) {
+ return expr;
+ }
+ if (allowedColumnNames != null) {
+ if (!allowedColumnNames.contains(columnDesc.getColumn())) {
+ return expr;
+ }
+ }
+ searchConditions.add(
+ new IndexSearchCondition(
+ columnDesc,
+ udfName,
+ constantDesc,
+ expr));
+
+ // we converted the expression to a search condition, so
+ // remove it from the residual predicate
+ return null;
+ }
+
+ /**
+ * Translates search conditions back to ExprNodeDesc form (as
+ * a left-deep conjunction).
+ *
+ * @param searchConditions (typically produced by analyzePredicate)
+ *
+ * @return ExprNodeDesc form of search conditions
+ */
+ public ExprNodeDesc translateSearchConditions(
+ List<IndexSearchCondition> searchConditions) {
+
+ ExprNodeDesc expr = null;
+ for (IndexSearchCondition searchCondition : searchConditions) {
+ if (expr == null) {
+ expr = searchCondition.getComparisonExpr();
+ continue;
+ }
+ List<ExprNodeDesc> children = new ArrayList<ExprNodeDesc>();
+ children.add(expr);
+ children.add(searchCondition.getComparisonExpr());
+ expr = new ExprNodeGenericFuncDesc(
+ TypeInfoFactory.booleanTypeInfo,
+ FunctionRegistry.getGenericUDFForAnd(),
+ children);
+ }
+ return expr;
+ }
+}
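Typical usage, mirroring newIndexPredicateAnalyzer in the HBase handler above; the column name "key" and the predicate variable are illustrative:

IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
analyzer.addComparisonOp("=");     // accept equality comparisons only
analyzer.clearAllowedColumnNames();
analyzer.allowColumnName("key");   // restrict pushdown to the key column

List<IndexSearchCondition> conditions =
    new ArrayList<IndexSearchCondition>();
ExprNodeDesc residual = analyzer.analyzePredicate(predicate, conditions);
// conditions now holds the pushable "key = constant" terms;
// residual (possibly null) is what Hive must still evaluate.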
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java?rev=998374&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java Sat Sep 18 00:22:42 2010
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+
+/**
+ * IndexSearchCondition represents an individual search condition
+ * found by {@link IndexPredicateAnalyzer}.
+ *
+ * @author John Sichi
+ * @version $Id:$
+ */
+public class IndexSearchCondition
+{
+ private ExprNodeColumnDesc columnDesc;
+ private String comparisonOp;
+ private ExprNodeConstantDesc constantDesc;
+ private ExprNodeDesc comparisonExpr;
+
+ /**
+ * Constructs a search condition, which takes the form
+ * <pre>column-ref comparison-op constant-value</pre>.
+ *
+ * @param columnDesc column being compared
+ *
+ * @param comparisonOp comparison operator, e.g. "="
+ * (taken from GenericUDFBridge.getUdfName())
+ *
+ * @param constantDesc constant value to search for
+ *
+ * @param comparisonExpr the original comparison expression
+ */
+ public IndexSearchCondition(
+ ExprNodeColumnDesc columnDesc,
+ String comparisonOp,
+ ExprNodeConstantDesc constantDesc,
+ ExprNodeDesc comparisonExpr) {
+
+ this.columnDesc = columnDesc;
+ this.comparisonOp = comparisonOp;
+ this.constantDesc = constantDesc;
+ this.comparisonExpr = comparisonExpr;
+ }
+
+ public void setColumnDesc(ExprNodeColumnDesc columnDesc) {
+ this.columnDesc = columnDesc;
+ }
+
+ public ExprNodeColumnDesc getColumnDesc() {
+ return columnDesc;
+ }
+
+ public void setComparisonOp(String comparisonOp) {
+ this.comparisonOp = comparisonOp;
+ }
+
+ public String getComparisonOp() {
+ return comparisonOp;
+ }
+
+ public void setConstantDesc(ExprNodeConstantDesc constantDesc) {
+ this.constantDesc = constantDesc;
+ }
+
+ public ExprNodeConstantDesc getConstantDesc() {
+ return constantDesc;
+ }
+
+ public void setComparisonExpr(ExprNodeDesc comparisonExpr) {
+ this.comparisonExpr = comparisonExpr;
+ }
+
+ public ExprNodeDesc getComparisonExpr() {
+ return comparisonExpr;
+ }
+
+ @Override
+ public String toString() {
+ return comparisonExpr.getExprString();
+ }
+}
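For illustration, the condition the analyzer would produce for key = 90 looks roughly like this sketch (keyColumnDesc and comparisonExpr are assumed to be the corresponding plan nodes):

IndexSearchCondition sc = new IndexSearchCondition(
    keyColumnDesc,                  // ExprNodeColumnDesc for "key"
    "=",                            // comparison operator name
    new ExprNodeConstantDesc(90),   // constant operand
    comparisonExpr);                // original comparison expression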
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java Sat Sep 18 00:22:42 2010
@@ -68,7 +68,7 @@ public class BucketizedHiveInputFormat<K
// clone a jobConf for setting needed columns for reading
JobConf cloneJobConf = new JobConf(job);
- initColumnsNeeded(cloneJobConf, inputFormatClass, hsplit.getPath()
+ pushProjectionsAndFilters(cloneJobConf, inputFormatClass, hsplit.getPath()
.toString(), hsplit.getPath().toUri().getPath());
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass,
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Sat Sep 18 00:22:42 2010
@@ -340,7 +340,8 @@ public class CombineHiveInputFormat<K ex
throw new IOException("cannot find class " + inputFormatClassName);
}
- initColumnsNeeded(job, inputFormatClass, hsplit.getPath(0).toString(),
+ pushProjectionsAndFilters(job, inputFormatClass,
+ hsplit.getPath(0).toString(),
hsplit.getPath(0).toUri().getPath());
return ShimLoader.getHadoopShims().getCombineFileInputFormat()
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Sat Sep 18 00:22:42 2010
@@ -33,11 +33,15 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.Writable;
@@ -215,7 +219,7 @@ public class HiveInputFormat<K extends W
// clone a jobConf for setting needed columns for reading
JobConf cloneJobConf = new JobConf(job);
- initColumnsNeeded(cloneJobConf, inputFormatClass, hsplit.getPath()
+ pushProjectionsAndFilters(cloneJobConf, inputFormatClass, hsplit.getPath()
.toString(), hsplit.getPath().toUri().getPath());
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass,
@@ -260,6 +264,17 @@ public class HiveInputFormat<K extends W
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), newjob);
+ // Make filter pushdown information available to getSplits.
+ ArrayList<String> aliases =
+ mrwork.getPathToAliases().get(dir.toUri().toString());
+ if ((aliases != null) && (aliases.size() == 1)) {
+ Operator op = mrwork.getAliasToWork().get(aliases.get(0));
+ if ((op != null) && (op instanceof TableScanOperator)) {
+ TableScanOperator tableScan = (TableScanOperator) op;
+ pushFilters(newjob, tableScan);
+ }
+ }
+
FileInputFormat.setInputPaths(newjob, dir);
newjob.setInputFormat(inputFormat.getClass());
InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
@@ -309,7 +324,37 @@ public class HiveInputFormat<K extends W
return partDesc;
}
- protected void initColumnsNeeded(JobConf jobConf, Class inputFormatClass,
+ protected void pushFilters(JobConf jobConf, TableScanOperator tableScan) {
+
+ TableScanDesc scanDesc = tableScan.getConf();
+ if (scanDesc == null) {
+ return;
+ }
+
+ // construct column name list for reference by filter push down
+ Utilities.setColumnNameList(jobConf, tableScan);
+
+ // push down filters
+ ExprNodeDesc filterExpr = scanDesc.getFilterExpr();
+ if (filterExpr == null) {
+ return;
+ }
+
+ String filterText = filterExpr.getExprString();
+ String filterExprSerialized = Utilities.serializeExpression(filterExpr);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Filter text = " + filterText);
+ LOG.debug("Filter expression = " + filterExprSerialized);
+ }
+ jobConf.set(
+ TableScanDesc.FILTER_TEXT_CONF_STR,
+ filterText);
+ jobConf.set(
+ TableScanDesc.FILTER_EXPR_CONF_STR,
+ filterExprSerialized);
+ }
+
+ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass,
String splitPath, String splitPathWithNoSchema) {
if (this.mrwork == null) {
init(job);
@@ -335,12 +380,16 @@ public class HiveInputFormat<K extends W
alias);
if (op != null && op instanceof TableScanOperator) {
TableScanOperator tableScan = (TableScanOperator) op;
+
+ // push down projections
ArrayList<Integer> list = tableScan.getNeededColumnIDs();
if (list != null) {
ColumnProjectionUtils.appendReadColumnIDs(jobConf, list);
} else {
ColumnProjectionUtils.setFullyReadColumns(jobConf);
}
+
+ pushFilters(jobConf, tableScan);
}
}
}
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java?rev=998374&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java Sat Sep 18 00:22:42 2010
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.metadata;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+
+/**
+ * HiveStoragePredicateHandler is an optional companion to {@link
+ * HiveStorageHandler}; it should only be implemented by handlers which
+ * support decomposition of predicates being pushed down into table scans.
+ */
+public interface HiveStoragePredicateHandler {
+
+ /**
+ * Gives the storage handler a chance to decompose a predicate. The storage
+ * handler should analyze the predicate and return the portion of it which
+ * cannot be evaluated during table access. For example, if the original
+ * predicate is <code>x = 2 AND upper(y)='YUM'</code>, the storage handler
+ * might be able to handle <code>x = 2</code> but leave the "residual"
+ * <code>upper(y)='YUM'</code> for Hive to deal with. The breakdown
+ * need not be non-overlapping; for example, given the
+ * predicate <code>x LIKE 'a%b'</code>, the storage handler might
+ * be able to evaluate the prefix search <code>x LIKE 'a%'</code>,
+ * leaving the full <code>x LIKE 'a%b'</code> as the residual.
+ *
+ * @param jobConf contains a job configuration matching the one that
+ * will later be passed to getRecordReader and getSplits
+ *
+ * @param deserializer deserializer which will be used when
+ * fetching rows
+ *
+ * @param predicate predicate to be decomposed
+ *
+ * @return decomposed form of predicate, or null if no pushdown is
+ * possible at all
+ */
+ public DecomposedPredicate decomposePredicate(
+ JobConf jobConf,
+ Deserializer deserializer,
+ ExprNodeDesc predicate);
+
+ /**
+ * Struct class for returning multiple values from decomposePredicate.
+ */
+ public static class DecomposedPredicate {
+ /**
+ * Portion of predicate to be evaluated by storage handler. Hive
+ * will pass this into the storage handler's input format.
+ */
+ public ExprNodeDesc pushedPredicate;
+
+ /**
+ * Portion of predicate to be post-evaluated by Hive for any rows
+ * which are returned by the storage handler.
+ */
+ public ExprNodeDesc residualPredicate;
+ }
+}
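For illustration (not part of this commit): a minimal sketch of a handler
implementing this interface, assuming the IndexPredicateAnalyzer and
IndexSearchCondition classes added elsewhere in this change behave as
described, and hard-coding "key" as the only pushable column (a real handler
would derive the column from the deserializer):

    // Illustrative sketch only.
    package example;

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
    import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
    import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
    import org.apache.hadoop.hive.serde2.Deserializer;
    import org.apache.hadoop.mapred.JobConf;

    public class KeyEqualityPredicateHandler
        implements HiveStoragePredicateHandler {

      public DecomposedPredicate decomposePredicate(
          JobConf jobConf,
          Deserializer deserializer,
          ExprNodeDesc predicate) {
        IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
        // accept only equality comparisons...
        analyzer.addComparisonOp(
            "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual");
        // ...against the column "key" (hard-coded here for brevity)
        analyzer.clearAllowedColumnNames();
        analyzer.allowColumnName("key");

        List<IndexSearchCondition> searchConditions =
            new ArrayList<IndexSearchCondition>();
        ExprNodeDesc residual =
            analyzer.analyzePredicate(predicate, searchConditions);
        if (searchConditions.isEmpty()) {
          // nothing pushable; decline pushdown entirely
          return null;
        }
        DecomposedPredicate decomposed = new DecomposedPredicate();
        decomposed.pushedPredicate =
            analyzer.translateSearchConditions(searchConditions);
        decomposed.residualPredicate = residual;
        return decomposed;
      }
    }

With such a handler, a predicate like key = 90 AND upper(value) = 'YUM' would
come back with key = 90 pushed and upper(value) = 'YUM' residual, matching the
contract documented above.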
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java Sat Sep 18 00:22:42 2010
@@ -36,6 +36,14 @@ public class TableScanDesc implements Se
private List<VirtualColumn> virtualCols;
+ private ExprNodeDesc filterExpr;
+
+ public static final String FILTER_EXPR_CONF_STR =
+ "hive.io.filter.expr.serialized";
+
+ public static final String FILTER_TEXT_CONF_STR =
+ "hive.io.filter.text";
+
@SuppressWarnings("nls")
public TableScanDesc() {
}
@@ -54,6 +62,15 @@ public class TableScanDesc implements Se
return alias;
}
+ @Explain(displayName = "filterExpr")
+ public ExprNodeDesc getFilterExpr() {
+ return filterExpr;
+ }
+
+ public void setFilterExpr(ExprNodeDesc filterExpr) {
+ this.filterExpr = filterExpr;
+ }
+
public void setAlias(String alias) {
this.alias = alias;
}
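With the @Explain annotation above, the pushed portion becomes visible in
query plans. Illustratively (shape only, not output captured from a real
run, with a hypothetical table name), the TableScan section of EXPLAIN
output for a pushed key-equality predicate would include:

    TableScan
      alias: my_hbase_table
      filterExpr:
          expr: (key = 90)
          type: boolean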
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java Sat Sep 18 00:22:42 2010
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
@@ -36,9 +37,14 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -47,7 +53,10 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.JobConf;
/**
* Operator factory for predicate pushdown processing of operator graph Each
@@ -63,6 +72,9 @@ import org.apache.hadoop.hive.serde2.typ
*/
public final class OpProcFactory {
+ protected static final Log LOG = LogFactory.getLog(OpProcFactory.class
+ .getName());
+
/**
* Processor for Script Operator Prevents any predicates being pushed.
*/
@@ -266,9 +278,6 @@ public final class OpProcFactory {
*/
public static class DefaultPPD implements NodeProcessor {
- protected static final Log LOG = LogFactory.getLog(OpProcFactory.class
- .getName());
-
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
@@ -377,6 +386,24 @@ public final class OpProcFactory {
if (condn == null) {
return null;
}
+
+ if (op instanceof TableScanOperator) {
+ boolean pushFilterToStorage;
+ HiveConf hiveConf = owi.getParseContext().getConf();
+ pushFilterToStorage =
+ hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTPPD_STORAGE);
+ if (pushFilterToStorage) {
+ condn = pushFilterToStorageHandler(
+ (TableScanOperator) op,
+ condn,
+ owi,
+ hiveConf);
+ if (condn == null) {
+ // we pushed the whole thing down
+ return null;
+ }
+ }
+ }
// add new filter op
List<Operator<? extends Serializable>> originalChildren = op
@@ -399,6 +426,81 @@ public final class OpProcFactory {
return output;
}
+ /**
+ * Attempts to push a predicate down into a storage handler. For
+ * native tables, this is a no-op.
+ *
+ * @param tableScanOp table scan against which predicate applies
+ *
+ * @param originalPredicate predicate to be pushed down
+ *
+ * @param owi operator walker info
+ *
+ * @param hiveConf Hive configuration
+ *
+ * @return portion of predicate which needs to be evaluated
+ * by Hive as a post-filter, or null if it was possible
+ * to push down the entire predicate
+ */
+ private static ExprNodeDesc pushFilterToStorageHandler(
+ TableScanOperator tableScanOp,
+ ExprNodeDesc originalPredicate,
+ OpWalkerInfo owi,
+ HiveConf hiveConf) {
+
+ TableScanDesc tableScanDesc = tableScanOp.getConf();
+ Table tbl = owi.getParseContext().getTopToTable().get(tableScanOp);
+ if (!tbl.isNonNative()) {
+ return originalPredicate;
+ }
+ HiveStorageHandler storageHandler = tbl.getStorageHandler();
+ if (!(storageHandler instanceof HiveStoragePredicateHandler)) {
+ // The storage handler does not provide predicate decomposition
+ // support, so we'll implement the entire filter in Hive. However,
+ // we still provide the full predicate to the storage handler in
+ // case it wants to do any of its own prefiltering.
+ tableScanDesc.setFilterExpr(originalPredicate);
+ return originalPredicate;
+ }
+ HiveStoragePredicateHandler predicateHandler =
+ (HiveStoragePredicateHandler) storageHandler;
+ JobConf jobConf = new JobConf(owi.getParseContext().getConf());
+ Utilities.setColumnNameList(jobConf, tableScanOp);
+ Utilities.copyTableJobPropertiesToConf(
+ Utilities.getTableDesc(tbl),
+ jobConf);
+ Deserializer deserializer = tbl.getDeserializer();
+ HiveStoragePredicateHandler.DecomposedPredicate decomposed =
+ predicateHandler.decomposePredicate(
+ jobConf,
+ deserializer,
+ originalPredicate);
+ if (decomposed == null) {
+ // not able to push anything down
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No pushdown possible for predicate: "
+ + originalPredicate.getExprString());
+ }
+ return originalPredicate;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Original predicate: "
+ + originalPredicate.getExprString());
+ if (decomposed.pushedPredicate != null) {
+ LOG.debug(
+ "Pushed predicate: "
+ + decomposed.pushedPredicate.getExprString());
+ }
+ if (decomposed.residualPredicate != null) {
+ LOG.debug(
+ "Residual predicate: "
+ + decomposed.residualPredicate.getExprString());
+ }
+ }
+ tableScanDesc.setFilterExpr(decomposed.pushedPredicate);
+ return decomposed.residualPredicate;
+ }
+
public static NodeProcessor getFilterProc() {
return new FilterPPD();
}
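Since pushFilterToStorageHandler is gated on HIVEOPTPPD_STORAGE, storage
handler decomposition can be switched off independently of general predicate
pushdown when debugging a handler; from the CLI the equivalent is
set hive.optimize.ppd.storage=false;. A minimal sketch from embedding code
(illustrative only, assuming a HiveConf instance is already in hand):

    // Illustrative sketch only.
    void disableStoragePushdown(HiveConf conf) {
      // keep general predicate pushdown on...
      conf.setBoolVar(HiveConf.ConfVars.HIVEOPTPPD, true);
      // ...but skip handing predicates to storage handlers
      conf.setBoolVar(HiveConf.ConfVars.HIVEOPTPPD_STORAGE, false);
    }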
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java Sat Sep 18 00:22:42 2010
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.RowResolver;
/**
@@ -37,10 +38,11 @@ public class OpWalkerInfo implements Nod
*/
private final HashMap<Operator<? extends Serializable>, ExprWalkerInfo> opToPushdownPredMap;
private final Map<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap;
+ private final ParseContext pGraphContext;
- public OpWalkerInfo(
- HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap) {
- this.opToParseCtxMap = opToParseCtxMap;
+ public OpWalkerInfo(ParseContext pGraphContext) {
+ this.pGraphContext = pGraphContext;
+ opToParseCtxMap = pGraphContext.getOpParseCtx();
opToPushdownPredMap = new HashMap<Operator<? extends Serializable>, ExprWalkerInfo>();
}
@@ -62,4 +64,7 @@ public class OpWalkerInfo implements Nod
return opToParseCtxMap.put(key, value);
}
+ public ParseContext getParseContext() {
+ return pGraphContext;
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java Sat Sep 18 00:22:42 2010
@@ -81,7 +81,7 @@ public class PredicatePushDown implement
opToParseCtxMap = pGraphContext.getOpParseCtx();
// create the context for walking operators
- OpWalkerInfo opWalkerInfo = new OpWalkerInfo(opToParseCtxMap);
+ OpWalkerInfo opWalkerInfo = new OpWalkerInfo(pGraphContext);
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
opRules.put(new RuleRegExp("R1", "FIL%"), OpProcFactory.getFilterProc());