You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/08/07 07:30:02 UTC

svn commit: r1511177 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ metastore/src/java/org/apache/hadoop/hive/metastore/ metastore/src/java/org/apache/hadoop/hive/metastore/parser/ ql/src/java/org/apache/hadoop/hive/ql/metadata/

Author: hashutosh
Date: Wed Aug  7 05:30:01 2013
New Revision: 1511177

URL: http://svn.apache.org/r1511177
Log:
HIVE-4051 : Hive's metastore suffers from 1+N queries when querying partitions & is slow (Sergey Shelukhin via Ashutosh Chauhan)

Added:
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1511177&r1=1511176&r2=1511177&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Aug  7 05:30:01 2013
@@ -344,6 +344,7 @@ public class HiveConf extends Configurat
     METASTORE_EXECUTE_SET_UGI("hive.metastore.execute.setugi", false),
     METASTORE_PARTITION_NAME_WHITELIST_PATTERN(
         "hive.metastore.partition.name.whitelist.pattern", ""),
+    METASTORE_TRY_DIRECT_SQL("hive.metastore.try.direct.sql", true),
     METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES(
         "hive.metastore.disallow.incompatible.col.type.changes", false),
 

Added: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java?rev=1511177&view=auto
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java (added)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java Wed Aug  7 05:30:01 2013
@@ -0,0 +1,564 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import static org.apache.commons.lang.StringUtils.join;
+import static org.apache.commons.lang.StringUtils.repeat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.Query;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
+import org.apache.hadoop.hive.metastore.parser.FilterParser;
+import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LeafNode;
+import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LogicalOperator;
+import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator;
+import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode;
+import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor;
+
+/**
+ * This class contains the optimizations for MetaStore that rely on direct SQL access to
+ * the underlying database. It should use ANSI SQL and be compatible with common databases
+ * such as MySQL (note that MySQL doesn't use full ANSI mode by default), Postgres, etc.
+ *
+ * As of now, only the partition retrieval is done this way to improve job startup time;
+ * JDOQL partition retrieval is still present so as not to limit the ORM solution we have
+ * to SQL stores only. There's always a way to do without direct SQL.
+ */
+class MetaStoreDirectSql {
+  private static final Log LOG = LogFactory.getLog(MetaStoreDirectSql.class);
+
+  private final PersistenceManager pm;
+
+  public MetaStoreDirectSql(PersistenceManager pm) {
+    this.pm = pm;
+  }
+
+  /**
+   * Gets partitions by using direct SQL queries.
+   * @param dbName Metastore db name.
+   * @param tblName Metastore table name.
+   * @param partNames Partition names to get.
+   * @return List of partitions.
+   */
+  public List<Partition> getPartitionsViaSqlFilter(
+      String dbName, String tblName, List<String> partNames) throws MetaException {
+    String list = repeat(",?", partNames.size()).substring(1);
+    return getPartitionsViaSqlFilterInternal(dbName, tblName,
+        "and PARTITIONS.PART_NAME in (" + list + ")" , partNames, new ArrayList<String>());
+  }
+
+  /**
+   * Gets partitions by using direct SQL queries.
+   * @param dbName Metastore db name.
+   * @param tblName Metastore table name.
+   * @param parser The parsed filter from which the SQL filter will be generated.
+   * @return List of partitions.
+   */
+  public List<Partition> getPartitionsViaSqlFilter(Table table, String dbName,
+      String tblName, FilterParser parser) throws MetaException {
+    List<String> params = new ArrayList<String>(), joins = new ArrayList<String>();
+    String sqlFilter = (parser == null) ? null
+        : PartitionFilterGenerator.generateSqlFilter(table, parser.tree, params, joins);
+    return getPartitionsViaSqlFilterInternal(dbName, tblName, sqlFilter, params, joins);
+  }
+
+  /**
+   * Get partition objects for the query using direct SQL queries, to avoid bazillion
+   * queries created by DN retrieving stuff for each object individually.
+   * @param dbName Metastore db name.
+   * @param tblName Metastore table name.
+   * @param sqlFilter SQL filter to use. Better be SQL92-compliant. Can be null.
+   * @param paramsForFilter params for ?-s in SQL filter text. Params must be in order.
+   * @param joinsForFilter if the filter needs additional join statement, they must be in
+   *                       this list. Better be SQL92-compliant.
+   * @return List of partition objects. FieldSchema is currently not populated.
+   */
+  private List<Partition> getPartitionsViaSqlFilterInternal(String dbName,
+      String tblName, String sqlFilter, List<String> paramsForFilter,
+      List<String> joinsForFilter) throws MetaException {
+    boolean doTrace = LOG.isDebugEnabled();
+    // Get all simple fields for partitions and related objects, which we can map one-on-one.
+    // We will do this in 2 queries to use different existing indices for each one.
+    // We do not get table and DB name, assuming they are the same as we are using to filter.
+    // TODO: We might want to tune the indexes instead. With current ones MySQL performs
+    // poorly, esp. with 'order by' w/o index on large tables, even if the number of actual
+    // results is small (query that returns 8 out of 32k partitions can go 4sec. to 0sec. by
+    // just adding a PART_ID IN (...) filter that doesn't alter the results to it, probably
+    // causing it to not sort the entire table due to not knowing how selective the filter is.
+    String queryText =
+        "select PARTITIONS.PART_ID from PARTITIONS"
+      + "  inner join TBLS on PARTITIONS.TBL_ID = TBLS.TBL_ID "
+      + "  inner join DBS on TBLS.DB_ID = DBS.DB_ID "
+      + join(joinsForFilter, ' ') + " where TBLS.TBL_NAME = ? and DBS.NAME = ?"
+      + ((sqlFilter == null) ? "" : " " + sqlFilter);
+    Object[] params = new Object[paramsForFilter.size() + 2];
+    params[0] = tblName;
+    params[1] = dbName;
+    for (int i = 0; i < paramsForFilter.size(); ++i) {
+      params[i + 2] = paramsForFilter.get(i);
+    }
+
+    long start = doTrace ? System.nanoTime() : 0;
+    Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+    @SuppressWarnings("unchecked")
+    List<Object> sqlResult = (List<Object>)query.executeWithArray(params);
+    if (sqlResult.isEmpty()) {
+      return new ArrayList<Partition>(); // no partitions, bail early.
+    }
+    long queryTime = doTrace ? System.nanoTime() : 0;
+
+    // Prepare StringBuilder for "PART_ID in (...)" to use in future queries.
+    int sbCapacity = sqlResult.size() * 7; // if there are 100k things => 6 chars, plus comma
+    StringBuilder partSb = new StringBuilder(sbCapacity);
+    // Assume db and table names are the same for all partition, that's what we're selecting for.
+    for (Object partitionId : sqlResult) {
+      partSb.append((Long)partitionId).append(",");
+    }
+    String partIds = trimCommaList(partSb);
+    if (doTrace) {
+      LOG.debug("Direct SQL query in " + (queryTime - start) / 1000000.0 + "ms + " +
+          (System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [ " + queryText + "]");
+    }
+
+    // Now get most of the other fields.
+    queryText =
+      "select PARTITIONS.PART_ID, SDS.SD_ID, SDS.CD_ID, SERDES.SERDE_ID, "
+    + "  PARTITIONS.CREATE_TIME, PARTITIONS.LAST_ACCESS_TIME, SDS.INPUT_FORMAT, "
+    + "  SDS.IS_COMPRESSED, SDS.IS_STOREDASSUBDIRECTORIES, SDS.LOCATION,  SDS.NUM_BUCKETS, "
+    + "  SDS.OUTPUT_FORMAT, SERDES.NAME, SERDES.SLIB "
+    + "from PARTITIONS"
+    + "  left outer join SDS on PARTITIONS.SD_ID = SDS.SD_ID "
+    + "  left outer join SERDES on SDS.SERDE_ID = SERDES.SERDE_ID "
+    + "where PART_ID in (" + partIds + ") order by PART_NAME asc";
+    start = doTrace ? System.nanoTime() : 0;
+    query = pm.newQuery("javax.jdo.query.SQL", queryText);
+    @SuppressWarnings("unchecked")
+    List<Object[]> sqlResult2 = (List<Object[]>)query.executeWithArray(params);
+    queryTime = doTrace ? System.nanoTime() : 0;
+
+    // Read all the fields and create partitions, SDs and serdes.
+    TreeMap<Long, Partition> partitions = new TreeMap<Long, Partition>();
+    TreeMap<Long, StorageDescriptor> sds = new TreeMap<Long, StorageDescriptor>();
+    TreeMap<Long, SerDeInfo> serdes = new TreeMap<Long, SerDeInfo>();
+    TreeMap<Long, List<FieldSchema>> colss = new TreeMap<Long, List<FieldSchema>>();
+    // Keep order by name, consistent with JDO.
+    ArrayList<Partition> orderedResult = new ArrayList<Partition>(sqlResult.size());
+
+    // Prepare StringBuilder-s for "in (...)" lists to use in one-to-many queries.
+    StringBuilder sdSb = new StringBuilder(sbCapacity), serdeSb = new StringBuilder(sbCapacity);
+    StringBuilder colsSb = new StringBuilder(7); // We expect that there's only one field schema.
+    tblName = tblName.toLowerCase();
+    dbName = dbName.toLowerCase();
+    for (Object[] fields : sqlResult2) {
+      // Here comes the ugly part...
+      long partitionId = (Long)fields[0];
+      Long sdId = (Long)fields[1];
+      Long colId = (Long)fields[2];
+      Long serdeId = (Long)fields[3];
+      if (sdId == null || colId == null || serdeId == null) {
+        throw new MetaException("Unexpected null for one of the IDs, SD " + sdId
+            + ", column " + colId + ", serde " + serdeId);
+      }
+
+      Partition part = new Partition();
+      orderedResult.add(part);
+      // Set the collection fields; some code might not check presence before accessing them.
+      part.setParameters(new HashMap<String, String>());
+      part.setValues(new ArrayList<String>());
+      part.setDbName(dbName);
+      part.setTableName(tblName);
+      if (fields[4] != null) part.setCreateTime((Integer)fields[4]);
+      if (fields[5] != null) part.setLastAccessTime((Integer)fields[5]);
+      partitions.put(partitionId, part);
+
+      // We assume each partition has an unique SD.
+      StorageDescriptor sd = new StorageDescriptor();
+      StorageDescriptor oldSd = sds.put(sdId, sd);
+      if (oldSd != null) {
+        throw new MetaException("Partitions reuse SDs; we don't expect that");
+      }
+      // Set the collection fields; some code might not check presence before accessing them.
+      sd.setSortCols(new ArrayList<Order>());
+      sd.setBucketCols(new ArrayList<String>());
+      sd.setParameters(new HashMap<String, String>());
+      sd.setSkewedInfo(new SkewedInfo(new ArrayList<String>(),
+          new ArrayList<List<String>>(), new HashMap<SkewedValueList, String>()));
+      sd.setInputFormat((String)fields[6]);
+      Boolean tmpBoolean = extractSqlBoolean(fields[7]);
+      if (tmpBoolean != null) sd.setCompressed(tmpBoolean);
+      tmpBoolean = extractSqlBoolean(fields[8]);
+      if (tmpBoolean != null) sd.setStoredAsSubDirectories(tmpBoolean);
+      sd.setLocation((String)fields[9]);
+      if (fields[10] != null) sd.setNumBuckets((Integer)fields[10]);
+      sd.setOutputFormat((String)fields[11]);
+      sdSb.append(sdId).append(",");
+      part.setSd(sd);
+
+      List<FieldSchema> cols = colss.get(colId);
+      // We expect that colId will be the same for all (or many) SDs.
+      if (cols == null) {
+        cols = new ArrayList<FieldSchema>();
+        colss.put(colId, cols);
+        colsSb.append(colId).append(",");
+      }
+      sd.setCols(cols);
+
+      // We assume each SD has an unique serde.
+      SerDeInfo serde = new SerDeInfo();
+      SerDeInfo oldSerde = serdes.put(serdeId, serde);
+      if (oldSerde != null) {
+        throw new MetaException("SDs reuse serdes; we don't expect that");
+      }
+      serde.setParameters(new HashMap<String, String>());
+      serde.setName((String)fields[12]);
+      serde.setSerializationLib((String)fields[13]);
+      serdeSb.append(serdeId).append(",");
+      sd.setSerdeInfo(serde);
+    }
+    query.closeAll();
+    if (doTrace) {
+      LOG.debug("Direct SQL query in " + (queryTime - start) / 1000000.0 + "ms + " +
+          (System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [ " + queryText + "]");
+    }
+
+    // Prepare IN (blah) lists for the following queries. Cut off the final ','s.
+    String sdIds = trimCommaList(sdSb), serdeIds = trimCommaList(serdeSb),
+        colIds = trimCommaList(colsSb);
+
+    // Now get all the one-to-many things. Start with partitions.
+    queryText = "select PART_ID, PARAM_KEY, PARAM_VALUE from PARTITION_PARAMS where PART_ID in ("
+        + partIds + ") and PARAM_KEY is not null order by PART_ID asc";
+    loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() {
+      public void apply(Partition t, Object[] fields) {
+        t.putToParameters((String)fields[1], (String)fields[2]);
+      }});
+
+    queryText = "select PART_ID, PART_KEY_VAL from PARTITION_KEY_VALS where PART_ID in ("
+        + partIds + ") and INTEGER_IDX >= 0 order by PART_ID asc, INTEGER_IDX asc";
+    loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() {
+      public void apply(Partition t, Object[] fields) {
+        t.addToValues((String)fields[1]);
+      }});
+
+    // Get all the stuff for SD. Don't do empty-list check - we expect partitions do have SDs.
+    queryText = "select SD_ID, PARAM_KEY, PARAM_VALUE from SD_PARAMS where SD_ID in ("
+        + sdIds + ") and PARAM_KEY is not null order by SD_ID asc";
+    loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+      public void apply(StorageDescriptor t, Object[] fields) {
+        t.putToParameters((String)fields[1], (String)fields[2]);
+      }});
+
+    // Note that SORT_COLS has "ORDER" column, which is not SQL92-legal. We have two choices
+    // here - drop SQL92, or get '*' and be broken on certain schema changes. We do the latter.
+    queryText = "select SD_ID, COLUMN_NAME, SORT_COLS.* from SORT_COLS where SD_ID in ("
+        + sdIds + ") and INTEGER_IDX >= 0 order by SD_ID asc, INTEGER_IDX asc";
+    loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+      public void apply(StorageDescriptor t, Object[] fields) {
+        if (fields[4] == null) return;
+        t.addToSortCols(new Order((String)fields[1], (Integer)fields[4]));
+      }});
+
+    queryText = "select SD_ID, BUCKET_COL_NAME from BUCKETING_COLS where SD_ID in ("
+        + sdIds + ") and INTEGER_IDX >= 0 order by SD_ID asc, INTEGER_IDX asc";
+    loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+      public void apply(StorageDescriptor t, Object[] fields) {
+        t.addToBucketCols((String)fields[1]);
+      }});
+
+    // Skewed columns stuff.
+    queryText = "select SD_ID, SKEWED_COL_NAME from SKEWED_COL_NAMES where SD_ID in ("
+        + sdIds + ") and INTEGER_IDX >= 0 order by SD_ID asc, INTEGER_IDX asc";
+    boolean hasSkewedColumns =
+      loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+        public void apply(StorageDescriptor t, Object[] fields) {
+          if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo());
+          t.getSkewedInfo().addToSkewedColNames((String)fields[1]);
+        }}) > 0;
+
+    // Assume we don't need to fetch the rest of the skewed column data if we have no columns.
+    if (hasSkewedColumns) {
+      // We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless.
+      queryText =
+            "select SKEWED_VALUES.SD_ID_OID, SKEWED_STRING_LIST_VALUES.STRING_LIST_ID, "
+          + "  SKEWED_STRING_LIST_VALUES.STRING_LIST_VALUE "
+          + "from SKEWED_VALUES "
+          + "  left outer join SKEWED_STRING_LIST_VALUES on "
+          + "    SKEWED_VALUES.STRING_LIST_ID_EID = SKEWED_STRING_LIST_VALUES.STRING_LIST_ID "
+          + "where SKEWED_VALUES.SD_ID_OID in (" + sdIds + ") "
+          + "  and SKEWED_VALUES.STRING_LIST_ID_EID is not null "
+          + "  and SKEWED_VALUES.INTEGER_IDX >= 0 "
+          + "order by SKEWED_VALUES.SD_ID_OID asc, SKEWED_VALUES.INTEGER_IDX asc, "
+          + "  SKEWED_STRING_LIST_VALUES.INTEGER_IDX asc";
+      loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+        private Long currentListId;
+        private List<String> currentList;
+        public void apply(StorageDescriptor t, Object[] fields) {
+          if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo());
+          // Note that this is not a typical list accumulator - there's no call to finalize
+          // the last list. Instead we add list to SD first, as well as locally to add elements.
+          if (fields[1] == null) {
+            currentList = null; // left outer join produced a list with no values
+            currentListId = null;
+            t.getSkewedInfo().addToSkewedColValues(new ArrayList<String>());
+          } else {
+            long fieldsListId = (Long)fields[1];
+            if (currentListId == null || fieldsListId != currentListId) {
+              currentList = new ArrayList<String>();
+              currentListId = fieldsListId;
+              t.getSkewedInfo().addToSkewedColValues(currentList);
+            }
+            currentList.add((String)fields[2]);
+          }
+        }});
+
+      // We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless.
+      queryText =
+            "select SKEWED_COL_VALUE_LOC_MAP.SD_ID, SKEWED_STRING_LIST_VALUES.STRING_LIST_ID,"
+          + "  SKEWED_COL_VALUE_LOC_MAP.LOCATION, SKEWED_STRING_LIST_VALUES.STRING_LIST_VALUE "
+          + "from SKEWED_COL_VALUE_LOC_MAP"
+          + "  left outer join SKEWED_STRING_LIST_VALUES on SKEWED_COL_VALUE_LOC_MAP."
+          + "STRING_LIST_ID_KID = SKEWED_STRING_LIST_VALUES.STRING_LIST_ID "
+          + "where SKEWED_COL_VALUE_LOC_MAP.SD_ID in (" + sdIds + ")"
+          + "  and SKEWED_COL_VALUE_LOC_MAP.STRING_LIST_ID_KID is not null "
+          + "order by SKEWED_COL_VALUE_LOC_MAP.SD_ID asc,"
+          + "  SKEWED_STRING_LIST_VALUES.INTEGER_IDX asc";
+      loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+        private Long currentListId;
+        private SkewedValueList currentList;
+        public void apply(StorageDescriptor t, Object[] fields) {
+          if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo());
+          // Note that this is not a typical list accumulator - there's no call to finalize
+          // the last list. Instead we add list to SD first, as well as locally to add elements.
+          if (fields[1] == null) {
+            currentList = null; // left outer join produced a list with no values
+            currentListId = null;
+            t.getSkewedInfo().putToSkewedColValueLocationMaps(
+                new SkewedValueList(), (String)fields[2]);
+          } else {
+            long fieldsListId = (Long)fields[1];
+            if (currentListId == null || fieldsListId != currentListId) {
+              currentList = new SkewedValueList();
+              currentListId = fieldsListId;
+              t.getSkewedInfo().putToSkewedColValueLocationMaps(currentList, (String)fields[2]);
+            }
+            currentList.addToSkewedValueList((String)fields[3]);
+          }
+        }});
+    } // if (hasSkewedColumns)
+
+    // Get FieldSchema stuff if any.
+    if (!colss.isEmpty()) {
+      // We are skipping the CDS table here, as it seems to be totally useless.
+      queryText = "select CD_ID, COMMENT, COLUMN_NAME, TYPE_NAME from COLUMNS_V2 where CD_ID in ("
+          + colIds + ") and INTEGER_IDX >= 0 order by CD_ID asc, INTEGER_IDX asc";
+      loopJoinOrderedResult(colss, queryText, 0, new ApplyFunc<List<FieldSchema>>() {
+        public void apply(List<FieldSchema> t, Object[] fields) {
+          t.add(new FieldSchema((String)fields[2], (String)fields[3], (String)fields[1]));
+        }});
+    }
+
+    // Finally, get all the stuff for serdes - just the params.
+    queryText = "select SERDE_ID, PARAM_KEY, PARAM_VALUE from SERDE_PARAMS where SERDE_ID in ("
+        + serdeIds + ") and PARAM_KEY is not null order by SERDE_ID asc";
+    loopJoinOrderedResult(serdes, queryText, 0, new ApplyFunc<SerDeInfo>() {
+      public void apply(SerDeInfo t, Object[] fields) {
+        t.putToParameters((String)fields[1], (String)fields[2]);
+      }});
+
+    return orderedResult;
+  }
+
+  private static Boolean extractSqlBoolean(Object value) throws MetaException {
+    // MySQL has booleans, but e.g. Derby uses 'Y'/'N' mapping. People using derby probably
+    // don't care about performance anyway, but let's cover the common case.
+    if (value == null) return null;
+    if (value instanceof Boolean) return (Boolean)value;
+    Character c = null;
+    if (value instanceof String && ((String)value).length() == 1) {
+      c = ((String)value).charAt(0);
+    }
+    if (c == 'Y') return true;
+    if (c == 'N') return false;
+    throw new MetaException("Cannot extrace boolean from column value " + value);
+  }
+
+  private static String trimCommaList(StringBuilder sb) {
+    if (sb.length() > 0) {
+      sb.setLength(sb.length() - 1);
+    }
+    return sb.toString();
+  }
+
+  private abstract class ApplyFunc<Target> {
+    public abstract void apply(Target t, Object[] fields);
+  }
+
+  /**
+   * Merges applies the result of a PM SQL query into a tree of object.
+   * Essentially it's an object join. DN could do this for us, but it issues queries
+   * separately for every object, which is suboptimal.
+   * @param tree The object tree, by ID.
+   * @param queryText The query text.
+   * @param keyIndex Index of the Long column corresponding to the map ID in query result rows.
+   * @param func The function that is called on each (object,row) pair with the same id.
+   * @return the count of results returned from the query.
+   */
+  private <T> int loopJoinOrderedResult(TreeMap<Long, T> tree,
+      String queryText, int keyIndex, ApplyFunc<T> func) throws MetaException {
+    boolean doTrace = LOG.isDebugEnabled();
+    long start = doTrace ? System.nanoTime() : 0;
+    Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+    Object result = query.execute();
+    long queryTime = doTrace ? System.nanoTime() : 0;
+    if (result == null) {
+      query.closeAll();
+      return 0;
+    }
+    if (!(result instanceof List<?>)) {
+      throw new MetaException("Wrong result type " + result.getClass());
+    }
+    @SuppressWarnings("unchecked")
+    List<Object[]> list = (List<Object[]>)result;
+    Iterator<Object[]> iter = list.iterator();
+    Object[] fields = null;
+    for (Map.Entry<Long, T> entry : tree.entrySet()) {
+      if (fields == null && !iter.hasNext()) break;
+      long id = entry.getKey();
+      while (fields != null || iter.hasNext()) {
+        if (fields == null) {
+          fields = iter.next();
+        }
+        long nestedId = (Long)fields[keyIndex];
+        if (nestedId < id) throw new MetaException("Found entries for unknown ID " + nestedId);
+        if (nestedId > id) break; // fields belong to one of the next entries
+        func.apply(entry.getValue(), fields);
+        fields = null;
+      }
+    }
+    int rv = list.size();
+    query.closeAll();
+    if (doTrace) {
+      LOG.debug("Direct SQL query in " + (queryTime - start) / 1000000.0 + "ms + " +
+          (System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [" + queryText + "]");
+    }
+    return rv;
+  }
+
+  private static class PartitionFilterGenerator implements TreeVisitor {
+    private final Table table;
+    private final StringBuilder filterBuffer;
+    private final List<String> params;
+    private final List<String> joins;
+
+    private PartitionFilterGenerator(Table table, List<String> params, List<String> joins) {
+      this.table = table;
+      this.params = params;
+      this.joins = joins;
+      this.filterBuffer = new StringBuilder();
+    }
+
+    /**
+     * Generate the ANSI SQL92 filter for the given expression tree
+     * @param table the table being queried
+     * @param params the ordered parameters for the resulting expression
+     * @param joins the joins necessary for the resulting expression
+     * @return the string representation of the expression tree
+     */
+    public static String generateSqlFilter(Table table,
+        ExpressionTree tree, List<String> params, List<String> joins) throws MetaException {
+      assert table != null;
+      if (tree.getRoot() == null) {
+        return "";
+      }
+      PartitionFilterGenerator visitor = new PartitionFilterGenerator(table, params, joins);
+      tree.getRoot().accept(visitor);
+      // Some joins might be null (see processNode for LeafNode), clean them up.
+      for (int i = 0; i < joins.size(); ++i) {
+        if (joins.get(i) != null) continue;
+        joins.remove(i--);
+      }
+      return "and (" + visitor.filterBuffer.toString() + ")";
+    }
+
+    @Override
+    public void visit(TreeNode node) throws MetaException {
+      assert node != null && node.getLhs() != null && node.getRhs() != null;
+      filterBuffer.append (" (");
+      node.getLhs().accept(this);
+      filterBuffer.append((node.getAndOr() == LogicalOperator.AND) ? " and " : " or ");
+      node.getRhs().accept(this);
+      filterBuffer.append (") ");
+    }
+
+    @Override
+    public void visit(LeafNode node) throws MetaException {
+      if (node.operator == Operator.LIKE) {
+        // ANSI92 supports || for concatenation (we need to concat '%'-s to the parameter),
+        // but it doesn't work on all RDBMSes, e.g. on MySQL by default. So don't use it for now.
+        throw new MetaException("LIKE is not supported for SQL filter pushdown");
+      }
+      int partColCount = table.getPartitionKeys().size();
+      int partColIndex = node.getPartColIndexForFilter(table);
+
+      String valueAsString = node.getFilterPushdownParam(table, partColIndex);
+      // Add parameters linearly; we are traversing leaf nodes LTR, so they would match correctly.
+      params.add(valueAsString);
+
+      if (joins.isEmpty()) {
+        // There's a fixed number of partition cols that we might have filters on. To avoid
+        // joining multiple times for one column (if there are several filters on it), we will
+        // keep numCols elements in the list, one for each column; we will fill it with nulls,
+        // put each join at a corresponding index when necessary, and remove nulls in the end.
+        for (int i = 0; i < partColCount; ++i) {
+          joins.add(null);
+        }
+      }
+      if (joins.get(partColIndex) == null) {
+        joins.set(partColIndex, "inner join PARTITION_KEY_VALS as FILTER" + partColIndex
+            + " on FILTER"  + partColIndex + ".PART_ID = PARTITIONS.PART_ID and FILTER"
+            + partColIndex + ".INTEGER_IDX = " + partColIndex);
+      }
+
+      String tableValue = "FILTER" + partColIndex + ".PART_KEY_VAL";
+      // TODO: need casts here if #doesOperatorSupportIntegral is amended to include lt/gt/etc.
+      filterBuffer.append(node.isReverseOrder
+          ? "(? " + node.operator.getSqlOp() + " " + tableValue + ")"
+          : "(" + tableValue + " " + node.operator.getSqlOp() + " ?)");
+    }
+  }
+}

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java?rev=1511177&r1=1511176&r2=1511177&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java Wed Aug  7 05:30:01 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.metastore;
 
 import static org.apache.commons.lang.StringUtils.join;
+import static org.apache.commons.lang.StringUtils.repeat;
 
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -33,6 +34,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -157,6 +159,7 @@ public class ObjectStore implements RawS
 
   private boolean isInitialized = false;
   private PersistenceManager pm = null;
+  private MetaStoreDirectSql directSql = null;
   private Configuration hiveConf;
   int openTrasactionCalls = 0;
   private Transaction currentTransaction = null;
@@ -196,6 +199,7 @@ public class ObjectStore implements RawS
       // Always want to re-create pm as we don't know if it were created by the
       // most recent instance of the pmf
       pm = null;
+      directSql = null;
       openTrasactionCalls = 0;
       currentTransaction = null;
       transactionStatus = TXN_STATUS.NO_STATE;
@@ -227,6 +231,9 @@ public class ObjectStore implements RawS
     prop = dsProps;
     pm = getPersistenceManager();
     isInitialized = pm != null;
+    if (isInitialized) {
+      directSql = new MetaStoreDirectSql(pm);
+    }
     return;
   }
 
@@ -1652,43 +1659,34 @@ public class ObjectStore implements RawS
   @Override
   public List<Partition> getPartitionsByNames(String dbName, String tblName,
       List<String> partNames) throws MetaException, NoSuchObjectException {
+    boolean doTrace = LOG.isDebugEnabled();
+    List<Partition> results = null;
+    boolean doUseDirectSql = HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL);
 
     boolean success = false;
     try {
+      long start = doTrace ? System.nanoTime() : 0;
       openTransaction();
-
-      StringBuilder sb = new StringBuilder(
-          "table.tableName == t1 && table.database.name == t2 && (");
-      int n = 0;
-      Map<String, String> params = new HashMap<String, String>();
-      for (Iterator<String> itr = partNames.iterator(); itr.hasNext();) {
-        String pn = "p" + n;
-        n++;
-        String part = itr.next();
-        params.put(pn, part);
-        sb.append("partitionName == ").append(pn);
-        sb.append(" || ");
+      if (doUseDirectSql) {
+        try {
+          results = directSql.getPartitionsViaSqlFilter(dbName, tblName, partNames);
+        } catch (Exception ex) {
+          LOG.error("Direct SQL failed, falling back to ORM", ex);
+          doUseDirectSql = false;
+          rollbackTransaction();
+          start = doTrace ? System.nanoTime() : 0;
+          openTransaction();
+        }
       }
-      sb.setLength(sb.length() - 4); // remove the last " || "
-      sb.append(')');
-
-      Query query = pm.newQuery(MPartition.class, sb.toString());
-
-      LOG.debug(" JDOQL filter is " + sb.toString());
-
-      params.put("t1", tblName.trim());
-      params.put("t2", dbName.trim());
-
-      String parameterDeclaration = makeParameterDeclarationString(params);
-      query.declareParameters(parameterDeclaration);
-      query.setOrdering("partitionName ascending");
 
-      List<MPartition> mparts = (List<MPartition>) query.executeWithMap(params);
-      // pm.retrieveAll(mparts); // retrieveAll is pessimistic. some fields may not be needed
-      List<Partition> results = convertToParts(dbName, tblName, mparts);
-      // pm.makeTransientAll(mparts); // makeTransient will prohibit future access of unfetched fields
-      query.closeAll();
+      if (!doUseDirectSql) {
+        results = getPartitionsViaOrm(dbName, tblName, partNames);
+      }
       success = commitTransaction();
+      if (doTrace) {
+        LOG.debug(results.size() + " partition retrieved using " + (doUseDirectSql ? "SQL" : "ORM")
+            + " in " + ((System.nanoTime() - start) / 1000000.0) + "ms");
+      }
       return results;
     } finally {
       if (!success) {
@@ -1697,15 +1695,98 @@ public class ObjectStore implements RawS
     }
   }
 
+  private List<Partition> getPartitionsViaOrm(
+      String dbName, String tblName, List<String> partNames) throws MetaException {
+    StringBuilder sb = new StringBuilder(
+        "table.tableName == t1 && table.database.name == t2 && (");
+    int n = 0;
+    Map<String, String> params = new HashMap<String, String>();
+    for (Iterator<String> itr = partNames.iterator(); itr.hasNext();) {
+      String pn = "p" + n;
+      n++;
+      String part = itr.next();
+      params.put(pn, part);
+      sb.append("partitionName == ").append(pn);
+      sb.append(" || ");
+    }
+    sb.setLength(sb.length() - 4); // remove the last " || "
+    sb.append(')');
+
+    Query query = pm.newQuery(MPartition.class, sb.toString());
+
+    LOG.debug(" JDOQL filter is " + sb.toString());
+    params.put("t1", tblName.trim());
+    params.put("t2", dbName.trim());
+
+    String parameterDeclaration = makeParameterDeclarationString(params);
+
+    query.declareParameters(parameterDeclaration);
+    query.setOrdering("partitionName ascending");
+
+    List<MPartition> mparts = (List<MPartition>) query.executeWithMap(params);
+    // pm.retrieveAll(mparts); // retrieveAll is pessimistic. some fields may not be needed
+    List<Partition> results = convertToParts(dbName, tblName, mparts);
+    // pm.makeTransientAll(mparts); // makeTransient will prohibit future access of unfetched fields
+    query.closeAll();
+    return results;
+  }
+
   @Override
   public List<Partition> getPartitionsByFilter(String dbName, String tblName,
       String filter, short maxParts) throws MetaException, NoSuchObjectException {
-    openTransaction();
-    List<Partition> parts = convertToParts(listMPartitionsByFilter(dbName,
-        tblName, filter, maxParts));
-    LOG.info("# parts after pruning = " + parts.size());
-    commitTransaction();
-    return parts;
+    boolean doTrace = LOG.isDebugEnabled();
+    // There's no portable SQL limit. It doesn't make a lot of sense w/o offset anyway.
+    boolean doUseDirectSql = (maxParts < 0)
+        && HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL);
+    dbName = dbName.toLowerCase();
+    tblName = tblName.toLowerCase();
+    List<Partition> results = null;
+    FilterParser parser = null;
+    if (filter != null && filter.length() != 0) {
+      LOG.debug("Filter specified is " + filter);
+      parser = getFilterParser(filter);
+    }
+
+    boolean success = false;
+    try {
+      long start = doTrace ? System.nanoTime() : 0;
+      openTransaction();
+      MTable mtable = ensureGetMTable(dbName, tblName);
+      if (doUseDirectSql) {
+        try {
+          Table table = convertToTable(mtable);
+          results = directSql.getPartitionsViaSqlFilter(table, dbName, tblName, parser);
+        } catch (Exception ex) {
+          LOG.error("Direct SQL failed, falling back to ORM", ex);
+          doUseDirectSql = false;
+          rollbackTransaction();
+          start = doTrace ? System.nanoTime() : 0;
+          openTransaction();
+          mtable = ensureGetMTable(dbName, tblName); // Detached on rollback, get again.
+        }
+      }
+      if (!doUseDirectSql) {
+        results = convertToParts(listMPartitionsByFilterNoTxn(
+            mtable, dbName, tblName, parser, maxParts));
+      }
+      success = commitTransaction();
+      LOG.info(results.size() + " partitions retrieved using " + (doUseDirectSql ? "SQL" : "ORM")
+          + (doTrace ? (" in " + ((System.nanoTime() - start) / 1000000.0) + "ms") : ""));
+      return results;
+    } finally {
+      if (!success) {
+        rollbackTransaction();
+      }
+    }
+  }
+
+  private MTable ensureGetMTable(String dbName, String tblName) throws NoSuchObjectException {
+    MTable mtable = getMTable(dbName, tblName);
+    if (mtable == null) {
+      throw new NoSuchObjectException("Specified database/table does not exist : "
+          + dbName + "." + tblName);
+    }
+    return mtable;
   }
 
   private FilterParser getFilterParser(String filter) throws MetaException {
@@ -1736,6 +1817,18 @@ public class ObjectStore implements RawS
    * if mtable is null, generates the query to filter over tables in a database
    */
   private String makeQueryFilterString(MTable mtable, String filter,
+      Map<String, Object> params) throws MetaException {
+    FilterParser parser =
+        (filter != null && filter.length() != 0) ? getFilterParser(filter) : null;
+    return makeQueryFilterString(mtable, parser, params);
+  }
+
+  /**
+   * Makes a JDO query filter string
+   * if mtable is not null, generates the query to filter over partitions in a table.
+   * if mtable is null, generates the query to filter over tables in a database
+   */
+  private String makeQueryFilterString(MTable mtable, FilterParser parser,
       Map<String, Object> params)
       throws MetaException {
 
@@ -1746,10 +1839,8 @@ public class ObjectStore implements RawS
       queryBuilder.append("database.name == dbName");
     }
 
-    if (filter != null && filter.length() > 0) {
-      FilterParser parser = getFilterParser(filter);
+    if (parser != null) {
       String jdoFilter;
-
       if (mtable != null) {
         Table table = convertToTable(mtable);
         jdoFilter = parser.tree.generateJDOFilter(table, params);
@@ -1794,54 +1885,35 @@ public class ObjectStore implements RawS
     return paramDecl.toString();
   }
 
-  private List<MPartition> listMPartitionsByFilter(String dbName, String tableName,
-      String filter, short maxParts) throws MetaException, NoSuchObjectException{
-    boolean success = false;
+  private List<MPartition> listMPartitionsByFilterNoTxn(MTable mtable, String dbName,
+      String tableName, FilterParser parser, short maxParts)
+          throws MetaException, NoSuchObjectException {
     List<MPartition> mparts = null;
-    try {
-      openTransaction();
-      LOG.debug("Executing listMPartitionsByFilter");
-      dbName = dbName.toLowerCase();
-      tableName = tableName.toLowerCase();
+    LOG.debug("Executing listMPartitionsByFilterNoTxn");
+    Map<String, Object> params = new HashMap<String, Object>();
+    String queryFilterString = makeQueryFilterString(mtable, parser, params);
 
-      MTable mtable = getMTable(dbName, tableName);
-      if( mtable == null ) {
-        throw new NoSuchObjectException("Specified database/table does not exist : "
-            + dbName + "." + tableName);
-      }
-      Map<String, Object> params = new HashMap<String, Object>();
-      String queryFilterString =
-        makeQueryFilterString(mtable, filter, params);
-
-      Query query = pm.newQuery(MPartition.class,
-          queryFilterString);
+    Query query = pm.newQuery(MPartition.class,
+        queryFilterString);
 
-      if( maxParts >= 0 ) {
-        //User specified a row limit, set it on the Query
-        query.setRange(0, maxParts);
-      }
+    if( maxParts >= 0 ) {
+      //User specified a row limit, set it on the Query
+      query.setRange(0, maxParts);
+    }
 
-      LOG.debug("Filter specified is " + filter + "," +
-             " JDOQL filter is " + queryFilterString);
+    LOG.debug("JDOQL filter is " + queryFilterString);
 
-      params.put("t1", tableName.trim());
-      params.put("t2", dbName.trim());
+    params.put("t1", tableName.trim());
+    params.put("t2", dbName.trim());
 
-      String parameterDeclaration = makeParameterDeclarationStringObj(params);
-      query.declareParameters(parameterDeclaration);
-      query.setOrdering("partitionName ascending");
+    String parameterDeclaration = makeParameterDeclarationStringObj(params);
+    query.declareParameters(parameterDeclaration);
+    query.setOrdering("partitionName ascending");
 
-      mparts = (List<MPartition>) query.executeWithMap(params);
+    mparts = (List<MPartition>) query.executeWithMap(params);
 
-      LOG.debug("Done executing query for listMPartitionsByFilter");
-      pm.retrieveAll(mparts);
-      success = commitTransaction();
-      LOG.debug("Done retrieving all objects for listMPartitionsByFilter");
-    } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-    }
+    LOG.debug("Done executing query for listMPartitionsByFilterNoTxn");
+    pm.retrieveAll(mparts);
     return mparts;
   }
 
@@ -1909,8 +1981,7 @@ public class ObjectStore implements RawS
         return partNames;
       }
       Map<String, Object> params = new HashMap<String, Object>();
-      String queryFilterString =
-        makeQueryFilterString(mtable, filter, params);
+      String queryFilterString = makeQueryFilterString(mtable, filter, params);
       Query query = pm.newQuery(
           "select partitionName from org.apache.hadoop.hive.metastore.model.MPartition "
           + "where " + queryFilterString);

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java?rev=1511177&r1=1511176&r2=1511177&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java Wed Aug  7 05:30:01 2013
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.metastore.parser;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hive.common.Fil
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 
 import com.google.common.collect.Sets;
@@ -46,27 +48,30 @@ public class ExpressionTree {
 
   /** The operators supported. */
   public enum Operator {
-    EQUALS  ("=", "=="),
+    EQUALS  ("=", "==", "="),
     GREATERTHAN  (">"),
     LESSTHAN  ("<"),
     LESSTHANOREQUALTO ("<="),
     GREATERTHANOREQUALTO (">="),
-    LIKE ("LIKE", "matches"),
-    NOTEQUALS2 ("!=", "!="),
-    NOTEQUALS ("<>", "!=");
+    LIKE ("LIKE", "matches", "like"),
+    NOTEQUALS2 ("!=", "!=", "<>"),
+    NOTEQUALS ("<>", "!=", "<>");
 
     private final String op;
     private final String jdoOp;
+    private final String sqlOp;
 
     // private constructor
     private Operator(String op){
       this.op = op;
       this.jdoOp = op;
+      this.sqlOp = op;
     }
 
-    private Operator(String op, String jdoOp){
+    private Operator(String op, String jdoOp, String sqlOp){
       this.op = op;
       this.jdoOp = jdoOp;
+      this.sqlOp = sqlOp;
     }
 
     public String getOp() {
@@ -77,6 +82,10 @@ public class ExpressionTree {
       return jdoOp;
     }
 
+    public String getSqlOp() {
+      return sqlOp;
+    }
+
     public static Operator fromString(String inputOperator) {
       for(Operator op : Operator.values()) {
         if(op.getOp().equals(inputOperator)){
@@ -95,6 +104,10 @@ public class ExpressionTree {
 
   }
 
+  public static interface TreeVisitor {
+    void visit(TreeNode node) throws MetaException;
+    void visit(LeafNode node) throws MetaException;
+  }
 
   /**
    * The Class representing a Node in the ExpressionTree.
@@ -113,6 +126,23 @@ public class ExpressionTree {
       this.rhs = rhs;
     }
 
+    public TreeNode getLhs() {
+      return lhs;
+    }
+
+    public LogicalOperator getAndOr() {
+      return andOr;
+    }
+
+    public TreeNode getRhs() {
+      return rhs;
+    }
+
+    /** Double dispatch for TreeVisitor. */
+    public void accept(TreeVisitor visitor) throws MetaException {
+      visitor.visit(this);
+    }
+
     /**
      * Generates a JDO filter statement
      * @param table
@@ -162,6 +192,11 @@ public class ExpressionTree {
     private static final String PARAM_PREFIX = "hive_filter_param_";
 
     @Override
+    public void accept(TreeVisitor visitor) throws MetaException {
+      visitor.visit(this);
+    }
+
+    @Override
     public String generateJDOFilter(Table table,
         Map<String, Object> params)
         throws MetaException {
@@ -238,50 +273,13 @@ public class ExpressionTree {
 
     private String generateJDOFilterOverPartitions(Table table, Map<String, Object> params)
     throws MetaException {
-
       int partitionColumnCount = table.getPartitionKeys().size();
-      int partitionColumnIndex;
-      for(partitionColumnIndex = 0;
-      partitionColumnIndex < partitionColumnCount;
-      partitionColumnIndex++ ) {
-        if( table.getPartitionKeys().get(partitionColumnIndex).getName().
-            equalsIgnoreCase(keyName)) {
-          break;
-        }
-      }
-      assert (table.getPartitionKeys().size() > 0);
-
-      if( partitionColumnIndex == table.getPartitionKeys().size() ) {
-        throw new MetaException("Specified key <" + keyName +
-            "> is not a partitioning key for the table");
-      }
-
-      String keyType = table.getPartitionKeys().get(partitionColumnIndex).getType();
-      boolean isIntegralSupported = doesOperatorSupportIntegral(operator);
-
-      // Can only support partitions whose types are string, or maybe integers
-      if (!keyType.equals(org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME)
-          && (!isIntegralSupported || !isIntegralType(keyType))) {
-        throw new MetaException("Filtering is supported only on partition keys of type " +
-            "string" + (isIntegralSupported ? ", or integral types" : ""));
-      }
-
-      boolean isStringValue = value instanceof String;
-      if (!isStringValue && (!isIntegralSupported || !(value instanceof Long))) {
-        throw new MetaException("Filtering is supported only on partition keys of type " +
-            "string" + (isIntegralSupported ? ", or integral types" : ""));
-      }
-
-      String valueAsString = null;
-      try {
-        valueAsString = isStringValue ? (String) value : Long.toString((Long) value);
-      } catch (ClassCastException e) {
-        throw new MetaException("Unable to cast the constexpr to "
-            + (isStringValue ? "string" : "long"));
-      }
+      int partitionColumnIndex = getPartColIndexForFilter(table);
 
+      String valueAsString = getFilterPushdownParam(table, partitionColumnIndex);
       String paramName = PARAM_PREFIX + params.size();
       params.put(paramName, valueAsString);
+
       boolean isOpEquals = operator == Operator.EQUALS;
       if (isOpEquals || operator == Operator.NOTEQUALS || operator == Operator.NOTEQUALS2) {
         return makeFilterForEquals(keyName, valueAsString, paramName, params,
@@ -320,6 +318,7 @@ public class ExpressionTree {
      * @return true iff filter pushdown for this operator can be done for integral types.
      */
     private static boolean doesOperatorSupportIntegral(Operator operator) {
+      // TODO: for SQL-based filtering, this could be amended if we added casts.
       return (operator == Operator.EQUALS)
           || (operator == Operator.NOTEQUALS)
           || (operator == Operator.NOTEQUALS2);
@@ -330,10 +329,61 @@ public class ExpressionTree {
      * @return true iff type is an integral type.
      */
     private static boolean isIntegralType(String type) {
-      return type.equals(org.apache.hadoop.hive.serde.serdeConstants.TINYINT_TYPE_NAME)
-          || type.equals(org.apache.hadoop.hive.serde.serdeConstants.SMALLINT_TYPE_NAME)
-          || type.equals(org.apache.hadoop.hive.serde.serdeConstants.INT_TYPE_NAME)
-          || type.equals(org.apache.hadoop.hive.serde.serdeConstants.BIGINT_TYPE_NAME);
+      return type.equals(serdeConstants.TINYINT_TYPE_NAME)
+          || type.equals(serdeConstants.SMALLINT_TYPE_NAME)
+          || type.equals(serdeConstants.INT_TYPE_NAME)
+          || type.equals(serdeConstants.BIGINT_TYPE_NAME);
+    }
+
+    /**
+     * Get partition column index in the table partition column list that
+     * corresponds to the key that is being filtered on by this tree node.
+     * @param table The table.
+     * @return The index.
+     */
+    public int getPartColIndexForFilter(Table table) throws MetaException {
+      int partitionColumnIndex;
+      assert (table.getPartitionKeys().size() > 0);
+      for (partitionColumnIndex = 0; partitionColumnIndex < table.getPartitionKeys().size();
+          ++partitionColumnIndex) {
+        if (table.getPartitionKeys().get(partitionColumnIndex).getName().
+            equalsIgnoreCase(keyName)) {
+          break;
+        }
+      }
+      if( partitionColumnIndex == table.getPartitionKeys().size() ) {
+        throw new MetaException("Specified key <" + keyName +
+            "> is not a partitioning key for the table");
+      }
+
+      return partitionColumnIndex;
+    }
+
+    /**
+     * Validates and gets the query parameter for filter pushdown based on the column
+     * and the constant stored in this node.
+     * In future this may become different for SQL and JDOQL filter pushdown.
+     * @param table The table.
+     * @param partColIndex The index of the column to check.
+     * @return The parameter string.
+     */
+    public String getFilterPushdownParam(Table table, int partColIndex) throws MetaException {
+      boolean isIntegralSupported = doesOperatorSupportIntegral(operator);
+      String colType = table.getPartitionKeys().get(partColIndex).getType();
+      // Can only support partitions whose types are string, or maybe integers
+      if (!colType.equals(serdeConstants.STRING_TYPE_NAME)
+          && (!isIntegralSupported || !isIntegralType(colType))) {
+        throw new MetaException("Filtering is supported only on partition keys of type " +
+            "string" + (isIntegralSupported ? ", or integral types" : ""));
+      }
+
+      boolean isStringValue = value instanceof String;
+      if (!isStringValue && (!isIntegralSupported || !(value instanceof Long))) {
+        throw new MetaException("Filtering is supported only on partition keys of type " +
+            "string" + (isIntegralSupported ? ", or integral types" : ""));
+      }
+
+      return isStringValue ? (String) value : Long.toString((Long) value);
     }
   }
 
@@ -404,6 +454,10 @@ public class ExpressionTree {
    */
   private final Stack<TreeNode> nodeStack = new Stack<TreeNode>();
 
+  public TreeNode getRoot() {
+    return this.root;
+  }
+
   /**
    * Adds a intermediate node of either type(AND/OR). Pops last two nodes from
    * the stack and sets them as children of the new node and pushes itself
@@ -447,6 +501,7 @@ public class ExpressionTree {
     return root.generateJDOFilter(table, params);
   }
 
+
   /** Case insensitive ANTLR string stream */
   public static class ANTLRNoCaseStringStream extends ANTLRStringStream {
     public ANTLRNoCaseStringStream (String input) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1511177&r1=1511176&r2=1511177&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Wed Aug  7 05:30:01 2013
@@ -1814,6 +1814,7 @@ private void constructOneLBLocationMap(F
     List<Partition> partitions = new ArrayList<Partition>(partNames.size());
 
     int batchSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX);
+    // TODO: might want to increase the default batch size. 1024 is viable; MS gets OOM if too high.
     int nParts = partNames.size();
     int nBatches = nParts / batchSize;