Posted to commits@hive.apache.org by ha...@apache.org on 2013/08/07 07:30:02 UTC
svn commit: r1511177 - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/
metastore/src/java/org/apache/hadoop/hive/metastore/
metastore/src/java/org/apache/hadoop/hive/metastore/parser/
ql/src/java/org/apache/hadoop/hive/ql/metadata/
Author: hashutosh
Date: Wed Aug 7 05:30:01 2013
New Revision: 1511177
URL: http://svn.apache.org/r1511177
Log:
HIVE-4051 : Hive's metastore suffers from 1+N queries when querying partitions & is slow (Sergey Shelukhin via Ashutosh Chauhan)
Added:
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1511177&r1=1511176&r2=1511177&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Aug 7 05:30:01 2013
@@ -344,6 +344,7 @@ public class HiveConf extends Configurat
METASTORE_EXECUTE_SET_UGI("hive.metastore.execute.setugi", false),
METASTORE_PARTITION_NAME_WHITELIST_PATTERN(
"hive.metastore.partition.name.whitelist.pattern", ""),
+ METASTORE_TRY_DIRECT_SQL("hive.metastore.try.direct.sql", true),
METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES(
"hive.metastore.disallow.incompatible.col.type.changes", false),
Added: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java?rev=1511177&view=auto
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java (added)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java Wed Aug 7 05:30:01 2013
@@ -0,0 +1,564 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import static org.apache.commons.lang.StringUtils.join;
+import static org.apache.commons.lang.StringUtils.repeat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.Query;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
+import org.apache.hadoop.hive.metastore.parser.FilterParser;
+import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LeafNode;
+import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LogicalOperator;
+import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator;
+import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode;
+import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor;
+
+/**
+ * This class contains the optimizations for MetaStore that rely on direct SQL access to
+ * the underlying database. It should use ANSI SQL and be compatible with common databases
+ * such as MySQL (note that MySQL doesn't use full ANSI mode by default), Postgres, etc.
+ *
+ * As of now, only partition retrieval is done this way, to improve job startup time;
+ * JDOQL partition retrieval is still present so as not to limit the ORM solution we have
+ * to SQL stores only. There is always a fallback path that works without direct SQL.
+ */
+class MetaStoreDirectSql {
+ private static final Log LOG = LogFactory.getLog(MetaStoreDirectSql.class);
+
+ private final PersistenceManager pm;
+
+ public MetaStoreDirectSql(PersistenceManager pm) {
+ this.pm = pm;
+ }
+
+ /**
+ * Gets partitions by using direct SQL queries.
+ * @param dbName Metastore db name.
+ * @param tblName Metastore table name.
+ * @param partNames Partition names to get.
+ * @return List of partitions.
+ */
+ public List<Partition> getPartitionsViaSqlFilter(
+ String dbName, String tblName, List<String> partNames) throws MetaException {
+ String list = repeat(",?", partNames.size()).substring(1);
+ return getPartitionsViaSqlFilterInternal(dbName, tblName,
+ "and PARTITIONS.PART_NAME in (" + list + ")" , partNames, new ArrayList<String>());
+ }
+
+ /**
+ * Gets partitions by using direct SQL queries.
+ * @param table The table.
+ * @param dbName Metastore db name.
+ * @param tblName Metastore table name.
+ * @param parser The parsed filter from which the SQL filter will be generated.
+ * @return List of partitions.
+ */
+ public List<Partition> getPartitionsViaSqlFilter(Table table, String dbName,
+ String tblName, FilterParser parser) throws MetaException {
+ List<String> params = new ArrayList<String>(), joins = new ArrayList<String>();
+ String sqlFilter = (parser == null) ? null
+ : PartitionFilterGenerator.generateSqlFilter(table, parser.tree, params, joins);
+ return getPartitionsViaSqlFilterInternal(dbName, tblName, sqlFilter, params, joins);
+ }
+
+ /**
+ * Gets partition objects for the query via direct SQL, to avoid the bazillion queries
+ * DN would otherwise issue to retrieve the fields of each object individually.
+ * @param dbName Metastore db name.
+ * @param tblName Metastore table name.
+ * @param sqlFilter SQL filter to use. Should be SQL92-compliant. Can be null.
+ * @param paramsForFilter params for the ?-s in the SQL filter text; they must be in order.
+ * @param joinsForFilter if the filter needs additional join statements, they must be in
+ * this list. Should be SQL92-compliant.
+ * @return List of partition objects. FieldSchema is currently not populated.
+ */
+ private List<Partition> getPartitionsViaSqlFilterInternal(String dbName,
+ String tblName, String sqlFilter, List<String> paramsForFilter,
+ List<String> joinsForFilter) throws MetaException {
+ boolean doTrace = LOG.isDebugEnabled();
+ // Get all simple fields for partitions and related objects, which we can map one-to-one.
+ // We will do this in 2 queries to use different existing indices for each one.
+ // We do not get table and DB name, assuming they are the same as we are using to filter.
+ // TODO: We might want to tune the indexes instead. With the current ones MySQL performs
+ // poorly, esp. with 'order by' w/o an index on large tables, even if the number of actual
+ // results is small (a query that returns 8 out of 32k partitions can go from 4sec to 0sec
+ // just by adding a PART_ID IN (...) filter that doesn't alter the results; without it,
+ // MySQL probably sorts the entire table because it doesn't know how selective the filter is).
+ String queryText =
+ "select PARTITIONS.PART_ID from PARTITIONS"
+ + " inner join TBLS on PARTITIONS.TBL_ID = TBLS.TBL_ID "
+ + " inner join DBS on TBLS.DB_ID = DBS.DB_ID "
+ + join(joinsForFilter, ' ') + " where TBLS.TBL_NAME = ? and DBS.NAME = ?"
+ + ((sqlFilter == null) ? "" : " " + sqlFilter);
+ Object[] params = new Object[paramsForFilter.size() + 2];
+ params[0] = tblName;
+ params[1] = dbName;
+ for (int i = 0; i < paramsForFilter.size(); ++i) {
+ params[i + 2] = paramsForFilter.get(i);
+ }
+
+ long start = doTrace ? System.nanoTime() : 0;
+ Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+ @SuppressWarnings("unchecked")
+ List<Object> sqlResult = (List<Object>)query.executeWithArray(params);
+ if (sqlResult.isEmpty()) {
+ return new ArrayList<Partition>(); // no partitions, bail early.
+ }
+ long queryTime = doTrace ? System.nanoTime() : 0;
+
+ // Prepare StringBuilder for "PART_ID in (...)" to use in future queries.
+ int sbCapacity = sqlResult.size() * 7; // a 100k-range ID is 6 digits, plus a comma
+ StringBuilder partSb = new StringBuilder(sbCapacity);
+ // Assume db and table names are the same for all partitions; that's what we're selecting by.
+ for (Object partitionId : sqlResult) {
+ partSb.append((Long)partitionId).append(",");
+ }
+ String partIds = trimCommaList(partSb);
+ if (doTrace) {
+ LOG.debug("Direct SQL query in " + (queryTime - start) / 1000000.0 + "ms + " +
+ (System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [ " + queryText + "]");
+ }
+
+ // Now get most of the other fields.
+ queryText =
+ "select PARTITIONS.PART_ID, SDS.SD_ID, SDS.CD_ID, SERDES.SERDE_ID, "
+ + " PARTITIONS.CREATE_TIME, PARTITIONS.LAST_ACCESS_TIME, SDS.INPUT_FORMAT, "
+ + " SDS.IS_COMPRESSED, SDS.IS_STOREDASSUBDIRECTORIES, SDS.LOCATION, SDS.NUM_BUCKETS, "
+ + " SDS.OUTPUT_FORMAT, SERDES.NAME, SERDES.SLIB "
+ + "from PARTITIONS"
+ + " left outer join SDS on PARTITIONS.SD_ID = SDS.SD_ID "
+ + " left outer join SERDES on SDS.SERDE_ID = SERDES.SERDE_ID "
+ + "where PART_ID in (" + partIds + ") order by PART_NAME asc";
+ start = doTrace ? System.nanoTime() : 0;
+ query = pm.newQuery("javax.jdo.query.SQL", queryText);
+ @SuppressWarnings("unchecked")
+ List<Object[]> sqlResult2 = (List<Object[]>)query.executeWithArray(params);
+ queryTime = doTrace ? System.nanoTime() : 0;
+
+ // Read all the fields and create partitions, SDs and serdes.
+ TreeMap<Long, Partition> partitions = new TreeMap<Long, Partition>();
+ TreeMap<Long, StorageDescriptor> sds = new TreeMap<Long, StorageDescriptor>();
+ TreeMap<Long, SerDeInfo> serdes = new TreeMap<Long, SerDeInfo>();
+ TreeMap<Long, List<FieldSchema>> colss = new TreeMap<Long, List<FieldSchema>>();
+ // Keep order by name, consistent with JDO.
+ ArrayList<Partition> orderedResult = new ArrayList<Partition>(sqlResult.size());
+
+ // Prepare StringBuilder-s for "in (...)" lists to use in one-to-many queries.
+ StringBuilder sdSb = new StringBuilder(sbCapacity), serdeSb = new StringBuilder(sbCapacity);
+ StringBuilder colsSb = new StringBuilder(7); // We expect there's usually only one CD (shared column list).
+ tblName = tblName.toLowerCase();
+ dbName = dbName.toLowerCase();
+ for (Object[] fields : sqlResult2) {
+ // Here comes the ugly part...
+ long partitionId = (Long)fields[0];
+ Long sdId = (Long)fields[1];
+ Long colId = (Long)fields[2];
+ Long serdeId = (Long)fields[3];
+ if (sdId == null || colId == null || serdeId == null) {
+ throw new MetaException("Unexpected null for one of the IDs, SD " + sdId
+ + ", column " + colId + ", serde " + serdeId);
+ }
+
+ Partition part = new Partition();
+ orderedResult.add(part);
+ // Set the collection fields; some code might not check presence before accessing them.
+ part.setParameters(new HashMap<String, String>());
+ part.setValues(new ArrayList<String>());
+ part.setDbName(dbName);
+ part.setTableName(tblName);
+ if (fields[4] != null) part.setCreateTime((Integer)fields[4]);
+ if (fields[5] != null) part.setLastAccessTime((Integer)fields[5]);
+ partitions.put(partitionId, part);
+
+ // We assume each partition has a unique SD.
+ StorageDescriptor sd = new StorageDescriptor();
+ StorageDescriptor oldSd = sds.put(sdId, sd);
+ if (oldSd != null) {
+ throw new MetaException("Partitions reuse SDs; we don't expect that");
+ }
+ // Set the collection fields; some code might not check presence before accessing them.
+ sd.setSortCols(new ArrayList<Order>());
+ sd.setBucketCols(new ArrayList<String>());
+ sd.setParameters(new HashMap<String, String>());
+ sd.setSkewedInfo(new SkewedInfo(new ArrayList<String>(),
+ new ArrayList<List<String>>(), new HashMap<SkewedValueList, String>()));
+ sd.setInputFormat((String)fields[6]);
+ Boolean tmpBoolean = extractSqlBoolean(fields[7]);
+ if (tmpBoolean != null) sd.setCompressed(tmpBoolean);
+ tmpBoolean = extractSqlBoolean(fields[8]);
+ if (tmpBoolean != null) sd.setStoredAsSubDirectories(tmpBoolean);
+ sd.setLocation((String)fields[9]);
+ if (fields[10] != null) sd.setNumBuckets((Integer)fields[10]);
+ sd.setOutputFormat((String)fields[11]);
+ sdSb.append(sdId).append(",");
+ part.setSd(sd);
+
+ List<FieldSchema> cols = colss.get(colId);
+ // We expect that colId will be the same for all (or many) SDs.
+ if (cols == null) {
+ cols = new ArrayList<FieldSchema>();
+ colss.put(colId, cols);
+ colsSb.append(colId).append(",");
+ }
+ sd.setCols(cols);
+
+ // We assume each SD has a unique serde.
+ SerDeInfo serde = new SerDeInfo();
+ SerDeInfo oldSerde = serdes.put(serdeId, serde);
+ if (oldSerde != null) {
+ throw new MetaException("SDs reuse serdes; we don't expect that");
+ }
+ serde.setParameters(new HashMap<String, String>());
+ serde.setName((String)fields[12]);
+ serde.setSerializationLib((String)fields[13]);
+ serdeSb.append(serdeId).append(",");
+ sd.setSerdeInfo(serde);
+ }
+ query.closeAll();
+ if (doTrace) {
+ LOG.debug("Direct SQL query in " + (queryTime - start) / 1000000.0 + "ms + " +
+ (System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [ " + queryText + "]");
+ }
+
+ // Prepare IN (blah) lists for the following queries. Cut off the final ','s.
+ String sdIds = trimCommaList(sdSb), serdeIds = trimCommaList(serdeSb),
+ colIds = trimCommaList(colsSb);
+
+ // Now get all the one-to-many things. Start with partitions.
+ queryText = "select PART_ID, PARAM_KEY, PARAM_VALUE from PARTITION_PARAMS where PART_ID in ("
+ + partIds + ") and PARAM_KEY is not null order by PART_ID asc";
+ loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() {
+ public void apply(Partition t, Object[] fields) {
+ t.putToParameters((String)fields[1], (String)fields[2]);
+ }});
+
+ queryText = "select PART_ID, PART_KEY_VAL from PARTITION_KEY_VALS where PART_ID in ("
+ + partIds + ") and INTEGER_IDX >= 0 order by PART_ID asc, INTEGER_IDX asc";
+ loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() {
+ public void apply(Partition t, Object[] fields) {
+ t.addToValues((String)fields[1]);
+ }});
+
+ // Get all the stuff for SD. Don't do empty-list check - we expect partitions to have SDs.
+ queryText = "select SD_ID, PARAM_KEY, PARAM_VALUE from SD_PARAMS where SD_ID in ("
+ + sdIds + ") and PARAM_KEY is not null order by SD_ID asc";
+ loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+ public void apply(StorageDescriptor t, Object[] fields) {
+ t.putToParameters((String)fields[1], (String)fields[2]);
+ }});
+
+ // Note that SORT_COLS has an "ORDER" column, which is not SQL92-legal. We have two choices
+ // here - drop SQL92, or get '*' and be broken on certain schema changes. We do the latter.
+ queryText = "select SD_ID, COLUMN_NAME, SORT_COLS.* from SORT_COLS where SD_ID in ("
+ + sdIds + ") and INTEGER_IDX >= 0 order by SD_ID asc, INTEGER_IDX asc";
+ loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+ public void apply(StorageDescriptor t, Object[] fields) {
+ if (fields[4] == null) return;
+ t.addToSortCols(new Order((String)fields[1], (Integer)fields[4]));
+ }});
+
+ queryText = "select SD_ID, BUCKET_COL_NAME from BUCKETING_COLS where SD_ID in ("
+ + sdIds + ") and INTEGER_IDX >= 0 order by SD_ID asc, INTEGER_IDX asc";
+ loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+ public void apply(StorageDescriptor t, Object[] fields) {
+ t.addToBucketCols((String)fields[1]);
+ }});
+
+ // Skewed columns stuff.
+ queryText = "select SD_ID, SKEWED_COL_NAME from SKEWED_COL_NAMES where SD_ID in ("
+ + sdIds + ") and INTEGER_IDX >= 0 order by SD_ID asc, INTEGER_IDX asc";
+ boolean hasSkewedColumns =
+ loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+ public void apply(StorageDescriptor t, Object[] fields) {
+ if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo());
+ t.getSkewedInfo().addToSkewedColNames((String)fields[1]);
+ }}) > 0;
+
+ // Assume we don't need to fetch the rest of the skewed column data if we have no columns.
+ if (hasSkewedColumns) {
+ // We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless.
+ queryText =
+ "select SKEWED_VALUES.SD_ID_OID, SKEWED_STRING_LIST_VALUES.STRING_LIST_ID, "
+ + " SKEWED_STRING_LIST_VALUES.STRING_LIST_VALUE "
+ + "from SKEWED_VALUES "
+ + " left outer join SKEWED_STRING_LIST_VALUES on "
+ + " SKEWED_VALUES.STRING_LIST_ID_EID = SKEWED_STRING_LIST_VALUES.STRING_LIST_ID "
+ + "where SKEWED_VALUES.SD_ID_OID in (" + sdIds + ") "
+ + " and SKEWED_VALUES.STRING_LIST_ID_EID is not null "
+ + " and SKEWED_VALUES.INTEGER_IDX >= 0 "
+ + "order by SKEWED_VALUES.SD_ID_OID asc, SKEWED_VALUES.INTEGER_IDX asc, "
+ + " SKEWED_STRING_LIST_VALUES.INTEGER_IDX asc";
+ loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+ private Long currentListId;
+ private List<String> currentList;
+ public void apply(StorageDescriptor t, Object[] fields) {
+ if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo());
+ // Note that this is not a typical list accumulator - there's no call to finalize
+ // the last list. Instead we add each new list to the SD immediately, and also keep
+ // a local reference to it so we can keep adding elements.
+ if (fields[1] == null) {
+ currentList = null; // left outer join produced a list with no values
+ currentListId = null;
+ t.getSkewedInfo().addToSkewedColValues(new ArrayList<String>());
+ } else {
+ long fieldsListId = (Long)fields[1];
+ if (currentListId == null || fieldsListId != currentListId) {
+ currentList = new ArrayList<String>();
+ currentListId = fieldsListId;
+ t.getSkewedInfo().addToSkewedColValues(currentList);
+ }
+ currentList.add((String)fields[2]);
+ }
+ }});
+
+ // We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless.
+ queryText =
+ "select SKEWED_COL_VALUE_LOC_MAP.SD_ID, SKEWED_STRING_LIST_VALUES.STRING_LIST_ID,"
+ + " SKEWED_COL_VALUE_LOC_MAP.LOCATION, SKEWED_STRING_LIST_VALUES.STRING_LIST_VALUE "
+ + "from SKEWED_COL_VALUE_LOC_MAP"
+ + " left outer join SKEWED_STRING_LIST_VALUES on SKEWED_COL_VALUE_LOC_MAP."
+ + "STRING_LIST_ID_KID = SKEWED_STRING_LIST_VALUES.STRING_LIST_ID "
+ + "where SKEWED_COL_VALUE_LOC_MAP.SD_ID in (" + sdIds + ")"
+ + " and SKEWED_COL_VALUE_LOC_MAP.STRING_LIST_ID_KID is not null "
+ + "order by SKEWED_COL_VALUE_LOC_MAP.SD_ID asc,"
+ + " SKEWED_STRING_LIST_VALUES.INTEGER_IDX asc";
+ loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+ private Long currentListId;
+ private SkewedValueList currentList;
+ public void apply(StorageDescriptor t, Object[] fields) {
+ if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo());
+ // Note that this is not a typical list accumulator - there's no call to finalize
+ // the last list. Instead we add each new list to the SD immediately, and also keep
+ // a local reference to it so we can keep adding elements.
+ if (fields[1] == null) {
+ currentList = null; // left outer join produced a list with no values
+ currentListId = null;
+ t.getSkewedInfo().putToSkewedColValueLocationMaps(
+ new SkewedValueList(), (String)fields[2]);
+ } else {
+ long fieldsListId = (Long)fields[1];
+ if (currentListId == null || fieldsListId != currentListId) {
+ currentList = new SkewedValueList();
+ currentListId = fieldsListId;
+ t.getSkewedInfo().putToSkewedColValueLocationMaps(currentList, (String)fields[2]);
+ }
+ currentList.addToSkewedValueList((String)fields[3]);
+ }
+ }});
+ } // if (hasSkewedColumns)
+
+ // Get FieldSchema stuff if any.
+ if (!colss.isEmpty()) {
+ // We are skipping the CDS table here, as it seems to be totally useless.
+ queryText = "select CD_ID, COMMENT, COLUMN_NAME, TYPE_NAME from COLUMNS_V2 where CD_ID in ("
+ + colIds + ") and INTEGER_IDX >= 0 order by CD_ID asc, INTEGER_IDX asc";
+ loopJoinOrderedResult(colss, queryText, 0, new ApplyFunc<List<FieldSchema>>() {
+ public void apply(List<FieldSchema> t, Object[] fields) {
+ t.add(new FieldSchema((String)fields[2], (String)fields[3], (String)fields[1]));
+ }});
+ }
+
+ // Finally, get all the stuff for serdes - just the params.
+ queryText = "select SERDE_ID, PARAM_KEY, PARAM_VALUE from SERDE_PARAMS where SERDE_ID in ("
+ + serdeIds + ") and PARAM_KEY is not null order by SERDE_ID asc";
+ loopJoinOrderedResult(serdes, queryText, 0, new ApplyFunc<SerDeInfo>() {
+ public void apply(SerDeInfo t, Object[] fields) {
+ t.putToParameters((String)fields[1], (String)fields[2]);
+ }});
+
+ return orderedResult;
+ }
+
+ private static Boolean extractSqlBoolean(Object value) throws MetaException {
+ // MySQL has booleans, but e.g. Derby uses a 'Y'/'N' mapping. People using Derby probably
+ // don't care about performance anyway, but let's cover the common case.
+ if (value == null) return null;
+ if (value instanceof Boolean) return (Boolean)value;
+ Character c = null;
+ if (value instanceof String && ((String)value).length() == 1) {
+ c = ((String)value).charAt(0);
+ }
+ if (c == 'Y') return true;
+ if (c == 'N') return false;
+ throw new MetaException("Cannot extrace boolean from column value " + value);
+ }
+
+ private static String trimCommaList(StringBuilder sb) {
+ if (sb.length() > 0) {
+ sb.setLength(sb.length() - 1);
+ }
+ return sb.toString();
+ }
+
+ private abstract class ApplyFunc<Target> {
+ public abstract void apply(Target t, Object[] fields);
+ }
+
+ /**
+ * Merges the result of a PM SQL query into a tree of objects.
+ * Essentially it's an object join. DN could do this for us, but it issues queries
+ * separately for every object, which is suboptimal.
+ * @param tree The object tree, by ID.
+ * @param queryText The query text.
+ * @param keyIndex Index of the Long column corresponding to the map ID in query result rows.
+ * @param func The function that is called on each (object,row) pair with the same id.
+ * @return the count of results returned from the query.
+ */
+ private <T> int loopJoinOrderedResult(TreeMap<Long, T> tree,
+ String queryText, int keyIndex, ApplyFunc<T> func) throws MetaException {
+ boolean doTrace = LOG.isDebugEnabled();
+ long start = doTrace ? System.nanoTime() : 0;
+ Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+ Object result = query.execute();
+ long queryTime = doTrace ? System.nanoTime() : 0;
+ if (result == null) {
+ query.closeAll();
+ return 0;
+ }
+ if (!(result instanceof List<?>)) {
+ throw new MetaException("Wrong result type " + result.getClass());
+ }
+ @SuppressWarnings("unchecked")
+ List<Object[]> list = (List<Object[]>)result;
+ Iterator<Object[]> iter = list.iterator();
+ Object[] fields = null;
+ for (Map.Entry<Long, T> entry : tree.entrySet()) {
+ if (fields == null && !iter.hasNext()) break;
+ long id = entry.getKey();
+ while (fields != null || iter.hasNext()) {
+ if (fields == null) {
+ fields = iter.next();
+ }
+ long nestedId = (Long)fields[keyIndex];
+ if (nestedId < id) throw new MetaException("Found entries for unknown ID " + nestedId);
+ if (nestedId > id) break; // fields belong to one of the next entries
+ func.apply(entry.getValue(), fields);
+ fields = null;
+ }
+ }
+ int rv = list.size();
+ query.closeAll();
+ if (doTrace) {
+ LOG.debug("Direct SQL query in " + (queryTime - start) / 1000000.0 + "ms + " +
+ (System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [" + queryText + "]");
+ }
+ return rv;
+ }
+
+ private static class PartitionFilterGenerator implements TreeVisitor {
+ private final Table table;
+ private final StringBuilder filterBuffer;
+ private final List<String> params;
+ private final List<String> joins;
+
+ private PartitionFilterGenerator(Table table, List<String> params, List<String> joins) {
+ this.table = table;
+ this.params = params;
+ this.joins = joins;
+ this.filterBuffer = new StringBuilder();
+ }
+
+ /**
+ * Generates the ANSI SQL92 filter for the given expression tree.
+ * @param table the table being queried
+ * @param tree the parsed expression tree from which to generate the filter
+ * @param params the ordered parameters for the resulting expression
+ * @param joins the joins necessary for the resulting expression
+ * @return the string representation of the expression tree
+ */
+ public static String generateSqlFilter(Table table,
+ ExpressionTree tree, List<String> params, List<String> joins) throws MetaException {
+ assert table != null;
+ if (tree.getRoot() == null) {
+ return "";
+ }
+ PartitionFilterGenerator visitor = new PartitionFilterGenerator(table, params, joins);
+ tree.getRoot().accept(visitor);
+ // Some joins might be null (see processNode for LeafNode), clean them up.
+ for (int i = 0; i < joins.size(); ++i) {
+ if (joins.get(i) != null) continue;
+ joins.remove(i--);
+ }
+ return "and (" + visitor.filterBuffer.toString() + ")";
+ }
+
+ @Override
+ public void visit(TreeNode node) throws MetaException {
+ assert node != null && node.getLhs() != null && node.getRhs() != null;
+ filterBuffer.append (" (");
+ node.getLhs().accept(this);
+ filterBuffer.append((node.getAndOr() == LogicalOperator.AND) ? " and " : " or ");
+ node.getRhs().accept(this);
+ filterBuffer.append (") ");
+ }
+
+ @Override
+ public void visit(LeafNode node) throws MetaException {
+ if (node.operator == Operator.LIKE) {
+ // ANSI92 supports || for concatenation (we need to concat '%'-s to the parameter),
+ // but it doesn't work on all RDBMSes, e.g. on MySQL by default. So don't use it for now.
+ throw new MetaException("LIKE is not supported for SQL filter pushdown");
+ }
+ int partColCount = table.getPartitionKeys().size();
+ int partColIndex = node.getPartColIndexForFilter(table);
+
+ String valueAsString = node.getFilterPushdownParam(table, partColIndex);
+ // Add parameters linearly; we are traversing leaf nodes LTR, so they would match correctly.
+ params.add(valueAsString);
+
+ if (joins.isEmpty()) {
+ // There's a fixed number of partition cols that we might have filters on. To avoid
+ // joining multiple times for one column (if there are several filters on it), we will
+ // keep partColCount elements in the list, one for each column; we will fill it with nulls,
+ // put each join at a corresponding index when necessary, and remove nulls in the end.
+ for (int i = 0; i < partColCount; ++i) {
+ joins.add(null);
+ }
+ }
+ if (joins.get(partColIndex) == null) {
+ joins.set(partColIndex, "inner join PARTITION_KEY_VALS as FILTER" + partColIndex
+ + " on FILTER" + partColIndex + ".PART_ID = PARTITIONS.PART_ID and FILTER"
+ + partColIndex + ".INTEGER_IDX = " + partColIndex);
+ }
+
+ String tableValue = "FILTER" + partColIndex + ".PART_KEY_VAL";
+ // TODO: need casts here if #doesOperatorSupportIntegral is amended to include lt/gt/etc.
+ filterBuffer.append(node.isReverseOrder
+ ? "(? " + node.operator.getSqlOp() + " " + tableValue + ")"
+ : "(" + tableValue + " " + node.operator.getSqlOp() + " ?)");
+ }
+ }
+}
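
The heart of loopJoinOrderedResult above is a sorted merge join: the follow-up queries return rows ordered by the same ID that keys the TreeMap, so one linear pass attaches every row to its parent object ("essentially an object join", as the javadoc says). A minimal standalone sketch of that loop follows, with simplified types - StringBuilder values stand in for Partition/StorageDescriptor, and the unknown-ID check of the real code is omitted.

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class LoopJoinSketch {
      public static void main(String[] args) {
        // Objects keyed by ID, iterated in sorted order by the TreeMap.
        TreeMap<Long, StringBuilder> tree = new TreeMap<Long, StringBuilder>();
        tree.put(1L, new StringBuilder("part1:"));
        tree.put(2L, new StringBuilder("part2:"));
        // Rows as they would come back from "... order by PART_ID asc".
        List<Object[]> rows = Arrays.asList(
            new Object[] {1L, "k1", "v1"},
            new Object[] {1L, "k2", "v2"},
            new Object[] {2L, "k1", "v3"});
        Iterator<Object[]> iter = rows.iterator();
        Object[] fields = null;
        for (Map.Entry<Long, StringBuilder> entry : tree.entrySet()) {
          if (fields == null && !iter.hasNext()) break;
          long id = entry.getKey();
          while (fields != null || iter.hasNext()) {
            if (fields == null) {
              fields = iter.next();
            }
            long rowId = (Long) fields[0];
            if (rowId > id) break; // this row belongs to a later entry
            entry.getValue().append(fields[1]).append('=').append(fields[2]).append(' ');
            fields = null; // consumed; advance to the next row
          }
        }
        System.out.println(tree); // {1=part1:k1=v1 k2=v2 , 2=part2:k1=v3 }
      }
    }

Because both sides are sorted, the whole merge is one pass over the rows regardless of how many objects there are, which is what replaces the per-object queries DN would otherwise issue.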
Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java?rev=1511177&r1=1511176&r2=1511177&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java Wed Aug 7 05:30:01 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.metastore;
import static org.apache.commons.lang.StringUtils.join;
+import static org.apache.commons.lang.StringUtils.repeat;
import java.net.URI;
import java.net.URISyntaxException;
@@ -33,6 +34,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -157,6 +159,7 @@ public class ObjectStore implements RawS
private boolean isInitialized = false;
private PersistenceManager pm = null;
+ private MetaStoreDirectSql directSql = null;
private Configuration hiveConf;
int openTrasactionCalls = 0;
private Transaction currentTransaction = null;
@@ -196,6 +199,7 @@ public class ObjectStore implements RawS
// Always want to re-create pm as we don't know if it were created by the
// most recent instance of the pmf
pm = null;
+ directSql = null;
openTrasactionCalls = 0;
currentTransaction = null;
transactionStatus = TXN_STATUS.NO_STATE;
@@ -227,6 +231,9 @@ public class ObjectStore implements RawS
prop = dsProps;
pm = getPersistenceManager();
isInitialized = pm != null;
+ if (isInitialized) {
+ directSql = new MetaStoreDirectSql(pm);
+ }
return;
}
@@ -1652,43 +1659,34 @@ public class ObjectStore implements RawS
@Override
public List<Partition> getPartitionsByNames(String dbName, String tblName,
List<String> partNames) throws MetaException, NoSuchObjectException {
+ boolean doTrace = LOG.isDebugEnabled();
+ List<Partition> results = null;
+ boolean doUseDirectSql = HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL);
boolean success = false;
try {
+ long start = doTrace ? System.nanoTime() : 0;
openTransaction();
-
- StringBuilder sb = new StringBuilder(
- "table.tableName == t1 && table.database.name == t2 && (");
- int n = 0;
- Map<String, String> params = new HashMap<String, String>();
- for (Iterator<String> itr = partNames.iterator(); itr.hasNext();) {
- String pn = "p" + n;
- n++;
- String part = itr.next();
- params.put(pn, part);
- sb.append("partitionName == ").append(pn);
- sb.append(" || ");
+ if (doUseDirectSql) {
+ try {
+ results = directSql.getPartitionsViaSqlFilter(dbName, tblName, partNames);
+ } catch (Exception ex) {
+ LOG.error("Direct SQL failed, falling back to ORM", ex);
+ doUseDirectSql = false;
+ rollbackTransaction();
+ start = doTrace ? System.nanoTime() : 0;
+ openTransaction();
+ }
}
- sb.setLength(sb.length() - 4); // remove the last " || "
- sb.append(')');
-
- Query query = pm.newQuery(MPartition.class, sb.toString());
-
- LOG.debug(" JDOQL filter is " + sb.toString());
-
- params.put("t1", tblName.trim());
- params.put("t2", dbName.trim());
-
- String parameterDeclaration = makeParameterDeclarationString(params);
- query.declareParameters(parameterDeclaration);
- query.setOrdering("partitionName ascending");
- List<MPartition> mparts = (List<MPartition>) query.executeWithMap(params);
- // pm.retrieveAll(mparts); // retrieveAll is pessimistic. some fields may not be needed
- List<Partition> results = convertToParts(dbName, tblName, mparts);
- // pm.makeTransientAll(mparts); // makeTransient will prohibit future access of unfetched fields
- query.closeAll();
+ if (!doUseDirectSql) {
+ results = getPartitionsViaOrm(dbName, tblName, partNames);
+ }
success = commitTransaction();
+ if (doTrace) {
+ LOG.debug(results.size() + " partitions retrieved using " + (doUseDirectSql ? "SQL" : "ORM")
+ + " in " + ((System.nanoTime() - start) / 1000000.0) + "ms");
+ }
return results;
} finally {
if (!success) {
@@ -1697,15 +1695,98 @@ public class ObjectStore implements RawS
}
}
+ private List<Partition> getPartitionsViaOrm(
+ String dbName, String tblName, List<String> partNames) throws MetaException {
+ StringBuilder sb = new StringBuilder(
+ "table.tableName == t1 && table.database.name == t2 && (");
+ int n = 0;
+ Map<String, String> params = new HashMap<String, String>();
+ for (Iterator<String> itr = partNames.iterator(); itr.hasNext();) {
+ String pn = "p" + n;
+ n++;
+ String part = itr.next();
+ params.put(pn, part);
+ sb.append("partitionName == ").append(pn);
+ sb.append(" || ");
+ }
+ sb.setLength(sb.length() - 4); // remove the last " || "
+ sb.append(')');
+
+ Query query = pm.newQuery(MPartition.class, sb.toString());
+
+ LOG.debug(" JDOQL filter is " + sb.toString());
+ params.put("t1", tblName.trim());
+ params.put("t2", dbName.trim());
+
+ String parameterDeclaration = makeParameterDeclarationString(params);
+
+ query.declareParameters(parameterDeclaration);
+ query.setOrdering("partitionName ascending");
+
+ List<MPartition> mparts = (List<MPartition>) query.executeWithMap(params);
+ // pm.retrieveAll(mparts); // retrieveAll is pessimistic. some fields may not be needed
+ List<Partition> results = convertToParts(dbName, tblName, mparts);
+ // pm.makeTransientAll(mparts); // makeTransient will prohibit future access of unfetched fields
+ query.closeAll();
+ return results;
+ }
+
@Override
public List<Partition> getPartitionsByFilter(String dbName, String tblName,
String filter, short maxParts) throws MetaException, NoSuchObjectException {
- openTransaction();
- List<Partition> parts = convertToParts(listMPartitionsByFilter(dbName,
- tblName, filter, maxParts));
- LOG.info("# parts after pruning = " + parts.size());
- commitTransaction();
- return parts;
+ boolean doTrace = LOG.isDebugEnabled();
+ // There's no portable SQL limit. It doesn't make a lot of sense w/o offset anyway.
+ boolean doUseDirectSql = (maxParts < 0)
+ && HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL);
+ dbName = dbName.toLowerCase();
+ tblName = tblName.toLowerCase();
+ List<Partition> results = null;
+ FilterParser parser = null;
+ if (filter != null && filter.length() != 0) {
+ LOG.debug("Filter specified is " + filter);
+ parser = getFilterParser(filter);
+ }
+
+ boolean success = false;
+ try {
+ long start = doTrace ? System.nanoTime() : 0;
+ openTransaction();
+ MTable mtable = ensureGetMTable(dbName, tblName);
+ if (doUseDirectSql) {
+ try {
+ Table table = convertToTable(mtable);
+ results = directSql.getPartitionsViaSqlFilter(table, dbName, tblName, parser);
+ } catch (Exception ex) {
+ LOG.error("Direct SQL failed, falling back to ORM", ex);
+ doUseDirectSql = false;
+ rollbackTransaction();
+ start = doTrace ? System.nanoTime() : 0;
+ openTransaction();
+ mtable = ensureGetMTable(dbName, tblName); // Detached on rollback, get again.
+ }
+ }
+ if (!doUseDirectSql) {
+ results = convertToParts(listMPartitionsByFilterNoTxn(
+ mtable, dbName, tblName, parser, maxParts));
+ }
+ success = commitTransaction();
+ LOG.info(results.size() + " partitions retrieved using " + (doUseDirectSql ? "SQL" : "ORM")
+ + (doTrace ? (" in " + ((System.nanoTime() - start) / 1000000.0) + "ms") : ""));
+ return results;
+ } finally {
+ if (!success) {
+ rollbackTransaction();
+ }
+ }
+ }
+
+ private MTable ensureGetMTable(String dbName, String tblName) throws NoSuchObjectException {
+ MTable mtable = getMTable(dbName, tblName);
+ if (mtable == null) {
+ throw new NoSuchObjectException("Specified database/table does not exist : "
+ + dbName + "." + tblName);
+ }
+ return mtable;
}
private FilterParser getFilterParser(String filter) throws MetaException {
@@ -1736,6 +1817,18 @@ public class ObjectStore implements RawS
* if mtable is null, generates the query to filter over tables in a database
*/
private String makeQueryFilterString(MTable mtable, String filter,
+ Map<String, Object> params) throws MetaException {
+ FilterParser parser =
+ (filter != null && filter.length() != 0) ? getFilterParser(filter) : null;
+ return makeQueryFilterString(mtable, parser, params);
+ }
+
+ /**
+ * Makes a JDO query filter string
+ * if mtable is not null, generates the query to filter over partitions in a table.
+ * if mtable is null, generates the query to filter over tables in a database
+ */
+ private String makeQueryFilterString(MTable mtable, FilterParser parser,
Map<String, Object> params)
throws MetaException {
@@ -1746,10 +1839,8 @@ public class ObjectStore implements RawS
queryBuilder.append("database.name == dbName");
}
- if (filter != null && filter.length() > 0) {
- FilterParser parser = getFilterParser(filter);
+ if (parser != null) {
String jdoFilter;
-
if (mtable != null) {
Table table = convertToTable(mtable);
jdoFilter = parser.tree.generateJDOFilter(table, params);
@@ -1794,54 +1885,35 @@ public class ObjectStore implements RawS
return paramDecl.toString();
}
- private List<MPartition> listMPartitionsByFilter(String dbName, String tableName,
- String filter, short maxParts) throws MetaException, NoSuchObjectException{
- boolean success = false;
+ private List<MPartition> listMPartitionsByFilterNoTxn(MTable mtable, String dbName,
+ String tableName, FilterParser parser, short maxParts)
+ throws MetaException, NoSuchObjectException {
List<MPartition> mparts = null;
- try {
- openTransaction();
- LOG.debug("Executing listMPartitionsByFilter");
- dbName = dbName.toLowerCase();
- tableName = tableName.toLowerCase();
+ LOG.debug("Executing listMPartitionsByFilterNoTxn");
+ Map<String, Object> params = new HashMap<String, Object>();
+ String queryFilterString = makeQueryFilterString(mtable, parser, params);
- MTable mtable = getMTable(dbName, tableName);
- if( mtable == null ) {
- throw new NoSuchObjectException("Specified database/table does not exist : "
- + dbName + "." + tableName);
- }
- Map<String, Object> params = new HashMap<String, Object>();
- String queryFilterString =
- makeQueryFilterString(mtable, filter, params);
-
- Query query = pm.newQuery(MPartition.class,
- queryFilterString);
+ Query query = pm.newQuery(MPartition.class,
+ queryFilterString);
- if( maxParts >= 0 ) {
- //User specified a row limit, set it on the Query
- query.setRange(0, maxParts);
- }
+ if( maxParts >= 0 ) {
+ //User specified a row limit, set it on the Query
+ query.setRange(0, maxParts);
+ }
- LOG.debug("Filter specified is " + filter + "," +
- " JDOQL filter is " + queryFilterString);
+ LOG.debug("JDOQL filter is " + queryFilterString);
- params.put("t1", tableName.trim());
- params.put("t2", dbName.trim());
+ params.put("t1", tableName.trim());
+ params.put("t2", dbName.trim());
- String parameterDeclaration = makeParameterDeclarationStringObj(params);
- query.declareParameters(parameterDeclaration);
- query.setOrdering("partitionName ascending");
+ String parameterDeclaration = makeParameterDeclarationStringObj(params);
+ query.declareParameters(parameterDeclaration);
+ query.setOrdering("partitionName ascending");
- mparts = (List<MPartition>) query.executeWithMap(params);
+ mparts = (List<MPartition>) query.executeWithMap(params);
- LOG.debug("Done executing query for listMPartitionsByFilter");
- pm.retrieveAll(mparts);
- success = commitTransaction();
- LOG.debug("Done retrieving all objects for listMPartitionsByFilter");
- } finally {
- if (!success) {
- rollbackTransaction();
- }
- }
+ LOG.debug("Done executing query for listMPartitionsByFilterNoTxn");
+ pm.retrieveAll(mparts);
return mparts;
}
@@ -1909,8 +1981,7 @@ public class ObjectStore implements RawS
return partNames;
}
Map<String, Object> params = new HashMap<String, Object>();
- String queryFilterString =
- makeQueryFilterString(mtable, filter, params);
+ String queryFilterString = makeQueryFilterString(mtable, filter, params);
Query query = pm.newQuery(
"select partitionName from org.apache.hadoop.hive.metastore.model.MPartition "
+ "where " + queryFilterString);
Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java?rev=1511177&r1=1511176&r2=1511177&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java Wed Aug 7 05:30:01 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.metastore.parser;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hive.common.Fil
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import com.google.common.collect.Sets;
@@ -46,27 +48,30 @@ public class ExpressionTree {
/** The operators supported. */
public enum Operator {
- EQUALS ("=", "=="),
+ EQUALS ("=", "==", "="),
GREATERTHAN (">"),
LESSTHAN ("<"),
LESSTHANOREQUALTO ("<="),
GREATERTHANOREQUALTO (">="),
- LIKE ("LIKE", "matches"),
- NOTEQUALS2 ("!=", "!="),
- NOTEQUALS ("<>", "!=");
+ LIKE ("LIKE", "matches", "like"),
+ NOTEQUALS2 ("!=", "!=", "<>"),
+ NOTEQUALS ("<>", "!=", "<>");
private final String op;
private final String jdoOp;
+ private final String sqlOp;
// private constructor
private Operator(String op){
this.op = op;
this.jdoOp = op;
+ this.sqlOp = op;
}
- private Operator(String op, String jdoOp){
+ private Operator(String op, String jdoOp, String sqlOp){
this.op = op;
this.jdoOp = jdoOp;
+ this.sqlOp = sqlOp;
}
public String getOp() {
@@ -77,6 +82,10 @@ public class ExpressionTree {
return jdoOp;
}
+ public String getSqlOp() {
+ return sqlOp;
+ }
+
public static Operator fromString(String inputOperator) {
for(Operator op : Operator.values()) {
if(op.getOp().equals(inputOperator)){
@@ -95,6 +104,10 @@ public class ExpressionTree {
}
+ public static interface TreeVisitor {
+ void visit(TreeNode node) throws MetaException;
+ void visit(LeafNode node) throws MetaException;
+ }
/**
* The Class representing a Node in the ExpressionTree.
@@ -113,6 +126,23 @@ public class ExpressionTree {
this.rhs = rhs;
}
+ public TreeNode getLhs() {
+ return lhs;
+ }
+
+ public LogicalOperator getAndOr() {
+ return andOr;
+ }
+
+ public TreeNode getRhs() {
+ return rhs;
+ }
+
+ /** Double dispatch for TreeVisitor. */
+ public void accept(TreeVisitor visitor) throws MetaException {
+ visitor.visit(this);
+ }
+
/**
* Generates a JDO filter statement
* @param table
@@ -162,6 +192,11 @@ public class ExpressionTree {
private static final String PARAM_PREFIX = "hive_filter_param_";
@Override
+ public void accept(TreeVisitor visitor) throws MetaException {
+ visitor.visit(this);
+ }
+
+ @Override
public String generateJDOFilter(Table table,
Map<String, Object> params)
throws MetaException {
@@ -238,50 +273,13 @@ public class ExpressionTree {
private String generateJDOFilterOverPartitions(Table table, Map<String, Object> params)
throws MetaException {
-
int partitionColumnCount = table.getPartitionKeys().size();
- int partitionColumnIndex;
- for(partitionColumnIndex = 0;
- partitionColumnIndex < partitionColumnCount;
- partitionColumnIndex++ ) {
- if( table.getPartitionKeys().get(partitionColumnIndex).getName().
- equalsIgnoreCase(keyName)) {
- break;
- }
- }
- assert (table.getPartitionKeys().size() > 0);
-
- if( partitionColumnIndex == table.getPartitionKeys().size() ) {
- throw new MetaException("Specified key <" + keyName +
- "> is not a partitioning key for the table");
- }
-
- String keyType = table.getPartitionKeys().get(partitionColumnIndex).getType();
- boolean isIntegralSupported = doesOperatorSupportIntegral(operator);
-
- // Can only support partitions whose types are string, or maybe integers
- if (!keyType.equals(org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME)
- && (!isIntegralSupported || !isIntegralType(keyType))) {
- throw new MetaException("Filtering is supported only on partition keys of type " +
- "string" + (isIntegralSupported ? ", or integral types" : ""));
- }
-
- boolean isStringValue = value instanceof String;
- if (!isStringValue && (!isIntegralSupported || !(value instanceof Long))) {
- throw new MetaException("Filtering is supported only on partition keys of type " +
- "string" + (isIntegralSupported ? ", or integral types" : ""));
- }
-
- String valueAsString = null;
- try {
- valueAsString = isStringValue ? (String) value : Long.toString((Long) value);
- } catch (ClassCastException e) {
- throw new MetaException("Unable to cast the constexpr to "
- + (isStringValue ? "string" : "long"));
- }
+ int partitionColumnIndex = getPartColIndexForFilter(table);
+ String valueAsString = getFilterPushdownParam(table, partitionColumnIndex);
String paramName = PARAM_PREFIX + params.size();
params.put(paramName, valueAsString);
+
boolean isOpEquals = operator == Operator.EQUALS;
if (isOpEquals || operator == Operator.NOTEQUALS || operator == Operator.NOTEQUALS2) {
return makeFilterForEquals(keyName, valueAsString, paramName, params,
@@ -320,6 +318,7 @@ public class ExpressionTree {
* @return true iff filter pushdown for this operator can be done for integral types.
*/
private static boolean doesOperatorSupportIntegral(Operator operator) {
+ // TODO: for SQL-based filtering, this could be amended if we added casts.
return (operator == Operator.EQUALS)
|| (operator == Operator.NOTEQUALS)
|| (operator == Operator.NOTEQUALS2);
@@ -330,10 +329,61 @@ public class ExpressionTree {
* @return true iff type is an integral type.
*/
private static boolean isIntegralType(String type) {
- return type.equals(org.apache.hadoop.hive.serde.serdeConstants.TINYINT_TYPE_NAME)
- || type.equals(org.apache.hadoop.hive.serde.serdeConstants.SMALLINT_TYPE_NAME)
- || type.equals(org.apache.hadoop.hive.serde.serdeConstants.INT_TYPE_NAME)
- || type.equals(org.apache.hadoop.hive.serde.serdeConstants.BIGINT_TYPE_NAME);
+ return type.equals(serdeConstants.TINYINT_TYPE_NAME)
+ || type.equals(serdeConstants.SMALLINT_TYPE_NAME)
+ || type.equals(serdeConstants.INT_TYPE_NAME)
+ || type.equals(serdeConstants.BIGINT_TYPE_NAME);
+ }
+
+ /**
+ * Get partition column index in the table partition column list that
+ * corresponds to the key that is being filtered on by this tree node.
+ * @param table The table.
+ * @return The index.
+ */
+ public int getPartColIndexForFilter(Table table) throws MetaException {
+ int partitionColumnIndex;
+ assert (table.getPartitionKeys().size() > 0);
+ for (partitionColumnIndex = 0; partitionColumnIndex < table.getPartitionKeys().size();
+ ++partitionColumnIndex) {
+ if (table.getPartitionKeys().get(partitionColumnIndex).getName().
+ equalsIgnoreCase(keyName)) {
+ break;
+ }
+ }
+ if( partitionColumnIndex == table.getPartitionKeys().size() ) {
+ throw new MetaException("Specified key <" + keyName +
+ "> is not a partitioning key for the table");
+ }
+
+ return partitionColumnIndex;
+ }
+
+ /**
+ * Validates and gets the query parameter for filter pushdown based on the column
+ * and the constant stored in this node.
+ * In the future this may become different for SQL and JDOQL filter pushdown.
+ * @param table The table.
+ * @param partColIndex The index of the column to check.
+ * @return The parameter string.
+ */
+ public String getFilterPushdownParam(Table table, int partColIndex) throws MetaException {
+ boolean isIntegralSupported = doesOperatorSupportIntegral(operator);
+ String colType = table.getPartitionKeys().get(partColIndex).getType();
+ // Can only support partitions whose types are string, or maybe integers
+ if (!colType.equals(serdeConstants.STRING_TYPE_NAME)
+ && (!isIntegralSupported || !isIntegralType(colType))) {
+ throw new MetaException("Filtering is supported only on partition keys of type " +
+ "string" + (isIntegralSupported ? ", or integral types" : ""));
+ }
+
+ boolean isStringValue = value instanceof String;
+ if (!isStringValue && (!isIntegralSupported || !(value instanceof Long))) {
+ throw new MetaException("Filtering is supported only on partition keys of type " +
+ "string" + (isIntegralSupported ? ", or integral types" : ""));
+ }
+
+ return isStringValue ? (String) value : Long.toString((Long) value);
}
}
@@ -404,6 +454,10 @@ public class ExpressionTree {
*/
private final Stack<TreeNode> nodeStack = new Stack<TreeNode>();
+ public TreeNode getRoot() {
+ return this.root;
+ }
+
/**
* Adds an intermediate node of either type (AND/OR). Pops the last two nodes from
* the stack and sets them as children of the new node and pushes itself
@@ -447,6 +501,7 @@ public class ExpressionTree {
return root.generateJDOFilter(table, params);
}
+
/** Case insensitive ANTLR string stream */
public static class ANTLRNoCaseStringStream extends ANTLRStringStream {
public ANTLRNoCaseStringStream (String input) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1511177&r1=1511176&r2=1511177&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Wed Aug 7 05:30:01 2013
@@ -1814,6 +1814,7 @@ private void constructOneLBLocationMap(F
List<Partition> partitions = new ArrayList<Partition>(partNames.size());
int batchSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX);
+ // TODO: might want to increase the default batch size. 1024 is viable; the metastore can OOM if it's too high.
int nParts = partNames.size();
int nBatches = nParts / batchSize;
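
The Hive.java hunk above computes how many full batches of METASTORE_BATCH_RETRIEVE_MAX names to fetch. A minimal sketch of such a batched loop is below, with fetchBatch() as a hypothetical stand-in for the per-batch metastore call; like the real code, it handles the trailing partial batch separately.

    import java.util.ArrayList;
    import java.util.List;

    public class BatchSketch {
      // Hypothetical stand-in for the actual metastore call made per batch.
      static List<String> fetchBatch(List<String> names) {
        return new ArrayList<String>(names);
      }

      public static void main(String[] args) {
        List<String> partNames = new ArrayList<String>();
        for (int i = 1; i <= 10; ++i) {
          partNames.add("ds=2013-08-" + i);
        }
        int batchSize = 4; // METASTORE_BATCH_RETRIEVE_MAX in the real code
        List<String> partitions = new ArrayList<String>(partNames.size());
        int nParts = partNames.size();
        int nBatches = nParts / batchSize;
        for (int i = 0; i < nBatches; ++i) {
          partitions.addAll(fetchBatch(partNames.subList(i * batchSize, (i + 1) * batchSize)));
        }
        if (nBatches * batchSize < nParts) { // trailing partial batch
          partitions.addAll(fetchBatch(partNames.subList(nBatches * batchSize, nParts)));
        }
        System.out.println(partitions.size() + " partitions retrieved"); // 10 partitions retrieved
      }
    }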