Posted to commits@hive.apache.org by he...@apache.org on 2010/09/18 02:22:43 UTC

svn commit: r998374 - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ hbase-handler/src/test/queries/ hbase-handler/src/test/results/ ql/src/java/org/apache/hadoop/hive/q...

Author: heyongqiang
Date: Sat Sep 18 00:22:42 2010
New Revision: 998374

URL: http://svn.apache.org/viewvc?rev=998374&view=rev
Log:
HIVE-1226. support filter pushdown against non-native tables. (jvs via He Yongqiang)

Added:
    hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_pushdown.q
    hadoop/hive/trunk/hbase-handler/src/test/results/hbase_pushdown.q.out
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/conf/hive-default.xml
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java
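
For orientation before the diffs: the core of this patch is the new HiveStoragePredicateHandler interface (added under ql/src/java/org/apache/hadoop/hive/ql/metadata above), which a storage handler implements to negotiate with the optimizer over which portion of a filter it can evaluate natively. Below is a minimal sketch of an opting-in handler, modeled on the HBaseStorageHandler changes in this patch; the DecomposedPredicate field names come from their usage here, while MyStorageHandler and its push-everything policy are purely illustrative:

    import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
    import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
    import org.apache.hadoop.hive.serde2.Deserializer;
    import org.apache.hadoop.mapred.JobConf;

    public class MyStorageHandler extends DefaultStorageHandler
      implements HiveStoragePredicateHandler {

      @Override
      public DecomposedPredicate decomposePredicate(
        JobConf jobConf,
        Deserializer deserializer,
        ExprNodeDesc predicate)
      {
        // A handler which cannot push anything should return null,
        // in which case Hive evaluates the predicate as before.
        // Here we (unrealistically) claim the whole predicate:
        DecomposedPredicate decomposed = new DecomposedPredicate();
        decomposed.pushedPredicate = predicate;   // evaluated by the handler
        decomposed.residualPredicate = null;      // nothing left for Hive
        return decomposed;
      }
    }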

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Sat Sep 18 00:22:42 2010
@@ -132,6 +132,9 @@ Trunk -  Unreleased
     HIVE-1645. ability to specify parent directory for zookeeper lock manager
     (namit via He Yongqiang)
 
+    HIVE-1226. support filter pushdown against non-native tables
+    (jvs via He Yongqiang)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Sep 18 00:22:42 2010
@@ -262,6 +262,8 @@ public class HiveConf extends Configurat
     // Optimizer
     HIVEOPTCP("hive.optimize.cp", true), // column pruner
     HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown
+    // push predicates down to storage handlers
+    HIVEOPTPPD_STORAGE("hive.optimize.ppd.storage", true),
     HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by
     HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
     HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join

Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Sat Sep 18 00:22:42 2010
@@ -273,6 +273,12 @@
 </property>
 
 <property>
+  <name>hive.optimize.ppd.storage</name>
+  <value>true</value>
+  <description>Whether to push predicates down into storage handlers.  Ignored when hive.optimize.ppd is false.</description>
+</property>
+
+<property>
   <name>hive.optimize.pruner</name>
   <value>true</value>
   <description>Whether to enable the new partition pruner which depends on predicate pushdown. If this is disabled,
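
As the description says, the new flag only takes effect when general predicate pushdown (hive.optimize.ppd) is enabled. A sketch of checking both switches through the HiveConf enum added above, assuming an existing HiveConf instance named conf:

    // storage pushdown rides on top of general predicate pushdown
    boolean pushToStorage =
        conf.getBoolVar(HiveConf.ConfVars.HIVEOPTPPD)
        && conf.getBoolVar(HiveConf.ConfVars.HIVEOPTPPD_STORAGE);

From a session the same switch is flipped with "set hive.optimize.ppd.storage=false;", which the new hbase_pushdown.q test exercises.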

Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Sat Sep 18 00:22:42 2010
@@ -582,4 +582,11 @@ public class HBaseSerDe implements SerDe
   public void setUseJSONSerialize(boolean useJSONSerialize) {
     this.useJSONSerialize = useJSONSerialize;
   }
+
+  /**
+   * @return 0-based offset of the key column within the table
+   */
+  int getKeyColumnOffset() {
+    return iKey;
+  }
 }

Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Sat Sep 18 00:22:42 2010
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.hbase;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -39,11 +40,17 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Constants;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
+import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
+import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.util.StringUtils;
 
@@ -51,7 +58,8 @@ import org.apache.hadoop.util.StringUtil
  * HBaseStorageHandler provides a HiveStorageHandler implementation for
  * HBase.
  */
-public class HBaseStorageHandler implements HiveStorageHandler, HiveMetaHook {
+public class HBaseStorageHandler extends DefaultStorageHandler
+  implements HiveMetaHook, HiveStoragePredicateHandler {
 
   private HBaseConfiguration hbaseConf;
   private HBaseAdmin admin;
@@ -260,4 +268,38 @@ public class HBaseStorageHandler impleme
     }
     jobProperties.put(HBaseSerDe.HBASE_TABLE_NAME, tableName);
   }
+
+  @Override
+  public DecomposedPredicate decomposePredicate(
+    JobConf jobConf,
+    Deserializer deserializer,
+    ExprNodeDesc predicate)
+  {
+    String columnNameProperty = jobConf.get(
+      org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS);
+    List<String> columnNames =
+      Arrays.asList(columnNameProperty.split(","));
+    HBaseSerDe hbaseSerde = (HBaseSerDe) deserializer;
+    IndexPredicateAnalyzer analyzer =
+      HiveHBaseTableInputFormat.newIndexPredicateAnalyzer(
+        columnNames.get(hbaseSerde.getKeyColumnOffset()));
+    List<IndexSearchCondition> searchConditions =
+      new ArrayList<IndexSearchCondition>();
+    ExprNodeDesc residualPredicate =
+      analyzer.analyzePredicate(predicate, searchConditions);
+    if (searchConditions.size() != 1) {
+      // Either there was nothing which could be pushed down (size = 0),
+      // or more than one predicate (size > 1); in the latter case,
+      // we bail out for now since multiple lookups on the key are
+      // either contradictory or redundant.  We'll need to handle
+      // this better later when we support more interesting predicates.
+      return null;
+    }
+    
+    DecomposedPredicate decomposedPredicate = new DecomposedPredicate();
+    decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(
+      searchConditions);
+    decomposedPredicate.residualPredicate = residualPredicate;
+    return decomposedPredicate;
+  }
 }
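
Concretely: for the new test query "select * from hbase_pushdown where key=90 and value like '%90%'", this method pushes (key = 90) and leaves (value like '%90%') as the residual, matching the plan in hbase_pushdown.q.out below. A fragment showing the same split done directly (callable only from within org.apache.hadoop.hive.hbase, since the factory method is package-private; the column name "key" is the mapped row key column of the test table, and the predicate variable is assumed in scope):

    List<IndexSearchCondition> searchConditions =
        new ArrayList<IndexSearchCondition>();
    IndexPredicateAnalyzer analyzer =
        HiveHBaseTableInputFormat.newIndexPredicateAnalyzer("key");
    // searchConditions receives (key = 90); the return value is the
    // residual, here (value like '%90%')
    ExprNodeDesc residualPredicate =
        analyzer.analyzePredicate(predicate, searchConditions);
    ExprNodeDesc pushedPredicate =
        analyzer.translateSearchConditions(searchConditions);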

Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java Sat Sep 18 00:22:42 2010
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.hbase;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -29,13 +30,31 @@ import org.apache.hadoop.hbase.HBaseConf
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
+import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -50,7 +69,7 @@ import org.apache.hadoop.mapreduce.lib.i
 /**
  * HiveHBaseTableInputFormat implements InputFormat for HBase storage handler
  * tables, decorating an underlying HBase TableInputFormat with extra Hive logic
- * such as column pruning.
+ * such as column pruning and filter pushdown.
  */
 public class HiveHBaseTableInputFormat extends TableInputFormatBase
     implements InputFormat<ImmutableBytesWritable, Result> {
@@ -64,6 +83,7 @@ public class HiveHBaseTableInputFormat e
     final Reporter reporter) throws IOException {
 
     HBaseSplit hbaseSplit = (HBaseSplit) split;
+    TableSplit tableSplit = hbaseSplit.getSplit();
     String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
     setHTable(new HTable(new HBaseConfiguration(jobConf), Bytes.toBytes(hbaseTableName)));
     String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
@@ -105,7 +125,7 @@ public class HiveHBaseTableInputFormat e
       }
     }
 
-    // The HBase table's row key maps to an Hive table column. In the corner case when only the
+    // The HBase table's row key maps to a Hive table column. In the corner case when only the
     // row key column is selected in Hive, the HBase Scan will be empty i.e. no column family/
     // column qualifier will have been added to the scan. We arbitrarily add at least one column
     // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive
@@ -128,8 +148,11 @@ public class HiveHBaseTableInputFormat e
       }
     }
 
+    // If Hive's optimizer gave us a filter to process, convert it to the
+    // HBase scan form now.
+    tableSplit = convertFilter(jobConf, scan, tableSplit, iKey);
+
     setScan(scan);
-    org.apache.hadoop.hbase.mapreduce.TableSplit tableSplit = hbaseSplit.getSplit();
 
     Job job = new Job(jobConf);
     TaskAttemptContext tac =
@@ -200,6 +223,137 @@ public class HiveHBaseTableInputFormat e
     };
   }
 
+  /**
+   * Converts a filter (which has been pushed down from Hive's optimizer)
+   * into corresponding restrictions on the HBase scan.  The
+   * filter should already be in a form which can be fully converted.
+   *
+   * @param jobConf configuration for the scan
+   *
+   * @param scan the HBase scan object to restrict
+   *
+   * @param tableSplit the HBase table split to restrict, or null
+   * if calculating splits
+   *
+   * @param iKey 0-based offset of key column within Hive table
+   *
+   * @return converted table split if any
+   */
+  private TableSplit convertFilter(
+    JobConf jobConf,
+    Scan scan,
+    TableSplit tableSplit,
+    int iKey)
+    throws IOException {
+
+    String filterExprSerialized =
+      jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+    if (filterExprSerialized == null) {
+      return tableSplit;
+    }
+    ExprNodeDesc filterExpr =
+      Utilities.deserializeExpression(filterExprSerialized, jobConf);
+
+    String columnNameProperty = jobConf.get(Constants.LIST_COLUMNS);
+    List<String> columnNames =
+      Arrays.asList(columnNameProperty.split(","));
+
+    IndexPredicateAnalyzer analyzer =
+      newIndexPredicateAnalyzer(columnNames.get(iKey));
+
+    List<IndexSearchCondition> searchConditions =
+      new ArrayList<IndexSearchCondition>();
+    ExprNodeDesc residualPredicate =
+      analyzer.analyzePredicate(filterExpr, searchConditions);
+
+    // There should be no residual since we already negotiated
+    // that earlier in HBaseStorageHandler.decomposePredicate.
+    if (residualPredicate != null) {
+      throw new RuntimeException(
+        "Unexpected residual predicate " + residualPredicate.getExprString());
+    }
+
+    // There should be exactly one predicate since we already
+    // negotiated that also.
+    if (searchConditions.size() != 1) {
+      throw new RuntimeException(
+        "Exactly one search condition expected in push down");
+    }
+
+    // Convert the search condition into a restriction on the HBase scan
+    IndexSearchCondition sc = searchConditions.get(0);
+    ExprNodeConstantEvaluator eval =
+      new ExprNodeConstantEvaluator(sc.getConstantDesc());
+    byte [] startRow;
+    try {
+      ObjectInspector objInspector = eval.initialize(null);
+      Object writable = eval.evaluate(null);
+      ByteStream.Output serializeStream = new ByteStream.Output();
+      LazyUtils.writePrimitiveUTF8(
+        serializeStream,
+        writable,
+        (PrimitiveObjectInspector) objInspector,
+        false,
+        (byte) 0,
+        null);
+      startRow = new byte[serializeStream.getCount()];
+      System.arraycopy(
+        serializeStream.getData(), 0,
+        startRow, 0, serializeStream.getCount());
+    } catch (HiveException ex) {
+      throw new IOException(ex);
+    }
+
+    // stopRow is exclusive, so pad it with a trailing 0 byte to
+    // make it compare as the very next value after startRow
+    byte [] stopRow = new byte[startRow.length + 1];
+    System.arraycopy(startRow, 0, stopRow, 0, startRow.length);
+
+    if (tableSplit != null) {
+      tableSplit = new TableSplit(
+        tableSplit.getTableName(),
+        startRow,
+        stopRow,
+        tableSplit.getRegionLocation());
+    }
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+
+    // Add a WhileMatchFilter to make the scan terminate as soon
+    // as we see a non-matching key.  This is probably redundant
+    // since the stopRow above should already take care of it for us.
+    scan.setFilter(
+      new WhileMatchFilter(
+        new RowFilter(
+          CompareFilter.CompareOp.EQUAL,
+          new BinaryComparator(startRow))));
+    return tableSplit;
+  }
+
+  /**
+   * Instantiates a new predicate analyzer suitable for
+   * determining how to push a filter down into the HBase scan,
+   * based on the rules for what kinds of pushdown we currently support.
+   *
+   * @param keyColumnName name of the Hive column mapped to the HBase row key
+   *
+   * @return preconfigured predicate analyzer
+   */
+  static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
+    String keyColumnName) {
+
+    IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
+
+    // for now, we only support equality comparisons
+    analyzer.addComparisonOp("=");
+
+    // and only on the key column
+    analyzer.clearAllowedColumnNames();
+    analyzer.allowColumnName(keyColumnName);
+
+    return analyzer;
+  }
+  
   @Override
   public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
 
@@ -226,6 +380,14 @@ public class HiveHBaseTableInputFormat e
 
     Scan scan = new Scan();
 
+    // Take filter pushdown into account while calculating splits; this
+    // allows us to prune off regions immediately.  Note that although
+    // the Javadoc for the superclass getSplits says that it returns one
+    // split per region, the implementation actually takes the scan
+    // definition into account and excludes regions which don't satisfy
+    // the start/stop row conditions (HBASE-1829).
+    convertFilter(jobConf, scan, null, iKey);
+
     // REVIEW:  are we supposed to be applying the getReadColumnIDs
     // same as in getRecordReader?
     for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
@@ -244,7 +406,9 @@ public class HiveHBaseTableInputFormat e
     Job job = new Job(jobConf);
     JobContext jobContext = new JobContext(job.getConfiguration(), job.getJobID());
     Path [] tablePaths = FileInputFormat.getInputPaths(jobContext);
-    List<org.apache.hadoop.mapreduce.InputSplit> splits = getSplits(jobContext);
+
+    List<org.apache.hadoop.mapreduce.InputSplit> splits =
+      super.getSplits(jobContext);
     InputSplit [] results = new InputSplit[splits.size()];
 
     for (int i = 0; i < splits.size(); i++) {
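
The net effect of convertFilter for key = 90 is a one-key scan window plus a row filter. Assuming the key constant serializes to the UTF-8 bytes of "90" (which is what LazyUtils.writePrimitiveUTF8 produces here), the equivalent hand-built restriction looks like this:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.BinaryComparator;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.RowFilter;
    import org.apache.hadoop.hbase.filter.WhileMatchFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    Scan scan = new Scan();
    byte[] startRow = Bytes.toBytes("90");           // serialized key constant
    byte[] stopRow = new byte[startRow.length + 1];  // startRow + 0x00 (exclusive)
    System.arraycopy(startRow, 0, stopRow, 0, startRow.length);
    scan.setStartRow(startRow);
    scan.setStopRow(stopRow);
    // redundant with the stop row, but makes early termination explicit
    scan.setFilter(new WhileMatchFilter(
        new RowFilter(CompareFilter.CompareOp.EQUAL,
            new BinaryComparator(startRow))));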

Added: hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_pushdown.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_pushdown.q?rev=998374&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_pushdown.q (added)
+++ hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_pushdown.q Sat Sep 18 00:22:42 2010
@@ -0,0 +1,53 @@
+CREATE TABLE hbase_pushdown(key int, value string) 
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string");
+
+INSERT OVERWRITE TABLE hbase_pushdown 
+SELECT *
+FROM src;
+
+-- with full pushdown
+explain select * from hbase_pushdown where key=90;
+
+select * from hbase_pushdown where key=90;
+
+-- with partial pushdown
+
+explain select * from hbase_pushdown where key=90 and value like '%90%';
+
+select * from hbase_pushdown where key=90 and value like '%90%';
+
+-- with two residuals
+
+explain select * from hbase_pushdown
+where key=90 and value like '%90%' and key=cast(value as int);
+
+-- with contradictory pushdowns
+
+explain select * from hbase_pushdown
+where key=80 and key=90 and value like '%90%';
+
+select * from hbase_pushdown
+where key=80 and key=90 and value like '%90%';
+
+-- with nothing to push down
+
+explain select * from hbase_pushdown;
+
+-- with a predicate which is not actually part of the filter, so
+-- it should be ignored by pushdown
+
+explain select * from hbase_pushdown
+where (case when key=90 then 2 else 4 end) > 3;
+
+-- with a predicate which is under an OR, so it should
+-- be ignored by pushdown
+
+explain select * from hbase_pushdown
+where key=80 or value like '%90%';
+
+set hive.optimize.ppd.storage=false;
+
+-- with pushdown disabled
+
+explain select * from hbase_pushdown where key=90;

Added: hadoop/hive/trunk/hbase-handler/src/test/results/hbase_pushdown.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/results/hbase_pushdown.q.out?rev=998374&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/results/hbase_pushdown.q.out (added)
+++ hadoop/hive/trunk/hbase-handler/src/test/results/hbase_pushdown.q.out Sat Sep 18 00:22:42 2010
@@ -0,0 +1,427 @@
+PREHOOK: query: CREATE TABLE hbase_pushdown(key int, value string) 
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string")
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE hbase_pushdown(key int, value string) 
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@hbase_pushdown
+PREHOOK: query: INSERT OVERWRITE TABLE hbase_pushdown 
+SELECT *
+FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@hbase_pushdown
+POSTHOOK: query: INSERT OVERWRITE TABLE hbase_pushdown 
+SELECT *
+FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@hbase_pushdown
+PREHOOK: query: -- with full pushdown
+explain select * from hbase_pushdown where key=90
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with full pushdown
+explain select * from hbase_pushdown where key=90
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (TOK_TABLE_OR_COL key) 90))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        hbase_pushdown 
+          TableScan
+            alias: hbase_pushdown
+            filterExpr:
+                expr: (key = 90)
+                type: boolean
+            Filter Operator
+              predicate:
+                  expr: (key = 90)
+                  type: boolean
+              Select Operator
+                expressions:
+                      expr: key
+                      type: int
+                      expr: value
+                      type: string
+                outputColumnNames: _col0, _col1
+                File Output Operator
+                  compressed: false
+                  GlobalTableId: 0
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: select * from hbase_pushdown where key=90
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_pushdown
+PREHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-05-54_049_1244315391309244934/-mr-10000
+POSTHOOK: query: select * from hbase_pushdown where key=90
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_pushdown
+POSTHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-05-54_049_1244315391309244934/-mr-10000
+90	val_90
+PREHOOK: query: -- with partial pushdown
+
+explain select * from hbase_pushdown where key=90 and value like '%90%'
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with partial pushdown
+
+explain select * from hbase_pushdown where key=90 and value like '%90%'
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (and (= (TOK_TABLE_OR_COL key) 90) (like (TOK_TABLE_OR_COL value) '%90%')))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        hbase_pushdown 
+          TableScan
+            alias: hbase_pushdown
+            filterExpr:
+                expr: (key = 90)
+                type: boolean
+            Filter Operator
+              predicate:
+                  expr: (value like '%90%')
+                  type: boolean
+              Filter Operator
+                predicate:
+                    expr: ((key = 90) and (value like '%90%'))
+                    type: boolean
+                Select Operator
+                  expressions:
+                        expr: key
+                        type: int
+                        expr: value
+                        type: string
+                  outputColumnNames: _col0, _col1
+                  File Output Operator
+                    compressed: false
+                    GlobalTableId: 0
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: select * from hbase_pushdown where key=90 and value like '%90%'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_pushdown
+PREHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-06-00_089_9169062048458581014/-mr-10000
+POSTHOOK: query: select * from hbase_pushdown where key=90 and value like '%90%'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_pushdown
+POSTHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-06-00_089_9169062048458581014/-mr-10000
+90	val_90
+PREHOOK: query: -- with two residuals
+
+explain select * from hbase_pushdown
+where key=90 and value like '%90%' and key=cast(value as int)
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with two residuals
+
+explain select * from hbase_pushdown
+where key=90 and value like '%90%' and key=cast(value as int)
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (and (and (= (TOK_TABLE_OR_COL key) 90) (like (TOK_TABLE_OR_COL value) '%90%')) (= (TOK_TABLE_OR_COL key) (TOK_FUNCTION TOK_INT (TOK_TABLE_OR_COL value)))))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        hbase_pushdown 
+          TableScan
+            alias: hbase_pushdown
+            filterExpr:
+                expr: (key = 90)
+                type: boolean
+            Filter Operator
+              predicate:
+                  expr: ((value like '%90%') and (key = UDFToInteger(value)))
+                  type: boolean
+              Filter Operator
+                predicate:
+                    expr: (((key = 90) and (value like '%90%')) and (key = UDFToInteger(value)))
+                    type: boolean
+                Select Operator
+                  expressions:
+                        expr: key
+                        type: int
+                        expr: value
+                        type: string
+                  outputColumnNames: _col0, _col1
+                  File Output Operator
+                    compressed: false
+                    GlobalTableId: 0
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: -- with contradictory pushdowns
+
+explain select * from hbase_pushdown
+where key=80 and key=90 and value like '%90%'
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with contradictory pushdowns
+
+explain select * from hbase_pushdown
+where key=80 and key=90 and value like '%90%'
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (and (and (= (TOK_TABLE_OR_COL key) 80) (= (TOK_TABLE_OR_COL key) 90)) (like (TOK_TABLE_OR_COL value) '%90%')))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        hbase_pushdown 
+          TableScan
+            alias: hbase_pushdown
+            Filter Operator
+              predicate:
+                  expr: (((key = 80) and (key = 90)) and (value like '%90%'))
+                  type: boolean
+              Filter Operator
+                predicate:
+                    expr: (((key = 80) and (key = 90)) and (value like '%90%'))
+                    type: boolean
+                Select Operator
+                  expressions:
+                        expr: key
+                        type: int
+                        expr: value
+                        type: string
+                  outputColumnNames: _col0, _col1
+                  File Output Operator
+                    compressed: false
+                    GlobalTableId: 0
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: select * from hbase_pushdown
+where key=80 and key=90 and value like '%90%'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_pushdown
+PREHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-06-05_982_8346366828445837832/-mr-10000
+POSTHOOK: query: select * from hbase_pushdown
+where key=80 and key=90 and value like '%90%'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_pushdown
+POSTHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-06-05_982_8346366828445837832/-mr-10000
+PREHOOK: query: -- with nothing to push down
+
+explain select * from hbase_pushdown
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with nothing to push down
+
+explain select * from hbase_pushdown
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))))
+
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: -- with a predicate which is not actually part of the filter, so
+-- it should be ignored by pushdown
+
+explain select * from hbase_pushdown
+where (case when key=90 then 2 else 4 end) > 3
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with a predicate which is not actually part of the filter, so
+-- it should be ignored by pushdown
+
+explain select * from hbase_pushdown
+where (case when key=90 then 2 else 4 end) > 3
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (> (TOK_FUNCTION when (= (TOK_TABLE_OR_COL key) 90) 2 4) 3))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        hbase_pushdown 
+          TableScan
+            alias: hbase_pushdown
+            Filter Operator
+              predicate:
+                  expr: (CASE WHEN ((key = 90)) THEN (2) ELSE (4) END > 3)
+                  type: boolean
+              Filter Operator
+                predicate:
+                    expr: (CASE WHEN ((key = 90)) THEN (2) ELSE (4) END > 3)
+                    type: boolean
+                Select Operator
+                  expressions:
+                        expr: key
+                        type: int
+                        expr: value
+                        type: string
+                  outputColumnNames: _col0, _col1
+                  File Output Operator
+                    compressed: false
+                    GlobalTableId: 0
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: -- with a predicate which is under an OR, so it should
+-- be ignored by pushdown
+
+explain select * from hbase_pushdown
+where key=80 or value like '%90%'
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with a predicate which is under an OR, so it should
+-- be ignored by pushdown
+
+explain select * from hbase_pushdown
+where key=80 or value like '%90%'
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (or (= (TOK_TABLE_OR_COL key) 80) (like (TOK_TABLE_OR_COL value) '%90%')))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        hbase_pushdown 
+          TableScan
+            alias: hbase_pushdown
+            Filter Operator
+              predicate:
+                  expr: ((key = 80) or (value like '%90%'))
+                  type: boolean
+              Filter Operator
+                predicate:
+                    expr: ((key = 80) or (value like '%90%'))
+                    type: boolean
+                Select Operator
+                  expressions:
+                        expr: key
+                        type: int
+                        expr: value
+                        type: string
+                  outputColumnNames: _col0, _col1
+                  File Output Operator
+                    compressed: false
+                    GlobalTableId: 0
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: -- with pushdown disabled
+
+explain select * from hbase_pushdown where key=90
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with pushdown disabled
+
+explain select * from hbase_pushdown where key=90
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (TOK_TABLE_OR_COL key) 90))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        hbase_pushdown 
+          TableScan
+            alias: hbase_pushdown
+            Filter Operator
+              predicate:
+                  expr: (key = 90)
+                  type: boolean
+              Filter Operator
+                predicate:
+                    expr: (key = 90)
+                    type: boolean
+                Select Operator
+                  expressions:
+                        expr: key
+                        type: int
+                        expr: value
+                        type: string
+                  outputColumnNames: _col0, _col1
+                  File Output Operator
+                    compressed: false
+                    GlobalTableId: 0
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Sat Sep 18 00:22:42 2010
@@ -26,6 +26,8 @@ import java.beans.Statement;
 import java.beans.XMLDecoder;
 import java.beans.XMLEncoder;
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.EOFException;
 import java.io.File;
@@ -37,6 +39,7 @@ import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -81,6 +84,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.parse.ErrorMsg;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -317,6 +321,40 @@ public final class Utilities {
     return null;
   }
 
+  public static String serializeExpression(ExprNodeDesc expr) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    XMLEncoder encoder = new XMLEncoder(baos);
+    try {
+      encoder.writeObject(expr);
+    } finally {
+      encoder.close();
+    }
+    try {
+      return baos.toString("UTF-8");
+    } catch (UnsupportedEncodingException ex) {
+      throw new RuntimeException("UTF-8 support required", ex);
+    }
+  }
+
+  public static ExprNodeDesc deserializeExpression(
+    String s, Configuration conf) {
+    byte [] bytes;
+    try {
+      bytes = s.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException ex) {
+      throw new RuntimeException("UTF-8 support required", ex);
+    }
+    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+    XMLDecoder decoder = new XMLDecoder(
+      bais, null, null, conf.getClassLoader());
+    try {
+      ExprNodeDesc expr = (ExprNodeDesc) decoder.readObject();
+      return expr;
+    } finally {
+      decoder.close();
+    }
+  }
+  
   /**
    * Serialize a single Task.
    */
@@ -1425,4 +1463,22 @@ public final class Utilities {
   public static boolean supportCombineFileInputFormat() {
     return ShimLoader.getHadoopShims().getCombineFileInputFormat() != null;
   }
+
+  public static void setColumnNameList(JobConf jobConf, Operator op) {
+    RowSchema rowSchema = op.getSchema();
+    if (rowSchema == null) {
+      return;
+    }
+    StringBuilder columnNames = new StringBuilder();
+    for (ColumnInfo colInfo : rowSchema.getSignature()) {
+      if (columnNames.length() > 0) {
+        columnNames.append(",");
+      }
+      columnNames.append(colInfo.getInternalName());
+    }
+    String columnNamesString = columnNames.toString();
+    jobConf.set(
+      Constants.LIST_COLUMNS,
+      columnNamesString);
+  }
 }
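
These two helpers are how the filter travels from the compiled plan into the job configuration and back: pushFilters in HiveInputFormat (below) writes it, and HiveHBaseTableInputFormat.convertFilter reads it. A minimal round trip, with scanDesc and jobConf assumed in scope:

    // producer side (compare pushFilters in HiveInputFormat below)
    ExprNodeDesc filterExpr = scanDesc.getFilterExpr();
    jobConf.set(
        TableScanDesc.FILTER_EXPR_CONF_STR,
        Utilities.serializeExpression(filterExpr));

    // consumer side (compare convertFilter in HiveHBaseTableInputFormat)
    ExprNodeDesc roundTripped = Utilities.deserializeExpression(
        jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR), jobConf);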

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java?rev=998374&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java Sat Sep 18 00:22:42 2010
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
+
+/**
+ * IndexPredicateAnalyzer decomposes predicates, separating the parts
+ * which can be satisfied by an index from the parts which cannot.
+ * Currently, it only supports pure conjunctions over binary expressions
+ * comparing a column reference with a constant value.  It is assumed
+ * that all column aliases encountered refer to the same table.
+ */
+public class IndexPredicateAnalyzer
+{
+  private Set<String> udfNames;
+
+  private Set<String> allowedColumnNames;
+  
+  public IndexPredicateAnalyzer() {
+    udfNames = new HashSet<String>();
+  }
+
+  /**
+   * Registers a comparison operator as one which can be satisfied
+   * by an index search.  Unless this is called, analyzePredicate
+   * will never find any indexable conditions.
+   *
+   * @param udfName name of comparison operator as returned
+   * by either {@link GenericUDFBridge#getUdfName} (for simple UDFs)
+   * or udf.getClass().getName() (for generic UDFs).
+   */
+  public void addComparisonOp(String udfName) {
+    udfNames.add(udfName);
+  }
+
+  /**
+   * Clears the set of column names allowed in comparisons.  (Initially, all
+   * column names are allowed.)
+   */
+  public void clearAllowedColumnNames() {
+    allowedColumnNames = new HashSet<String>();
+  }
+
+  /**
+   * Adds a column name to the set of column names allowed.
+   *
+   * @param columnName name of column to be allowed
+   */
+  public void allowColumnName(String columnName) {
+    if (allowedColumnNames == null) {
+      clearAllowedColumnNames();
+    }
+    allowedColumnNames.add(columnName);
+  }
+
+  /**
+   * Analyzes a predicate.
+   *
+   * @param predicate predicate to be analyzed
+   *
+   * @param searchConditions receives conditions produced by analysis
+   *
+   * @return residual predicate which could not be translated to
+   * searchConditions
+   */
+  public ExprNodeDesc analyzePredicate(
+    ExprNodeDesc predicate,
+    final List<IndexSearchCondition> searchConditions) {
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    NodeProcessor nodeProcessor = new NodeProcessor() {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+        NodeProcessorCtx procCtx, Object... nodeOutputs)
+        throws SemanticException {
+
+        // We can only push down stuff which appears as part of
+        // a pure conjunction:  reject OR, CASE, etc.
+        for (Node ancestor : stack) {
+          if (nd == ancestor) {
+            break;
+          }
+          if (!FunctionRegistry.isOpAnd((ExprNodeDesc) ancestor)) {
+            return nd;
+          }
+        }
+
+        return analyzeExpr((ExprNodeDesc) nd, searchConditions, nodeOutputs);
+      }
+    };
+
+    Dispatcher disp = new DefaultRuleDispatcher(
+      nodeProcessor, opRules, null);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.add(predicate);
+    HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
+    try {
+      ogw.startWalking(topNodes, nodeOutput);
+    } catch (SemanticException ex) {
+      throw new RuntimeException(ex);
+    }
+    ExprNodeDesc residualPredicate = (ExprNodeDesc) nodeOutput.get(predicate);
+    return residualPredicate;
+  }
+
+  private ExprNodeDesc analyzeExpr(
+    ExprNodeDesc expr,
+    List<IndexSearchCondition> searchConditions,
+    Object... nodeOutputs) {
+
+    if (!(expr instanceof ExprNodeGenericFuncDesc)) {
+      return expr;
+    }
+    if (FunctionRegistry.isOpAnd(expr)) {
+      assert(nodeOutputs.length == 2);
+      ExprNodeDesc residual1 = (ExprNodeDesc) nodeOutputs[0];
+      ExprNodeDesc residual2 = (ExprNodeDesc) nodeOutputs[1];
+      if (residual1 == null) {
+        return residual2;
+      }
+      if (residual2 == null) {
+        return residual1;
+      }
+      List<ExprNodeDesc> residuals = new ArrayList<ExprNodeDesc>();
+      residuals.add(residual1);
+      residuals.add(residual2);
+      return new ExprNodeGenericFuncDesc(
+        TypeInfoFactory.booleanTypeInfo,
+        FunctionRegistry.getGenericUDFForAnd(),
+        residuals);
+    }
+
+    String udfName;
+    ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) expr;
+    if (funcDesc.getGenericUDF() instanceof GenericUDFBridge) {
+      GenericUDFBridge func = (GenericUDFBridge) funcDesc.getGenericUDF();
+      udfName = func.getUdfName();
+    } else {
+      udfName = funcDesc.getGenericUDF().getClass().getName();
+    }
+    if (!udfNames.contains(udfName)) {
+      return expr;
+    }
+
+    ExprNodeDesc child1 = (ExprNodeDesc) nodeOutputs[0];
+    ExprNodeDesc child2 = (ExprNodeDesc) nodeOutputs[1];
+    ExprNodeColumnDesc columnDesc = null;
+    ExprNodeConstantDesc constantDesc = null;
+    if ((child1 instanceof ExprNodeColumnDesc)
+      && (child2 instanceof ExprNodeConstantDesc)) {
+      // COL <op> CONSTANT
+      columnDesc = (ExprNodeColumnDesc) child1;
+      constantDesc = (ExprNodeConstantDesc) child2;
+    } else if ((child2 instanceof ExprNodeColumnDesc)
+      && (child1 instanceof ExprNodeConstantDesc)) {
+      // CONSTANT <op> COL
+      columnDesc = (ExprNodeColumnDesc) child2;
+      constantDesc = (ExprNodeConstantDesc) child1;
+    }
+    if (columnDesc == null) {
+      return expr;
+    }
+    if (allowedColumnNames != null) {
+      if (!allowedColumnNames.contains(columnDesc.getColumn())) {
+        return expr;
+      }
+    }
+    searchConditions.add(
+      new IndexSearchCondition(
+        columnDesc,
+        udfName,
+        constantDesc,
+        expr));
+
+    // we converted the expression to a search condition, so
+    // remove it from the residual predicate
+    return null;
+  }
+
+  /**
+   * Translates search conditions back to ExprNodeDesc form (as
+   * a left-deep conjunction).
+   *
+   * @param searchConditions (typically produced by analyzePredicate)
+   *
+   * @return ExprNodeDesc form of search conditions
+   */
+  public ExprNodeDesc translateSearchConditions(
+    List<IndexSearchCondition> searchConditions) {
+
+    ExprNodeDesc expr = null;
+    for (IndexSearchCondition searchCondition : searchConditions) {
+      if (expr == null) {
+        expr = searchCondition.getComparisonExpr();
+        continue;
+      }
+      List<ExprNodeDesc> children = new ArrayList<ExprNodeDesc>();
+      children.add(expr);
+      children.add(searchCondition.getComparisonExpr());
+      expr = new ExprNodeGenericFuncDesc(
+        TypeInfoFactory.booleanTypeInfo,
+        FunctionRegistry.getGenericUDFForAnd(),
+        children);
+    }
+    return expr;
+  }
+}
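
Usage is configure-then-analyze: register the comparison operators the index can serve, optionally restrict the candidate columns, then walk the predicate. Residuals are rebuilt bottom-up as a conjunction, so for the two-residual test query (key=90 and value like '%90%' and key=cast(value as int)) the residual comes back as ((value like '%90%') and (key = UDFToInteger(value))), as the hbase_pushdown.q.out plan shows. A configuration sketch mirroring newIndexPredicateAnalyzer in the HBase handler ("key" is illustrative):

    IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
    analyzer.addComparisonOp("=");    // equality only, for now
    analyzer.clearAllowedColumnNames();
    analyzer.allowColumnName("key");  // restrict pushdown to the key column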

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java?rev=998374&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java Sat Sep 18 00:22:42 2010
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+
+/**
+ * IndexSearchCondition represents an individual search condition
+ * found by {@link IndexPredicateAnalyzer}.
+ *
+ * @author John Sichi
+ * @version $Id:$
+ */
+public class IndexSearchCondition
+{
+  private ExprNodeColumnDesc columnDesc;
+  private String comparisonOp;
+  private ExprNodeConstantDesc constantDesc;
+  private ExprNodeDesc comparisonExpr;
+
+  /**
+   * Constructs a search condition, which takes the form
+   * <pre>column-ref comparison-op constant-value</pre>.
+   *
+   * @param columnDesc column being compared
+   *
+   * @param comparisonOp comparison operator, e.g. "="
+   * (taken from GenericUDFBridge.getUdfName())
+   *
+   * @param constantDesc constant value to search for
+   *
+   * @param comparisonExpr the original comparison expression
+   */
+  public IndexSearchCondition(
+    ExprNodeColumnDesc columnDesc,
+    String comparisonOp,
+    ExprNodeConstantDesc constantDesc,
+    ExprNodeDesc comparisonExpr) {
+
+    this.columnDesc = columnDesc;
+    this.comparisonOp = comparisonOp;
+    this.constantDesc = constantDesc;
+    this.comparisonExpr = comparisonExpr;
+  }
+
+  public void setColumnDesc(ExprNodeColumnDesc columnDesc) {
+    this.columnDesc = columnDesc;
+  }
+  
+  public ExprNodeColumnDesc getColumnDesc() {
+    return columnDesc;
+  }
+
+  public void setComparisonOp(String comparisonOp) {
+    this.comparisonOp = comparisonOp;
+  }
+
+  public String getComparisonOp() {
+    return comparisonOp;
+  }
+
+  public void setConstantDesc(ExprNodeConstantDesc constantDesc) {
+    this.constantDesc = constantDesc;
+  }
+
+  public ExprNodeConstantDesc getConstantDesc() {
+    return constantDesc;
+  }
+
+  public void setComparisonExpr(ExprNodeDesc comparisonExpr) {
+    this.comparisonExpr = comparisonExpr;
+  }
+
+  public ExprNodeDesc getComparisonExpr() {
+    return comparisonExpr;
+  }
+  
+  @Override
+  public String toString() {
+    return comparisonExpr.getExprString();
+  }
+}

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java Sat Sep 18 00:22:42 2010
@@ -68,7 +68,7 @@ public class BucketizedHiveInputFormat<K
 
     // clone a jobConf for setting needed columns for reading
     JobConf cloneJobConf = new JobConf(job);
-    initColumnsNeeded(cloneJobConf, inputFormatClass, hsplit.getPath()
+    pushProjectionsAndFilters(cloneJobConf, inputFormatClass, hsplit.getPath()
         .toString(), hsplit.getPath().toUri().getPath());
 
     InputFormat inputFormat = getInputFormatFromCache(inputFormatClass,

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Sat Sep 18 00:22:42 2010
@@ -340,7 +340,8 @@ public class CombineHiveInputFormat<K ex
       throw new IOException("cannot find class " + inputFormatClassName);
     }
 
-    initColumnsNeeded(job, inputFormatClass, hsplit.getPath(0).toString(),
+    pushProjectionsAndFilters(job, inputFormatClass,
+        hsplit.getPath(0).toString(),
         hsplit.getPath(0).toUri().getPath());
 
     return ShimLoader.getHadoopShims().getCombineFileInputFormat()

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Sat Sep 18 00:22:42 2010
@@ -33,11 +33,15 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.Writable;
@@ -215,7 +219,7 @@ public class HiveInputFormat<K extends W
 
     // clone a jobConf for setting needed columns and filters for reading
     JobConf cloneJobConf = new JobConf(job);
-    initColumnsNeeded(cloneJobConf, inputFormatClass, hsplit.getPath()
+    pushProjectionsAndFilters(cloneJobConf, inputFormatClass, hsplit.getPath()
         .toString(), hsplit.getPath().toUri().getPath());
 
     InputFormat inputFormat = getInputFormatFromCache(inputFormatClass,
@@ -260,6 +264,17 @@ public class HiveInputFormat<K extends W
       InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
       Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), newjob);
 
+      // Make filter pushdown information available to getSplits.
+      ArrayList<String> aliases =
+        mrwork.getPathToAliases().get(dir.toUri().toString());
+      if ((aliases != null) && (aliases.size() == 1)) {
+        Operator op = mrwork.getAliasToWork().get(aliases.get(0));
+        if ((op != null) && (op instanceof TableScanOperator)) {
+          TableScanOperator tableScan = (TableScanOperator) op;
+          pushFilters(newjob, tableScan);
+        }
+      }
+
       FileInputFormat.setInputPaths(newjob, dir);
       newjob.setInputFormat(inputFormat.getClass());
       InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
@@ -309,7 +324,37 @@ public class HiveInputFormat<K extends W
     return partDesc;
   }
 
-  protected void initColumnsNeeded(JobConf jobConf, Class inputFormatClass,
+  protected void pushFilters(JobConf jobConf, TableScanOperator tableScan) {
+
+    TableScanDesc scanDesc = tableScan.getConf();
+    if (scanDesc == null) {
+      return;
+    }
+
+    // construct column name list for reference by filter push down
+    Utilities.setColumnNameList(jobConf, tableScan);
+
+    // push down filters
+    ExprNodeDesc filterExpr = scanDesc.getFilterExpr();
+    if (filterExpr == null) {
+      return;
+    }
+
+    String filterText = filterExpr.getExprString();
+    String filterExprSerialized = Utilities.serializeExpression(filterExpr);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Filter text = " + filterText);
+      LOG.debug("Filter expression = " + filterExprSerialized);
+    }
+    jobConf.set(
+      TableScanDesc.FILTER_TEXT_CONF_STR,
+      filterText);
+    jobConf.set(
+      TableScanDesc.FILTER_EXPR_CONF_STR,
+      filterExprSerialized);
+  }
+
+  protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass,
       String splitPath, String splitPathWithNoSchema) {
     if (this.mrwork == null) {
       init(job);
@@ -335,12 +380,16 @@ public class HiveInputFormat<K extends W
           alias);
       if (op != null && op instanceof TableScanOperator) {
         TableScanOperator tableScan = (TableScanOperator) op;
+
+        // push down projections
         ArrayList<Integer> list = tableScan.getNeededColumnIDs();
         if (list != null) {
           ColumnProjectionUtils.appendReadColumnIDs(jobConf, list);
         } else {
           ColumnProjectionUtils.setFullyReadColumns(jobConf);
         }
+
+        pushFilters(jobConf, tableScan);
       }
     }
   }
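
On the consuming side, a storage handler's input format can read these two
properties back in getSplits() or getRecordReader().  A minimal sketch,
assuming Utilities gained a deserializeExpression counterpart to the
serializeExpression call used in pushFilters above (treat that method name
and signature as an assumption here):

    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
    import org.apache.hadoop.hive.ql.plan.TableScanDesc;
    import org.apache.hadoop.mapred.JobConf;

    public class PushedFilterReader {
      // Recovers the filter pushed by pushFilters, or returns null when no
      // filter reached this scan (pushdown disabled, or no suitable WHERE).
      public static ExprNodeDesc getPushedFilter(JobConf jobConf) {
        String filterExprSerialized =
            jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
        if (filterExprSerialized == null) {
          return null;
        }
        // hive.io.filter.text holds a human-readable form, for logging only.
        return Utilities.deserializeExpression(filterExprSerialized, jobConf);
      }
    }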

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java?rev=998374&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java Sat Sep 18 00:22:42 2010
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.metadata;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+
+/**
+ * HiveStoragePredicateHandler is an optional companion to {@link
+ * HiveStorageHandler}; it should only be implemented by handlers which
+ * support decomposition of predicates being pushed down into table scans.
+ */
+public interface HiveStoragePredicateHandler {
+
+  /**
+   * Gives the storage handler a chance to decompose a predicate.  The storage
+   * handler should analyze the predicate, splitting it into a portion it can
+   * evaluate during table access and a residual portion left for Hive.  For
+   * example, if the original predicate is <code>x = 2 AND
+   * upper(y)='YUM'</code>, the storage handler might be able to handle
+   * <code>x = 2</code> but leave the "residual" <code>upper(y)='YUM'</code>
+   * for Hive to deal with.  The two portions need not be disjoint; they may
+   * overlap.  For example, given the predicate <code>x LIKE 'a%b'</code>,
+   * the storage handler might be able to evaluate the prefix search
+   * <code>x LIKE 'a%'</code>, leaving <code>x LIKE '%b'</code> as the residual.
+   *
+   * @param jobConf contains a job configuration matching the one that
+   * will later be passed to getRecordReader and getSplits
+   *
+   * @param deserializer deserializer which will be used when
+   * fetching rows
+   *
+   * @param predicate predicate to be decomposed
+   *
+   * @return decomposed form of predicate, or null if no pushdown is
+   * possible at all
+   */
+  public DecomposedPredicate decomposePredicate(
+    JobConf jobConf,
+    Deserializer deserializer,
+    ExprNodeDesc predicate);
+
+  /**
+   * Struct class for returning multiple values from decomposePredicate.
+   */
+  public static class DecomposedPredicate {
+    /**
+     * Portion of predicate to be evaluated by storage handler.  Hive
+     * will pass this into the storage handler's input format.
+     */
+    public ExprNodeDesc pushedPredicate;
+
+    /**
+     * Portion of predicate to be post-evaluated by Hive for any rows
+     * which are returned by storage handler.
+     */
+    public ExprNodeDesc residualPredicate;
+  }
+}
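
For a feel of how a handler might implement this, here is a rough sketch
using the IndexPredicateAnalyzer and IndexSearchCondition classes added by
this commit; it pushes only equality comparisons against a hypothetical
"key" column, and the extension of DefaultStorageHandler is an illustrative
assumption rather than part of the interface contract:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
    import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
    import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
    import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
    import org.apache.hadoop.hive.serde2.Deserializer;
    import org.apache.hadoop.mapred.JobConf;

    public class ExampleStorageHandler extends DefaultStorageHandler
        implements HiveStoragePredicateHandler {

      public DecomposedPredicate decomposePredicate(
          JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) {

        // Accept only "key = constant" comparisons for pushdown.
        IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
        analyzer.addComparisonOp(
            "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual");
        analyzer.clearAllowedColumnNames();
        analyzer.allowColumnName("key");

        List<IndexSearchCondition> searchConditions =
            new ArrayList<IndexSearchCondition>();
        ExprNodeDesc residual =
            analyzer.analyzePredicate(predicate, searchConditions);
        if (searchConditions.isEmpty()) {
          return null;  // nothing can be pushed down
        }

        DecomposedPredicate decomposed = new DecomposedPredicate();
        decomposed.pushedPredicate =
            analyzer.translateSearchConditions(searchConditions);
        decomposed.residualPredicate = residual;
        return decomposed;
      }
    }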

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java Sat Sep 18 00:22:42 2010
@@ -36,6 +36,14 @@ public class TableScanDesc implements Se
   
   private List<VirtualColumn> virtualCols;
 
+  private ExprNodeDesc filterExpr;
+
+  public static final String FILTER_EXPR_CONF_STR =
+    "hive.io.filter.expr.serialized";
+
+  public static final String FILTER_TEXT_CONF_STR =
+    "hive.io.filter.text";
+
   @SuppressWarnings("nls")
   public TableScanDesc() {
   }
@@ -54,6 +62,15 @@ public class TableScanDesc implements Se
     return alias;
   }
 
+  @Explain(displayName = "filterExpr")
+  public ExprNodeDesc getFilterExpr() {
+    return filterExpr;
+  }
+
+  public void setFilterExpr(ExprNodeDesc filterExpr) {
+    this.filterExpr = filterExpr;
+  }
+
   public void setAlias(String alias) {
     this.alias = alias;
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java Sat Sep 18 00:22:42 2010
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
@@ -36,9 +37,14 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -47,7 +53,10 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
  * Operator factory for predicate pushdown processing of operator graph Each
@@ -63,6 +72,9 @@ import org.apache.hadoop.hive.serde2.typ
  */
 public final class OpProcFactory {
 
+  protected static final Log LOG = LogFactory.getLog(OpProcFactory.class
+    .getName());
+
   /**
    * Processor for Script Operator Prevents any predicates being pushed.
    */
@@ -266,9 +278,6 @@ public final class OpProcFactory {
    */
   public static class DefaultPPD implements NodeProcessor {
 
-    protected static final Log LOG = LogFactory.getLog(OpProcFactory.class
-        .getName());
-
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
@@ -377,6 +386,24 @@ public final class OpProcFactory {
     if (condn == null) {
       return null;
     }
+
+    if (op instanceof TableScanOperator) {
+      boolean pushFilterToStorage;
+      HiveConf hiveConf = owi.getParseContext().getConf();
+      pushFilterToStorage =
+        hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTPPD_STORAGE);
+      if (pushFilterToStorage) {
+        condn = pushFilterToStorageHandler(
+          (TableScanOperator) op,
+          condn,
+          owi,
+          hiveConf);
+        if (condn == null) {
+          // we pushed the whole thing down
+          return null;
+        }
+      }
+    }
 
     // add new filter op
     List<Operator<? extends Serializable>> originalChilren = op
@@ -399,6 +426,81 @@ public final class OpProcFactory {
     return output;
   }
 
+  /**
+   * Attempts to push a predicate down into a storage handler.  For
+   * native tables, this is a no-op.
+   *
+   * @param tableScanOp table scan against which predicate applies
+   *
+   * @param originalPredicate predicate to be pushed down
+   *
+   * @param owi operator walker info for the predicate pushdown traversal
+   *
+   * @param hiveConf Hive configuration
+   *
+   * @return portion of predicate which needs to be evaluated
+   * by Hive as a post-filter, or null if it was possible
+   * to push down the entire predicate
+   */
+  private static ExprNodeDesc pushFilterToStorageHandler(
+    TableScanOperator tableScanOp,
+    ExprNodeDesc originalPredicate,
+    OpWalkerInfo owi,
+    HiveConf hiveConf) {
+
+    TableScanDesc tableScanDesc = tableScanOp.getConf();
+    Table tbl = owi.getParseContext().getTopToTable().get(tableScanOp);
+    if (!tbl.isNonNative()) {
+      return originalPredicate;
+    }
+    HiveStorageHandler storageHandler = tbl.getStorageHandler();
+    if (!(storageHandler instanceof HiveStoragePredicateHandler)) {
+      // The storage handler does not provide predicate decomposition
+      // support, so we'll implement the entire filter in Hive.  However,
+      // we still provide the full predicate to the storage handler in
+      // case it wants to do any of its own prefiltering.
+      tableScanDesc.setFilterExpr(originalPredicate);
+      return originalPredicate;
+    }
+    HiveStoragePredicateHandler predicateHandler =
+      (HiveStoragePredicateHandler) storageHandler;
+    JobConf jobConf = new JobConf(owi.getParseContext().getConf());
+    Utilities.setColumnNameList(jobConf, tableScanOp);
+    Utilities.copyTableJobPropertiesToConf(
+      Utilities.getTableDesc(tbl),
+      jobConf);
+    Deserializer deserializer = tbl.getDeserializer();
+    HiveStoragePredicateHandler.DecomposedPredicate decomposed =
+      predicateHandler.decomposePredicate(
+        jobConf,
+        deserializer,
+        originalPredicate);
+    if (decomposed == null) {
+      // not able to push anything down
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No pushdown possible for predicate:  "
+          + originalPredicate.getExprString());
+      }
+      return originalPredicate;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Original predicate:  "
+        + originalPredicate.getExprString());
+      if (decomposed.pushedPredicate != null) {
+        LOG.debug(
+          "Pushed predicate:  "
+          + decomposed.pushedPredicate.getExprString());
+      }
+      if (decomposed.residualPredicate != null) {
+        LOG.debug(
+          "Residual predicate:  "
+          + decomposed.residualPredicate.getExprString());
+      }
+    }
+    tableScanDesc.setFilterExpr(decomposed.pushedPredicate);
+    return decomposed.residualPredicate;
+  }
+
   public static NodeProcessor getFilterProc() {
     return new FilterPPD();
   }
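
To see the decomposition end to end: with the HBase handler changes in this
patch, a query such as "select * from hbase_pushdown where key = 90 and
value like '%90%'" should leave the key equality in the table scan's
filterExpr (surfaced in EXPLAIN output via the @Explain annotation on
TableScanDesc.getFilterExpr()) while the LIKE survives as a residual Hive
FilterOperator.  The query here is illustrative; hbase_pushdown.q added by
this commit carries the actual tests.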

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java Sat Sep 18 00:22:42 2010
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 
 /**
@@ -37,10 +38,11 @@ public class OpWalkerInfo implements Nod
    */
   private final HashMap<Operator<? extends Serializable>, ExprWalkerInfo> opToPushdownPredMap;
   private final Map<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap;
+  private final ParseContext pGraphContext;
 
-  public OpWalkerInfo(
-      HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap) {
-    this.opToParseCtxMap = opToParseCtxMap;
+  public OpWalkerInfo(ParseContext pGraphContext) {
+    this.pGraphContext = pGraphContext;
+    opToParseCtxMap = pGraphContext.getOpParseCtx();
     opToPushdownPredMap = new HashMap<Operator<? extends Serializable>, ExprWalkerInfo>();
   }
 
@@ -62,4 +64,7 @@ public class OpWalkerInfo implements Nod
     return opToParseCtxMap.put(key, value);
   }
 
+  public ParseContext getParseContext() {
+    return pGraphContext;
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java?rev=998374&r1=998373&r2=998374&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java Sat Sep 18 00:22:42 2010
@@ -81,7 +81,7 @@ public class PredicatePushDown implement
     opToParseCtxMap = pGraphContext.getOpParseCtx();
 
     // create the context for walking operators
-    OpWalkerInfo opWalkerInfo = new OpWalkerInfo(opToParseCtxMap);
+    OpWalkerInfo opWalkerInfo = new OpWalkerInfo(pGraphContext);
 
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     opRules.put(new RuleRegExp("R1", "FIL%"), OpProcFactory.getFilterProc());