You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/25 18:27:32 UTC

[23/50] [abbrv] hive git commit: HIVE-19416 : merge master into branch (Sergey Shelukhin) 0719

http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 0000000,f45b71f..07be1ba
mode 000000,100644..100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@@ -1,0 -1,2817 +1,2837 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hive.metastore;
+ 
+ import static org.apache.commons.lang.StringUtils.join;
+ import static org.apache.commons.lang.StringUtils.normalizeSpace;
+ import static org.apache.commons.lang.StringUtils.repeat;
+ import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
+ 
+ import java.sql.Blob;
+ import java.sql.Clob;
+ import java.sql.Connection;
+ import java.sql.SQLException;
+ import java.sql.Statement;
+ import java.text.ParseException;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.Iterator;
+ import java.util.LinkedList;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.TreeMap;
+ import java.util.stream.Collectors;
+ 
+ import javax.jdo.PersistenceManager;
+ import javax.jdo.Query;
+ import javax.jdo.Transaction;
+ import javax.jdo.datastore.JDOConnection;
+ 
+ import org.apache.commons.lang.BooleanUtils;
+ import org.apache.commons.lang.StringUtils;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hive.metastore.AggregateStatsCache.AggrColStats;
+ import org.apache.hadoop.hive.metastore.api.AggrStats;
+ import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+ import org.apache.hadoop.hive.metastore.api.Database;
+ import org.apache.hadoop.hive.metastore.api.FieldSchema;
+ import org.apache.hadoop.hive.metastore.api.MetaException;
+ import org.apache.hadoop.hive.metastore.api.Order;
+ import org.apache.hadoop.hive.metastore.api.Partition;
+ import org.apache.hadoop.hive.metastore.api.PrincipalType;
+ import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint;
+ import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint;
+ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+ import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
+ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+ import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
+ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+ import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+ import org.apache.hadoop.hive.metastore.api.Table;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+ import org.apache.hadoop.hive.metastore.model.MConstraint;
+ import org.apache.hadoop.hive.metastore.model.MCreationMetadata;
+ import org.apache.hadoop.hive.metastore.model.MDatabase;
+ import org.apache.hadoop.hive.metastore.model.MNotificationLog;
+ import org.apache.hadoop.hive.metastore.model.MNotificationNextId;
+ import org.apache.hadoop.hive.metastore.model.MPartitionColumnPrivilege;
+ import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
+ import org.apache.hadoop.hive.metastore.model.MPartitionPrivilege;
+ import org.apache.hadoop.hive.metastore.model.MTableColumnStatistics;
+ import org.apache.hadoop.hive.metastore.model.MWMResourcePlan;
+ import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
+ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.FilterBuilder;
+ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LeafNode;
+ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LogicalOperator;
+ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator;
+ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode;
+ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor;
+ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
+ import org.apache.hive.common.util.BloomFilter;
+ import org.datanucleus.store.rdbms.query.ForwardQueryResult;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.google.common.collect.Lists;
+ 
+ /**
+  * This class contains the optimizations for MetaStore that rely on direct SQL access to
+  * the underlying database. It should use ANSI SQL and be compatible with common databases
+  * such as MySQL (note that MySQL doesn't use full ANSI mode by default), Postgres, etc.
+  *
+  * As of now, only the partition retrieval is done this way to improve job startup time;
+  * JDOQL partition retrieval is still present so as not to limit the ORM solution we have
+  * to SQL stores only. There's always a way to do without direct SQL.
+  */
+ class MetaStoreDirectSql {
+   // Sentinel values for ConfVars.DIRECT_SQL_PARTITION_BATCH_SIZE: NO_BATCHING (-1) means
+   // "never split IN (...) lists"; DETECT_BATCHING (0) lets the constructor pick a batch size
+   // based on the detected database product.
+   private static final int NO_BATCHING = -1, DETECT_BATCHING = 0;
+ 
+   private static final Logger LOG = LoggerFactory.getLogger(MetaStoreDirectSql.class);
+   private final PersistenceManager pm;
++  // Configuration supplied at construction time.
++  private final Configuration conf;
+   // Schema used to qualify the @TableName fields; null or empty means unqualified names.
+   private final String schema;
+ 
+   /**
+    * The detected backing database product, or OTHER when detection fails.
+    *
+    * We want to avoid db-specific code in this class and stick with ANSI SQL. However:
+    * 1) mysql and postgres are differently ansi-incompatible (mysql by default doesn't support
+    * quoted identifiers, and postgres contravenes ANSI by coercing unquoted ones to lower case).
+    * MySQL's way of working around this is simpler (just set ansi quotes mode on), so we will
+    * use that. MySQL detection is done by actually issuing the set-ansi-quotes command;
+    *
+    * Use sparingly, we don't want to devolve into another DataNucleus...
+    */
+   private final DatabaseProduct dbType;
+   // Maximum number of items per batched IN (...) list, or NO_BATCHING; resolved in the constructor.
+   private final int batchSize;
+   // From ConfVars.ORM_RETRIEVE_MAPNULLS_AS_EMPTY_STRINGS; passed to MetaStoreUtils.trimMapNulls.
+   private final boolean convertMapNullsToEmptyStrings;
+   // From ConfVars.DEFAULTPARTITIONNAME; passed to the partition filter generator.
+   private final String defaultPartName;
+ 
+   /**
+    * Whether direct SQL can be used with the current datastore backing {@link #pm}.
+    */
+   private final boolean isCompatibleDatastore;
+   private final boolean isAggregateStatsCacheEnabled;
+   // Set only when ConfVars.AGGREGATE_STATS_CACHE_ENABLED is true; otherwise stays null.
+   private AggregateStatsCache aggrStatsCache;
+ 
+   /**
+    * Marks String fields that the constructor fills in via reflection with the quoted,
+    * optionally schema-qualified, name of the table whose name equals the field name.
+    */
+   @java.lang.annotation.Target(java.lang.annotation.ElementType.FIELD)
+   @java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
+   private @interface TableName {}
+ 
+   // Table names with schema name, if necessary. Each field is assigned in the constructor
+   // from getFullyQualifiedName(schema, <field name>).
+   @TableName
+   private String DBS, TBLS, PARTITIONS, DATABASE_PARAMS, PARTITION_PARAMS, SORT_COLS, SD_PARAMS,
+       SDS, SERDES, SKEWED_STRING_LIST_VALUES, SKEWED_VALUES, BUCKETING_COLS, SKEWED_COL_NAMES,
+       SKEWED_COL_VALUE_LOC_MAP, COLUMNS_V2, PARTITION_KEYS, SERDE_PARAMS, PART_COL_STATS, KEY_CONSTRAINTS,
+       TAB_COL_STATS, PARTITION_KEY_VALS, PART_PRIVS, PART_COL_PRIVS, SKEWED_STRING_LIST, CDS;
+ 
++
+   /**
+    * Creates the direct-SQL helper for the given persistence manager.
+    *
+    * Detects the backing database product (falling back to OTHER), resolves the IN-list batch
+    * size, fills every {@code @TableName} field with its quoted, optionally schema-qualified
+    * name, and decides whether direct SQL is usable; see {@link #isCompatibleDatastore()}.
+    *
+    * @param pm persistence manager whose datastore connection is used for direct SQL
+    * @param conf metastore configuration
+    * @param schema schema used to qualify table names; null or empty for none
+    */
+   public MetaStoreDirectSql(PersistenceManager pm, Configuration conf, String schema) {
+     this.pm = pm;
++    this.conf = conf;
+     this.schema = schema;
+     DatabaseProduct dbType = null;
+     try {
+       dbType = DatabaseProduct.determineDatabaseProduct(getProductName(pm));
+     } catch (SQLException e) {
+       LOG.warn("Cannot determine database product; assuming OTHER", e);
+       dbType = DatabaseProduct.OTHER;
+     }
+     this.dbType = dbType;
+     int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_PARTITION_BATCH_SIZE);
+     if (batchSize == DETECT_BATCHING) {
+       batchSize = DatabaseProduct.needsInBatching(dbType) ? 1000 : NO_BATCHING;
+     }
+     this.batchSize = batchSize;
+ 
+     // Populate the @TableName fields; each field's name is the unqualified table name.
+     for (java.lang.reflect.Field f : this.getClass().getDeclaredFields()) {
+       if (f.getAnnotation(TableName.class) == null) continue;
+       try {
+         f.set(this, getFullyQualifiedName(schema, f.getName()));
+       } catch (IllegalArgumentException | IllegalAccessException e) {
+         // Keep the original exception as the cause so the reflection failure is diagnosable.
+         throw new RuntimeException("Internal error, cannot set " + f.getName(), e);
+       }
+     }
+ 
+     convertMapNullsToEmptyStrings =
+         MetastoreConf.getBoolVar(conf, ConfVars.ORM_RETRIEVE_MAPNULLS_AS_EMPTY_STRINGS);
+     defaultPartName = MetastoreConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME);
+ 
+     String jdoIdFactory = MetastoreConf.getVar(conf, ConfVars.IDENTIFIER_FACTORY);
+     if (! ("datanucleus1".equalsIgnoreCase(jdoIdFactory))){
+       LOG.warn("Underlying metastore does not use 'datanucleus1' for its ORM naming scheme."
+           + " Disabling directSQL as it uses hand-hardcoded SQL with that assumption.");
+       isCompatibleDatastore = false;
+     } else {
+       // In tests the schema may not exist yet, so force initialization before self-testing.
+       boolean isInTest = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST);
+       isCompatibleDatastore = (!isInTest || ensureDbInit()) && runTestQuery();
+       if (isCompatibleDatastore) {
+         LOG.debug("Using direct SQL, underlying DB is " + dbType);
+       }
+     }
+ 
+     isAggregateStatsCacheEnabled = MetastoreConf.getBoolVar(
+         conf, ConfVars.AGGREGATE_STATS_CACHE_ENABLED);
+     if (isAggregateStatsCacheEnabled) {
+       aggrStatsCache = AggregateStatsCache.getInstance(conf);
+     }
+   }
+ 
+   /**
+    * Builds an ANSI-quoted table identifier, optionally qualified by a schema.
+    * Schema {@code s} and table {@code T} yield {@code "s"."T"}; a null or empty schema
+    * yields just {@code "T"}.
+    *
+    * @param schema schema name, or null/empty for an unqualified name
+    * @param tblName unquoted table name
+    * @return the quoted identifier
+    */
+   private static String getFullyQualifiedName(String schema, String tblName) {
+     // The schema prefix ends with the dot only; the table part supplies its own opening quote.
+     // (Previously the prefix also ended in a quote, producing the invalid "s".""T".)
+     return ((schema == null || schema.isEmpty()) ? "" : "\"" + schema + "\".")
+         + "\"" + tblName + "\"";
+   }
+ 
+ 
+   /**
+    * Creates the direct-SQL helper with unqualified (schema-less) table names.
+    *
+    * @param pm persistence manager whose datastore connection is used for direct SQL
+    * @param conf metastore configuration
+    */
+   public MetaStoreDirectSql(PersistenceManager pm, Configuration conf) {
+     this(pm, conf, "");
+   }
+ 
+   /**
+    * Reads the database product name from the JDBC metadata of the datastore connection.
+    *
+    * @param pm persistence manager to borrow the native connection from
+    * @return the product name, or null if it could not be retrieved (the error is logged)
+    */
+   static String getProductName(PersistenceManager pm) {
+     JDOConnection jdoConn = pm.getDataStoreConnection();
+     try {
+       return ((Connection)jdoConn.getNativeConnection()).getMetaData().getDatabaseProductName();
+     } catch (Throwable t) {
+       LOG.warn("Error retrieving product name", t);
+       return null;
+     } finally {
+       jdoConn.close(); // We must release the connection before we call other pm methods.
+     }
+   }
+ 
+   /**
+    * Forces the underlying datastore to initialize by running a trivial JDOQL query for each
+    * of several model classes.
+    *
+    * Begins (and, on success, commits) a transaction if none is active. On failure, the error
+    * is logged and the current transaction is rolled back. Every created query is closed
+    * before returning.
+    *
+    * @return true if every initialization query ran, false otherwise
+    */
+   private boolean ensureDbInit() {
+     Transaction tx = pm.currentTransaction();
+     boolean doCommit = false;
+     if (!tx.isActive()) {
+       tx.begin();
+       doCommit = true;
+     }
+     LinkedList<Query> initQueries = new LinkedList<>();
+ 
+     try {
+       // Force the underlying db to initialize.
+       initQueries.add(pm.newQuery(MDatabase.class, "name == ''"));
+       initQueries.add(pm.newQuery(MTableColumnStatistics.class, "dbName == ''"));
+       initQueries.add(pm.newQuery(MPartitionColumnStatistics.class, "dbName == ''"));
+       initQueries.add(pm.newQuery(MConstraint.class, "childIntegerIndex < 0"));
+       initQueries.add(pm.newQuery(MNotificationLog.class, "dbName == ''"));
+       initQueries.add(pm.newQuery(MNotificationNextId.class, "nextEventId < -1"));
+       initQueries.add(pm.newQuery(MWMResourcePlan.class, "name == ''"));
+       initQueries.add(pm.newQuery(MCreationMetadata.class, "dbName == ''"));
+       initQueries.add(pm.newQuery(MPartitionPrivilege.class, "principalName == ''"));
+       initQueries.add(pm.newQuery(MPartitionColumnPrivilege.class, "principalName == ''"));
+       // Executed queries stay in the list so the finally block closes all of them. Before
+       // this change, each query was polled out after it executed and was never closed.
+       for (Query q : initQueries) {
+         q.execute();
+       }
+ 
+       return true;
+     } catch (Exception ex) {
+       doCommit = false;
+       LOG.warn("Database initialization failed; direct SQL is disabled", ex);
+       tx.rollback();
+       return false;
+     } finally {
+       try {
+         if (doCommit) {
+           tx.commit();
+         }
+       } finally {
+         // Release query resources even if the commit above throws.
+         for (Query q : initQueries) {
+           try {
+             q.closeAll();
+           } catch (Throwable ignored) {
+             // Best-effort cleanup: failing to close one query must not mask the result
+             // or stop the remaining queries from being closed.
+           }
+         }
+       }
+     }
+   }
+ 
+   /**
+    * Runs a trivial hand-written SQL query against the DBS table to check that direct SQL
+    * works against this datastore.
+    *
+    * Begins (and, on success, commits) a transaction if none is active. On failure, the error
+    * is logged and the current transaction is rolled back.
+    *
+    * @return true if the self-test query succeeded, false otherwise
+    */
+   private boolean runTestQuery() {
+     Transaction tx = pm.currentTransaction();
+     boolean doCommit = false;
+     if (!tx.isActive()) {
+       tx.begin();
+       doCommit = true;
+     }
+     Query query = null;
+     // Run a self-test query. If it doesn't work, we will self-disable. What a PITA...
+     String selfTestQuery = "select \"DB_ID\" from " + DBS + "";
+     try {
+       // NOTE(review): prepareTxn() is defined elsewhere in this class; presumably it applies
+       // DB-specific session setup (e.g. ANSI quotes for MySQL) before the query — confirm.
+       prepareTxn();
+       query = pm.newQuery("javax.jdo.query.SQL", selfTestQuery);
+       query.execute();
+       return true;
+     } catch (Throwable t) {
+       doCommit = false;
+       LOG.warn("Self-test query [" + selfTestQuery + "] failed; direct SQL is disabled", t);
+       tx.rollback();
+       return false;
+     } finally {
+       if (doCommit) {
+         tx.commit();
+       }
+       if (query != null) {
+         query.closeAll();
+       }
+     }
+   }
+ 
+   /**
+    * @return the schema passed to the constructor and used to qualify table names
+    *         (empty when the two-argument constructor was used)
+    */
+   public String getSchema() {
+     return schema;
+   }
+ 
+   /**
+    * @return whether the constructor found direct SQL usable for this datastore
+    *         (datanucleus1 identifier factory and a passing self-test query)
+    */
+   public boolean isCompatibleDatastore() {
+     return isCompatibleDatastore;
+   }
+ 
+   /**
+    * Executes a SQL statement directly on the native JDBC connection, discarding any result.
+    * Timing is reported through timingTrace when debug logging is enabled.
+    *
+    * @param queryText SQL text to execute verbatim
+    * @throws SQLException if the statement cannot be created, executed, or closed
+    */
+   private void executeNoResult(final String queryText) throws SQLException {
+     JDOConnection jdoConn = pm.getDataStoreConnection();
+     Statement statement = null;
+     boolean doTrace = LOG.isDebugEnabled();
+     try {
+       long start = doTrace ? System.nanoTime() : 0;
+       statement = ((Connection)jdoConn.getNativeConnection()).createStatement();
+       statement.execute(queryText);
+       timingTrace(doTrace, queryText, start, doTrace ? System.nanoTime() : 0);
+     } finally {
+       if(statement != null){
+           statement.close();
+       }
+       jdoConn.close(); // We must release the connection before we call other pm methods.
+     }
+   }
+ 
+   /**
+    * Loads a database and its parameters using two direct SQL queries.
+    *
+    * @param catName catalog name; lower-cased before the lookup
+    * @param dbName database name; lower-cased before the lookup
+    * @return the database, or null if no row matches the name and catalog
+    * @throws MetaException if query execution fails (as reported by executeWithArray)
+    */
+   public Database getDatabase(String catName, String dbName) throws MetaException{
+     Query queryDbSelector = null;
+     Query queryDbParams = null;
+     try {
+       dbName = dbName.toLowerCase();
+       catName = catName.toLowerCase();
+ 
+       String queryTextDbSelector= "select "
+           + "\"DB_ID\", \"NAME\", \"DB_LOCATION_URI\", \"DESC\", "
+           + "\"OWNER_NAME\", \"OWNER_TYPE\", \"CTLG_NAME\" "
+           + "FROM "+ DBS
+           + " where \"NAME\" = ? and \"CTLG_NAME\" = ? ";
+       Object[] params = new Object[] { dbName, catName };
+       queryDbSelector = pm.newQuery("javax.jdo.query.SQL", queryTextDbSelector);
+ 
+       if (LOG.isTraceEnabled()) {
+         // Log both bound values; previously only the database name was shown.
+         LOG.trace("getDatabase:query instantiated : " + queryTextDbSelector
+             + " with params [" + params[0] + ", " + params[1] + "]");
+       }
+ 
+       List<Object[]> sqlResult = executeWithArray(
+           queryDbSelector, params, queryTextDbSelector);
+       if ((sqlResult == null) || sqlResult.isEmpty()) {
+         return null;
+       }
+ 
+       assert(sqlResult.size() == 1);
+       if (sqlResult.get(0) == null) {
+         return null;
+       }
+ 
+       // Columns: DB_ID, NAME, DB_LOCATION_URI, DESC, OWNER_NAME, OWNER_TYPE, CTLG_NAME.
+       Object[] dbline = sqlResult.get(0);
+       Long dbid = extractSqlLong(dbline[0]);
+ 
+       String queryTextDbParams = "select \"PARAM_KEY\", \"PARAM_VALUE\" "
+           + " from " + DATABASE_PARAMS + " "
+           + " WHERE \"DB_ID\" = ? "
+           + " AND \"PARAM_KEY\" IS NOT NULL";
+       // Bind exactly one value for the single placeholder, rather than reusing the first
+       // query's array (which would still carry the catalog name as an extra argument).
+       Object[] dbParamsArgs = new Object[] { dbid };
+       queryDbParams = pm.newQuery("javax.jdo.query.SQL", queryTextDbParams);
+       if (LOG.isTraceEnabled()) {
+         LOG.trace("getDatabase:query2 instantiated : " + queryTextDbParams
+             + " with param [" + dbParamsArgs[0] + "]");
+       }
+ 
+       Map<String,String> dbParams = new HashMap<String,String>();
+       List<Object[]> sqlResult2 = ensureList(executeWithArray(
+           queryDbParams, dbParamsArgs, queryTextDbParams));
+       for (Object[] line : sqlResult2) {
+         dbParams.put(extractSqlString(line[0]), extractSqlString(line[1]));
+       }
+       Database db = new Database();
+       db.setName(extractSqlString(dbline[1]));
+       db.setLocationUri(extractSqlString(dbline[2]));
+       db.setDescription(extractSqlString(dbline[3]));
+       db.setOwnerName(extractSqlString(dbline[4]));
+       String type = extractSqlString(dbline[5]);
+       db.setOwnerType(
+           (null == type || type.trim().isEmpty()) ? null : PrincipalType.valueOf(type));
+       db.setCatalogName(extractSqlString(dbline[6]));
+       db.setParameters(MetaStoreUtils.trimMapNulls(dbParams,convertMapNullsToEmptyStrings));
+       if (LOG.isDebugEnabled()){
+         LOG.debug("getDatabase: directsql returning db " + db.getName()
+             + " locn["+db.getLocationUri()  +"] desc [" +db.getDescription()
+             + "] owner [" + db.getOwnerName() + "] ownertype ["+ db.getOwnerType() +"]");
+       }
+       return db;
+     } finally {
+       if (queryDbSelector != null){
+         queryDbSelector.closeAll();
+       }
+       if (queryDbParams != null){
+         queryDbParams.closeAll();
+       }
+     }
+   }
+ 
+   /**
+    * Get table names by using direct SQL queries.
+    * @param catName catalog name
+    * @param dbName Metastore database name
+    * @param tableType Table type, or null if we want to get all tables
+    * @return list of table names (a copy that stays valid after the query is closed)
+    * @throws MetaException if query execution fails (as reported by executeWithArray)
+    */
+   public List<String> getTables(String catName, String dbName, TableType tableType)
+       throws MetaException {
+     String queryText = "SELECT " + TBLS + ".\"TBL_NAME\""
+       + " FROM " + TBLS + " "
+       + " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+       + " WHERE " + DBS + ".\"NAME\" = ? AND " + DBS + ".\"CTLG_NAME\" = ? "
+       + (tableType == null ? "" : "AND " + TBLS + ".\"TBL_TYPE\" = ? ") ;
+ 
+     List<String> pms = new ArrayList<>();
+     pms.add(dbName);
+     pms.add(catName);
+     if (tableType != null) {
+       pms.add(tableType.toString());
+     }
+ 
+     Query<?> queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
+     try {
+       List<String> tableNames = executeWithArray(
+           queryParams, pms.toArray(), queryText);
+       // Copy the rows out before closeAll(): JDO query results are not usable once the
+       // query is closed. Previously the query was never closed (resource leak).
+       return tableNames == null ? new ArrayList<String>() : new ArrayList<String>(tableNames);
+     } finally {
+       queryParams.closeAll();
+     }
+   }
+ 
+   /**
+    * Get the names of materialized views in a database by using direct SQL queries.
+    *
+    * NOTE(review): unlike getTables, this query does not filter on CTLG_NAME, so matching
+    * materialized views from a database with this name in any catalog are returned.
+    *
+    * @param dbName Metastore database name
+    * @return list of materialized view names (a copy that stays valid after the query is closed)
+    * @throws MetaException if query execution fails (as reported by executeWithArray)
+    */
+   public List<String> getMaterializedViewsForRewriting(String dbName) throws MetaException {
+     String queryText = "SELECT " + TBLS + ".\"TBL_NAME\""
+       + " FROM " + TBLS + " "
+       + " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+       + " WHERE " + DBS + ".\"NAME\" = ? AND " + TBLS + ".\"TBL_TYPE\" = ? " ;
+ 
+     List<String> pms = new ArrayList<String>();
+     pms.add(dbName);
+     pms.add(TableType.MATERIALIZED_VIEW.toString());
+ 
+     Query<?> queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
+     try {
+       List<String> viewNames = executeWithArray(
+           queryParams, pms.toArray(), queryText);
+       // Copy the rows out before closeAll(): JDO query results are not usable once the
+       // query is closed. Previously the query was never closed (resource leak).
+       return viewNames == null ? new ArrayList<String>() : new ArrayList<String>(viewNames);
+     } finally {
+       queryParams.closeAll();
+     }
+   }
+ 
+   /**
+    * Gets partitions by using direct SQL queries.
+    *
+    * The names are processed in batches of {@code batchSize}. For each batch, partition IDs are
+    * looked up with a {@code PART_NAME IN (...)} filter, then the full partitions are loaded.
+    *
+    * @param catName Metastore catalog name.
+    * @param dbName Metastore db name.
+    * @param tblName Metastore table name.
+    * @param partNames Partition names to get.
+    * @return List of partitions; empty if {@code partNames} is empty or nothing matches.
+    */
+   public List<Partition> getPartitionsViaSqlFilter(final String catName, final String dbName,
+                                                    final String tblName, List<String> partNames)
+       throws MetaException {
+     if (partNames.isEmpty()) {
+       return Collections.emptyList();
+     }
+     return Batchable.runBatched(batchSize, partNames, new Batchable<String, Partition>() {
+       @Override
+       public List<Partition> run(List<String> input) throws MetaException {
+         // One "?" placeholder per partition name in this batch.
+         String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")";
+         List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, tblName,
+             filter, input, Collections.<String>emptyList(), null);
+         if (partitionIds.isEmpty()) {
+           return Collections.emptyList(); // no partitions, bail early.
+         }
+         return getPartitionsFromPartitionIds(catName, dbName, tblName, null, partitionIds);
+       }
+     });
+   }
+ 
+   /**
+    * Gets partitions by using direct SQL queries.
+    * @param filter The filter, as prepared by generateSqlFilterForPushdown (target table,
+    *               SQL text, positional parameters and extra joins).
+    * @param max The maximum number of partitions to return, or null for no limit.
+    * @return List of partitions.
+    */
+   public List<Partition> getPartitionsViaSqlFilter(
+       SqlFilterForPushdown filter, Integer max) throws MetaException {
+     // May be null when the table type is unset; the lookup is then deferred.
+     Boolean isViewTable = isViewTable(filter.table);
+     // Tables without an explicit catalog are assumed to live in the default catalog.
+     String catName = filter.table.isSetCatName() ? filter.table.getCatName() :
+         DEFAULT_CATALOG_NAME;
+     List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName,
+         filter.table.getDbName(), filter.table.getTableName(), filter.filter, filter.params,
+         filter.joins, max);
+     if (partitionIds.isEmpty()) {
+       return Collections.emptyList(); // no partitions, bail early.
+     }
+     return Batchable.runBatched(batchSize, partitionIds, new Batchable<Object, Partition>() {
+       @Override
+       public List<Partition> run(List<Object> input) throws MetaException {
+         return getPartitionsFromPartitionIds(catName, filter.table.getDbName(),
+             filter.table.getTableName(), isViewTable, input);
+       }
+     });
+   }
+ 
+   /**
+    * Holder for a partition filter translated to SQL. It is filled by
+    * generateSqlFilterForPushdown and consumed by getPartitionsViaSqlFilter.
+    */
+   public static class SqlFilterForPushdown {
+     // Values for the "?" placeholders in filter, in order.
+     private final List<Object> params = new ArrayList<>();
+     // Extra join clauses the filter needs.
+     private final List<String> joins = new ArrayList<>();
+     // SQL condition text; null if no filter could be generated.
+     private String filter;
+     // Table whose partitions are filtered.
+     private Table table;
+   }
+ 
+   /**
+    * Translates a partition filter expression into SQL for the given table.
+    *
+    * @param table table whose partitions are being filtered
+    * @param tree parsed partition filter expression
+    * @param result output holder; its table, filter, params and joins are filled in
+    * @return true if a SQL filter was generated; false if PartitionFilterGenerator returned
+    *         null (presumably the expression cannot be expressed in direct SQL)
+    * @throws MetaException propagated from the filter generator
+    */
+   public boolean generateSqlFilterForPushdown(
+       Table table, ExpressionTree tree, SqlFilterForPushdown result) throws MetaException {
+     // Derby and Oracle do not interpret filters ANSI-properly in some cases and need a workaround.
+     boolean dbHasJoinCastBug = DatabaseProduct.hasJoinOperationOrderBug(dbType);
+     result.table = table;
+     result.filter = PartitionFilterGenerator.generateSqlFilter(table, tree, result.params,
+         result.joins, dbHasJoinCastBug, defaultPartName, dbType, schema);
+     return result.filter != null;
+   }
+ 
+   /**
+    * Gets all partitions of a table by using direct SQL queries.
+    * @param catName Metastore catalog name.
+    * @param dbName Metastore db name.
+    * @param tblName Metastore table name.
+    * @param max The maximum number of partitions to return, or null for no limit.
+    * @return List of partitions.
+    */
+   public List<Partition> getPartitions(String catName,
+       String dbName, String tblName, Integer max) throws MetaException {
+     // No SQL filter and no extra joins: select every partition ID of the table.
+     List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName,
+         tblName, null, Collections.<String>emptyList(), Collections.<String>emptyList(), max);
+     if (partitionIds.isEmpty()) {
+       return Collections.emptyList(); // no partitions, bail early.
+     }
+ 
+     // Get full objects. For Oracle/etc. do it in batches.
+     List<Partition> result = Batchable.runBatched(batchSize, partitionIds, new Batchable<Object, Partition>() {
+       @Override
+       public List<Partition> run(List<Object> input) throws MetaException {
+         return getPartitionsFromPartitionIds(catName, dbName, tblName, null, input);
+       }
+     });
+     return result;
+   }
+ 
+   /**
+    * Determines from the Thrift table object whether it is a virtual view.
+    *
+    * @param t table to inspect
+    * @return TRUE/FALSE when the table type is set, or null when it is unknown; callers then
+    *         fall back to a database lookup
+    */
+   private static Boolean isViewTable(Table t) {
+     return t.isSetTableType() ?
+         t.getTableType().equals(TableType.VIRTUAL_VIEW.toString()) : null;
+   }
+ 
+   /**
+    * Looks up TBL_TYPE with direct SQL to determine whether a table is a virtual view.
+    *
+    * @param catName catalog name
+    * @param dbName database name
+    * @param tblName table name
+    * @return true only if the table exists and its type is VIRTUAL_VIEW
+    * @throws MetaException if query execution fails (as reported by executeWithArray)
+    */
+   private boolean isViewTable(String catName, String dbName, String tblName) throws MetaException {
+     Query query = null;
+     try {
+       String queryText = "select \"TBL_TYPE\" from " + TBLS + "" +
+           " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" " +
+           " where " + TBLS + ".\"TBL_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + DBS + ".\"CTLG_NAME\" = ?";
+       Object[] params = new Object[] { tblName, dbName, catName };
+       query = pm.newQuery("javax.jdo.query.SQL", queryText);
+       // At most one row is expected, so ask JDO for a single value instead of a list.
+       query.setUnique(true);
+       Object result = executeWithArray(query, params, queryText);
+       return (result != null) && result.toString().equals(TableType.VIRTUAL_VIEW.toString());
+     } finally {
+       if (query != null) {
+         query.closeAll();
+       }
+     }
+   }
+ 
+   /**
+    * Get partition ids for the query using direct SQL queries, to avoid bazillion
+    * queries created by DN retrieving stuff for each object individually.
+    * @param catName MetaStore catalog name
+    * @param dbName MetaStore db name
+    * @param tblName MetaStore table name
+    * @param sqlFilter SQL filter to use. Better be SQL92-compliant.
+    * @param paramsForFilter params for ?-s in SQL filter text. Params must be in order.
+    * @param joinsForFilter if the filter needs additional join statement, they must be in
+    *                       this list. Better be SQL92-compliant.
+    * @param max The maximum number of partitions to return, or null for no limit.
+    * @return List of partition IDs; ordered by partition name when {@code max} is set.
+    */
+   private List<Object> getPartitionIdsViaSqlFilter(
+       String catName, String dbName, String tblName, String sqlFilter,
+       List<? extends Object> paramsForFilter, List<String> joinsForFilter, Integer max)
+       throws MetaException {
+     boolean doTrace = LOG.isDebugEnabled();
+     final String dbNameLcase = dbName.toLowerCase();
+     final String tblNameLcase = tblName.toLowerCase();
+     final String catNameLcase = normalizeSpace(catName).toLowerCase();
+ 
+     // We have to be mindful of order during filtering if we are not returning all partitions.
+     String orderForFilter = (max != null) ? " order by \"PART_NAME\" asc" : "";
+ 
+     String queryText =
+         "select " + PARTITIONS + ".\"PART_ID\" from " + PARTITIONS + ""
+       + "  inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\" "
+       + "    and " + TBLS + ".\"TBL_NAME\" = ? "
+       + "  inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+       + "     and " + DBS + ".\"NAME\" = ? "
+       + join(joinsForFilter, ' ')
+       + " where " + DBS + ".\"CTLG_NAME\" = ? "
+       + (StringUtils.isBlank(sqlFilter) ? "" : (" and " + sqlFilter)) + orderForFilter;
+     // The three fixed placeholders come first, followed by the filter's own parameters.
+     Object[] params = new Object[paramsForFilter.size() + 3];
+     params[0] = tblNameLcase;
+     params[1] = dbNameLcase;
+     params[2] = catNameLcase;
+     for (int i = 0; i < paramsForFilter.size(); ++i) {
+       params[i + 3] = paramsForFilter.get(i);
+     }
+ 
+     long start = doTrace ? System.nanoTime() : 0;
+     Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+     try {
+       if (max != null) {
+         // Pass the full limit: shortValue() wrapped values above Short.MAX_VALUE
+         // (e.g. 65546 became 10 and 100000 became negative).
+         query.setRange(0, max.longValue());
+       }
+       List<Object> sqlResult = executeWithArray(query, params, queryText);
+       long queryTime = doTrace ? System.nanoTime() : 0;
+       timingTrace(doTrace, queryText, start, queryTime);
+       if (sqlResult.isEmpty()) {
+         return Collections.emptyList(); // no partitions, bail early.
+       }
+ 
+       List<Object> result = new ArrayList<Object>(sqlResult.size());
+       for (Object fields : sqlResult) {
+         result.add(extractSqlLong(fields));
+       }
+       return result;
+     } finally {
+       // Close on every path, including the early return and exceptions; previously the
+       // query leaked whenever no partitions matched or execution failed.
+       query.closeAll();
+     }
+   }
+ 
+   /** Should be called with the list short enough to not trip up Oracle/etc. */
+   private List<Partition> getPartitionsFromPartitionIds(String catName, String dbName, String tblName,
+       Boolean isView, List<Object> partIdList) throws MetaException {
+     boolean doTrace = LOG.isDebugEnabled();
+ 
+     int idStringWidth = (int)Math.ceil(Math.log10(partIdList.size())) + 1; // 1 for comma
+     int sbCapacity = partIdList.size() * idStringWidth;
+ 
+     String partIds = getIdListForIn(partIdList);
+ 
+     // Get most of the fields for the IDs provided.
+     // Assume db and table names are the same for all partition, as provided in arguments.
+     String queryText =
+       "select " + PARTITIONS + ".\"PART_ID\", " + SDS + ".\"SD_ID\", " + SDS + ".\"CD_ID\","
+     + " " + SERDES + ".\"SERDE_ID\", " + PARTITIONS + ".\"CREATE_TIME\","
+     + " " + PARTITIONS + ".\"LAST_ACCESS_TIME\", " + SDS + ".\"INPUT_FORMAT\", " + SDS + ".\"IS_COMPRESSED\","
+     + " " + SDS + ".\"IS_STOREDASSUBDIRECTORIES\", " + SDS + ".\"LOCATION\", " + SDS + ".\"NUM_BUCKETS\","
 -    + " " + SDS + ".\"OUTPUT_FORMAT\", " + SERDES + ".\"NAME\", " + SERDES + ".\"SLIB\" "
 -    + "from " + PARTITIONS + ""
++    + " " + SDS + ".\"OUTPUT_FORMAT\", " + SERDES + ".\"NAME\", " + SERDES + ".\"SLIB\", " + PARTITIONS
++    + ".\"WRITE_ID\"" + " from " + PARTITIONS + ""
+     + "  left outer join " + SDS + " on " + PARTITIONS + ".\"SD_ID\" = " + SDS + ".\"SD_ID\" "
+     + "  left outer join " + SERDES + " on " + SDS + ".\"SERDE_ID\" = " + SERDES + ".\"SERDE_ID\" "
+     + "where \"PART_ID\" in (" + partIds + ") order by \"PART_NAME\" asc";
+     long start = doTrace ? System.nanoTime() : 0;
+     Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+     List<Object[]> sqlResult = executeWithArray(query, null, queryText);
+     long queryTime = doTrace ? System.nanoTime() : 0;
+     Deadline.checkTimeout();
+ 
+     // Read all the fields and create partitions, SDs and serdes.
+     TreeMap<Long, Partition> partitions = new TreeMap<Long, Partition>();
+     TreeMap<Long, StorageDescriptor> sds = new TreeMap<Long, StorageDescriptor>();
+     TreeMap<Long, SerDeInfo> serdes = new TreeMap<Long, SerDeInfo>();
+     TreeMap<Long, List<FieldSchema>> colss = new TreeMap<Long, List<FieldSchema>>();
+     // Keep order by name, consistent with JDO.
+     ArrayList<Partition> orderedResult = new ArrayList<Partition>(partIdList.size());
+ 
+     // Prepare StringBuilder-s for "in (...)" lists to use in one-to-many queries.
+     StringBuilder sdSb = new StringBuilder(sbCapacity), serdeSb = new StringBuilder(sbCapacity);
+     StringBuilder colsSb = new StringBuilder(7); // We expect that there's only one field schema.
+     tblName = tblName.toLowerCase();
+     dbName = dbName.toLowerCase();
+     catName = normalizeSpace(catName).toLowerCase();
+     for (Object[] fields : sqlResult) {
+       // Here comes the ugly part...
+       long partitionId = extractSqlLong(fields[0]);
+       Long sdId = extractSqlLong(fields[1]);
+       Long colId = extractSqlLong(fields[2]);
+       Long serdeId = extractSqlLong(fields[3]);
+       // A partition must have at least sdId and serdeId set, or nothing set if it's a view.
+       if (sdId == null || serdeId == null) {
+         if (isView == null) {
+           isView = isViewTable(catName, dbName, tblName);
+         }
+         if ((sdId != null || colId != null || serdeId != null) || !isView) {
+           throw new MetaException("Unexpected null for one of the IDs, SD " + sdId +
+                   ", serde " + serdeId + " for a " + (isView ? "" : "non-") + " view");
+         }
+       }
+ 
+       Partition part = new Partition();
+       orderedResult.add(part);
+       // Set the collection fields; some code might not check presence before accessing them.
+       part.setParameters(new HashMap<>());
+       part.setValues(new ArrayList<String>());
+       part.setCatName(catName);
+       part.setDbName(dbName);
+       part.setTableName(tblName);
+       if (fields[4] != null) part.setCreateTime(extractSqlInt(fields[4]));
+       if (fields[5] != null) part.setLastAccessTime(extractSqlInt(fields[5]));
++      Long writeId = extractSqlLong(fields[14]);
++      if (writeId != null) {
++        part.setWriteId(writeId);
++      }
+       partitions.put(partitionId, part);
+ 
++
+       if (sdId == null) continue; // Probably a view.
+       assert serdeId != null;
+ 
+       // We assume each partition has an unique SD.
+       StorageDescriptor sd = new StorageDescriptor();
+       StorageDescriptor oldSd = sds.put(sdId, sd);
+       if (oldSd != null) {
+         throw new MetaException("Partitions reuse SDs; we don't expect that");
+       }
+       // Set the collection fields; some code might not check presence before accessing them.
+       sd.setSortCols(new ArrayList<Order>());
+       sd.setBucketCols(new ArrayList<String>());
+       sd.setParameters(new HashMap<String, String>());
+       sd.setSkewedInfo(new SkewedInfo(new ArrayList<String>(),
+           new ArrayList<List<String>>(), new HashMap<List<String>, String>()));
+       sd.setInputFormat((String)fields[6]);
+       Boolean tmpBoolean = extractSqlBoolean(fields[7]);
+       if (tmpBoolean != null) sd.setCompressed(tmpBoolean);
+       tmpBoolean = extractSqlBoolean(fields[8]);
+       if (tmpBoolean != null) sd.setStoredAsSubDirectories(tmpBoolean);
+       sd.setLocation((String)fields[9]);
+       if (fields[10] != null) sd.setNumBuckets(extractSqlInt(fields[10]));
+       sd.setOutputFormat((String)fields[11]);
+       sdSb.append(sdId).append(",");
+       part.setSd(sd);
+ 
+       if (colId != null) {
+         List<FieldSchema> cols = colss.get(colId);
+         // We expect that colId will be the same for all (or many) SDs.
+         if (cols == null) {
+           cols = new ArrayList<FieldSchema>();
+           colss.put(colId, cols);
+           colsSb.append(colId).append(",");
+         }
+         sd.setCols(cols);
+       }
+ 
+       // We assume each SD has an unique serde.
+       SerDeInfo serde = new SerDeInfo();
+       SerDeInfo oldSerde = serdes.put(serdeId, serde);
+       if (oldSerde != null) {
+         throw new MetaException("SDs reuse serdes; we don't expect that");
+       }
+       serde.setParameters(new HashMap<String, String>());
+       serde.setName((String)fields[12]);
+       serde.setSerializationLib((String)fields[13]);
+       serdeSb.append(serdeId).append(",");
+       sd.setSerdeInfo(serde);
++
+       Deadline.checkTimeout();
+     }
+     query.closeAll();
+     timingTrace(doTrace, queryText, start, queryTime);
+ 
+     // Now get all the one-to-many things. Start with partitions.
+     queryText = "select \"PART_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + PARTITION_PARAMS + ""
+         + " where \"PART_ID\" in (" + partIds + ") and \"PARAM_KEY\" is not null"
+         + " order by \"PART_ID\" asc";
+     loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() {
+       @Override
+       public void apply(Partition t, Object[] fields) {
+         t.putToParameters((String)fields[1], (String)fields[2]);
+       }});
+     // Perform conversion of null map values
+     for (Partition t : partitions.values()) {
+       t.setParameters(MetaStoreUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings));
+     }
+ 
+     queryText = "select \"PART_ID\", \"PART_KEY_VAL\" from " + PARTITION_KEY_VALS + ""
+         + " where \"PART_ID\" in (" + partIds + ")"
+         + " order by \"PART_ID\" asc, \"INTEGER_IDX\" asc";
+     loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() {
+       @Override
+       public void apply(Partition t, Object[] fields) {
+         t.addToValues((String)fields[1]);
+       }});
+ 
+     // Prepare IN (blah) lists for the following queries. Cut off the final ','s.
+     if (sdSb.length() == 0) {
+       assert serdeSb.length() == 0 && colsSb.length() == 0;
+       return orderedResult; // No SDs, probably a view.
+     }
+ 
+     String sdIds = trimCommaList(sdSb);
+     String serdeIds = trimCommaList(serdeSb);
+     String colIds = trimCommaList(colsSb);
+ 
+     // Get all the stuff for SD. Don't do empty-list check - we expect partitions do have SDs.
+     queryText = "select \"SD_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + SD_PARAMS + ""
+         + " where \"SD_ID\" in (" + sdIds + ") and \"PARAM_KEY\" is not null"
+         + " order by \"SD_ID\" asc";
+     loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+       @Override
+       public void apply(StorageDescriptor t, Object[] fields) {
+         t.putToParameters((String)fields[1], extractSqlClob(fields[2]));
+       }});
+     // Perform conversion of null map values
+     for (StorageDescriptor t : sds.values()) {
+       t.setParameters(MetaStoreUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings));
+     }
+ 
+     queryText = "select \"SD_ID\", \"COLUMN_NAME\", " + SORT_COLS + ".\"ORDER\""
+         + " from " + SORT_COLS + ""
+         + " where \"SD_ID\" in (" + sdIds + ")"
+         + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
+     loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+       @Override
+       public void apply(StorageDescriptor t, Object[] fields) {
+         if (fields[2] == null) return;
+         t.addToSortCols(new Order((String)fields[1], extractSqlInt(fields[2])));
+       }});
+ 
+     queryText = "select \"SD_ID\", \"BUCKET_COL_NAME\" from " + BUCKETING_COLS + ""
+         + " where \"SD_ID\" in (" + sdIds + ")"
+         + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
+     loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+       @Override
+       public void apply(StorageDescriptor t, Object[] fields) {
+         t.addToBucketCols((String)fields[1]);
+       }});
+ 
+     // Skewed columns stuff.
+     queryText = "select \"SD_ID\", \"SKEWED_COL_NAME\" from " + SKEWED_COL_NAMES + ""
+         + " where \"SD_ID\" in (" + sdIds + ")"
+         + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
+     boolean hasSkewedColumns =
+       loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+         @Override
+         public void apply(StorageDescriptor t, Object[] fields) {
+           if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo());
+           t.getSkewedInfo().addToSkewedColNames((String)fields[1]);
+         }}) > 0;
+ 
+     // Assume we don't need to fetch the rest of the skewed column data if we have no columns.
+     if (hasSkewedColumns) {
+       // We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless.
+       queryText =
+             "select " + SKEWED_VALUES + ".\"SD_ID_OID\","
+           + "  " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\","
+           + "  " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_VALUE\" "
+           + "from " + SKEWED_VALUES + " "
+           + "  left outer join " + SKEWED_STRING_LIST_VALUES + " on " + SKEWED_VALUES + "."
+           + "\"STRING_LIST_ID_EID\" = " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" "
+           + "where " + SKEWED_VALUES + ".\"SD_ID_OID\" in (" + sdIds + ") "
+           + "  and " + SKEWED_VALUES + ".\"STRING_LIST_ID_EID\" is not null "
+           + "  and " + SKEWED_VALUES + ".\"INTEGER_IDX\" >= 0 "
+           + "order by " + SKEWED_VALUES + ".\"SD_ID_OID\" asc, " + SKEWED_VALUES + ".\"INTEGER_IDX\" asc,"
+           + "  " + SKEWED_STRING_LIST_VALUES + ".\"INTEGER_IDX\" asc";
+       loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+         private Long currentListId;
+         private List<String> currentList;
+         @Override
+         public void apply(StorageDescriptor t, Object[] fields) throws MetaException {
+           if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo());
+           // Note that this is not a typical list accumulator - there's no call to finalize
+           // the last list. Instead we add list to SD first, as well as locally to add elements.
+           if (fields[1] == null) {
+             currentList = null; // left outer join produced a list with no values
+             currentListId = null;
+             t.getSkewedInfo().addToSkewedColValues(Collections.<String>emptyList());
+           } else {
+             long fieldsListId = extractSqlLong(fields[1]);
+             if (currentListId == null || fieldsListId != currentListId) {
+               currentList = new ArrayList<String>();
+               currentListId = fieldsListId;
+               t.getSkewedInfo().addToSkewedColValues(currentList);
+             }
+             currentList.add((String)fields[2]);
+           }
+         }});
+ 
+       // We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless.
+       queryText =
+             "select " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\","
+           + " " + SKEWED_STRING_LIST_VALUES + ".STRING_LIST_ID,"
+           + " " + SKEWED_COL_VALUE_LOC_MAP + ".\"LOCATION\","
+           + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_VALUE\" "
+           + "from " + SKEWED_COL_VALUE_LOC_MAP + ""
+           + "  left outer join " + SKEWED_STRING_LIST_VALUES + " on " + SKEWED_COL_VALUE_LOC_MAP + "."
+           + "\"STRING_LIST_ID_KID\" = " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" "
+           + "where " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\" in (" + sdIds + ")"
+           + "  and " + SKEWED_COL_VALUE_LOC_MAP + ".\"STRING_LIST_ID_KID\" is not null "
+           + "order by " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\" asc,"
+           + "  " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" asc,"
+           + "  " + SKEWED_STRING_LIST_VALUES + ".\"INTEGER_IDX\" asc";
+ 
+       loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+         private Long currentListId;
+         private List<String> currentList;
+         @Override
+         public void apply(StorageDescriptor t, Object[] fields) throws MetaException {
+           if (!t.isSetSkewedInfo()) {
+             SkewedInfo skewedInfo = new SkewedInfo();
+             skewedInfo.setSkewedColValueLocationMaps(new HashMap<List<String>, String>());
+             t.setSkewedInfo(skewedInfo);
+           }
+           Map<List<String>, String> skewMap = t.getSkewedInfo().getSkewedColValueLocationMaps();
+           // Note that this is not a typical list accumulator - there's no call to finalize
+           // the last list. Instead we add list to SD first, as well as locally to add elements.
+           if (fields[1] == null) {
+             currentList = new ArrayList<String>(); // left outer join produced a list with no values
+             currentListId = null;
+           } else {
+             long fieldsListId = extractSqlLong(fields[1]);
+             if (currentListId == null || fieldsListId != currentListId) {
+               currentList = new ArrayList<String>();
+               currentListId = fieldsListId;
+             } else {
+               skewMap.remove(currentList); // value based compare.. remove first
+             }
+             currentList.add((String)fields[3]);
+           }
+           skewMap.put(currentList, (String)fields[2]);
+         }});
+     } // if (hasSkewedColumns)
+ 
+     // Get FieldSchema stuff if any.
+     if (!colss.isEmpty()) {
+       // We are skipping the CDS table here, as it seems to be totally useless.
+       queryText = "select \"CD_ID\", \"COMMENT\", \"COLUMN_NAME\", \"TYPE_NAME\""
+           + " from " + COLUMNS_V2 + " where \"CD_ID\" in (" + colIds + ")"
+           + " order by \"CD_ID\" asc, \"INTEGER_IDX\" asc";
+       loopJoinOrderedResult(colss, queryText, 0, new ApplyFunc<List<FieldSchema>>() {
+         @Override
+         public void apply(List<FieldSchema> t, Object[] fields) {
+           t.add(new FieldSchema((String)fields[2], extractSqlClob(fields[3]), (String)fields[1]));
+         }});
+     }
+ 
+     // Finally, get all the stuff for serdes - just the params.
+     queryText = "select \"SERDE_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + SERDE_PARAMS + ""
+         + " where \"SERDE_ID\" in (" + serdeIds + ") and \"PARAM_KEY\" is not null"
+         + " order by \"SERDE_ID\" asc";
+     loopJoinOrderedResult(serdes, queryText, 0, new ApplyFunc<SerDeInfo>() {
+       @Override
+       public void apply(SerDeInfo t, Object[] fields) {
+         t.putToParameters((String)fields[1], extractSqlClob(fields[2]));
+       }});
+     // Perform conversion of null map values
+     for (SerDeInfo t : serdes.values()) {
+       t.setParameters(MetaStoreUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings));
+     }
+ 
+     return orderedResult;
+   }
+ 
+   /**
+    * Counts the partitions of the filter's table that satisfy a pushed-down SQL filter.
+    * @param filter the table, extra joins, SQL filter text and bind parameters to apply
+    * @return the number of matching partitions
+    * @throws MetaException on failure
+    */
+   public int getNumPartitionsViaSqlFilter(SqlFilterForPushdown filter) throws MetaException {
+     boolean doTrace = LOG.isDebugEnabled();
+     String catName = filter.table.getCatName().toLowerCase();
+     String dbName = filter.table.getDbName().toLowerCase();
+     String tblName = filter.table.getTableName().toLowerCase();
+ 
+     // Get number of partitions by doing count on PART_ID.
+     String queryText = "select count(" + PARTITIONS + ".\"PART_ID\") from " + PARTITIONS + ""
+       + "  inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\" "
+       + "    and " + TBLS + ".\"TBL_NAME\" = ? "
+       + "  inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+       + "     and " + DBS + ".\"NAME\" = ? "
+       + join(filter.joins, ' ')
+       + " where " + DBS + ".\"CTLG_NAME\" = ? "
+       + (filter.filter == null || filter.filter.trim().isEmpty() ? "" : (" and " + filter.filter));
+ 
+     // The three fixed bind values come first, in the order their '?' appear above,
+     // followed by the parameters generated for the filter expression itself.
+     Object[] params = new Object[filter.params.size() + 3];
+     params[0] = tblName;
+     params[1] = dbName;
+     params[2] = catName;
+     for (int i = 0; i < filter.params.size(); ++i) {
+       params[i + 3] = filter.params.get(i);
+     }
+ 
+     long start = doTrace ? System.nanoTime() : 0;
+     Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+     int sqlResult;
+     long queryTime;
+     try {
+       query.setUnique(true);
+       sqlResult = extractSqlInt(query.executeWithArray(params));
+       queryTime = doTrace ? System.nanoTime() : 0;
+     } finally {
+       // The count is already extracted into a primitive, so the query can always be released.
+       query.closeAll();
+     }
+     timingTrace(doTrace, queryText, start, queryTime);
+     return sqlResult;
+   }
+ 
+ 
+   /**
+    * Logs at debug level how long a direct SQL query took to execute and how much time has
+    * elapsed since it returned. Does nothing when {@code doTrace} is false.
+    * @param doTrace whether tracing is enabled
+    * @param queryText the SQL text to include in the log line
+    * @param start nanoTime taken just before the query was executed
+    * @param queryTime nanoTime taken just after the query returned
+    */
+   private void timingTrace(boolean doTrace, String queryText, long start, long queryTime) {
+     if (doTrace) {
+       double executeMs = (queryTime - start) / 1000000.0;
+       double sinceQueryMs = (System.nanoTime() - queryTime) / 1000000.0;
+       LOG.debug("Direct SQL query in " + executeMs + "ms + "
+           + sinceQueryMs + "ms, the query is [" + queryText + "]");
+     }
+   }
+ 
+   /**
+    * Converts a numeric column value returned by the RDBMS to a Long.
+    * @param obj the raw column value; may be null
+    * @return the value as a Long, or null when {@code obj} is null
+    * @throws MetaException if {@code obj} is non-null and not a {@link Number}
+    */
+   static Long extractSqlLong(Object obj) throws MetaException {
+     if (obj instanceof Number) {
+       return ((Number) obj).longValue();
+     }
+     if (obj != null) {
+       throw new MetaException("Expected numeric type but got " + obj.getClass().getName());
+     }
+     return null;
+   }
+ 
+   /**
+    * Converts a boolean column value returned by the RDBMS into a Java Boolean.
+    * Some databases return a native Boolean; others (e.g. Derby) store 'Y'/'N' strings.
+    *
+    * @param value
+    *          raw column value from the database; may be null
+    * @return the Boolean value, or null if {@code value} is null
+    * @throws MetaException
+    *           if the value is neither a Boolean nor one of the strings "Y"/"N"
+    */
+   private static Boolean extractSqlBoolean(Object value) throws MetaException {
+     if (value == null || value instanceof Boolean) {
+       return (Boolean) value;
+     }
+     if (value instanceof String) {
+       String flag = (String) value;
+       try {
+         return BooleanUtils.toBooleanObject(flag, "Y", "N", null);
+       } catch (IllegalArgumentException ignored) {
+         // Neither "Y" nor "N": fall through to the MetaException below.
+       }
+     }
+     throw new MetaException("Cannot extract boolean from column value " + value);
+   }
+ 
+   /**
+    * Narrows a numeric column value to an int. The value must be a non-null {@link Number};
+    * a null value raises NullPointerException and a non-number raises ClassCastException.
+    */
+   private int extractSqlInt(Object field) {
+     Number number = (Number) field;
+     return number.intValue();
+   }
+ 
+   /**
+    * Returns the string form of a column value, or null when the value itself is null.
+    */
+   private String extractSqlString(Object value) {
+     return value == null ? null : value.toString();
+   }
+ 
+   /**
+    * Converts a numeric column value returned by the RDBMS to a Double.
+    * @param obj the raw column value; may be null
+    * @return the value as a Double, or null when {@code obj} is null
+    * @throws MetaException if {@code obj} is non-null and not a {@link Number}
+    */
+   static Double extractSqlDouble(Object obj) throws MetaException {
+     if (obj instanceof Number) {
+       return ((Number) obj).doubleValue();
+     }
+     if (obj == null) {
+       return null;
+     }
+     throw new MetaException("Expected numeric type but got " + obj.getClass().getName());
+   }
+ 
+   /**
+    * Converts a CLOB-typed column value into a String.
+    * A {@link Clob} is read in full (capped at Integer.MAX_VALUE - 2 characters, since
+    * getSubString takes an int length); any other non-null value is converted via toString().
+    * @param value the raw column value; may be null
+    * @return the string contents, or null if {@code value} is null or the CLOB cannot be read
+    */
+   private String extractSqlClob(Object value) {
+     if (value == null) return null;
+     if (!(value instanceof Clob)) {
+       return value.toString();
+     }
+     Clob clob = (Clob) value;
+     try {
+       // Clob.length() may need a round trip to the database, so read it only once.
+       long length = clob.length();
+       // we trim the Clob value to a max length an int can hold
+       int maxLength = (length < Integer.MAX_VALUE - 2) ? (int) length : Integer.MAX_VALUE - 2;
+       // Clob positions are 1-based.
+       return clob.getSubString(1L, maxLength);
+     } catch (SQLException sqle) {
+       // Best effort: callers treat an unreadable CLOB as a missing value, but don't hide it.
+       LOG.warn("Unable to read CLOB column value; treating it as null", sqle);
+       return null;
+     }
+   }
+ 
+   /**
+    * Converts a BLOB-typed column value into a byte array.
+    * Derby and Oracle return a {@link Blob}; MySQL, Postgres and SQL Server return byte[].
+    * @param value the raw column value; may be null
+    * @return the bytes, or null if {@code value} is null or of an unexpected type
+    * @throws MetaException if the Blob cannot be read or is too large for a byte array
+    */
+   static byte[] extractSqlBlob(Object value) throws MetaException {
+     if (value == null) {
+       return null;
+     }
+     if (value instanceof Blob) {
+       // derby, oracle
+       Blob blob = (Blob) value;
+       try {
+         long length = blob.length();
+         // A plain (int) cast would silently overflow for blobs of 2 GB or more.
+         if (length > Integer.MAX_VALUE) {
+           throw new MetaException("Blob of " + length + " bytes is too large to extract");
+         }
+         // getBytes function says: pos the ordinal position of the first byte in
+         // the BLOB value to be extracted; the first byte is at position 1
+         return blob.getBytes(1, (int) length);
+       } catch (SQLException e) {
+         MetaException me = new MetaException("Encounter error while processing blob.");
+         // Keep the driver's error as the cause so the failure can be diagnosed.
+         me.initCause(e);
+         throw me;
+       }
+     } else if (value instanceof byte[]) {
+       // mysql, postgres, sql server
+       return (byte[]) value;
+     } else {
+       // this may happen when enablebitvector is false
+       LOG.debug("Expected blob type but got " + value.getClass().getName());
+       return null;
+     }
+   }
+ 
+   /**
+    * Joins the string forms of the given IDs with commas, for use inside a
+    * "SOMETHING_ID in (...)" clause of a later query.
+    * @param objectIds the IDs to join; elements must be non-null
+    * @return the comma-separated list ("" for an empty list)
+    * @throws MetaException declared for callers; not thrown by this implementation
+    */
+   private static String getIdListForIn(List<Object> objectIds) throws MetaException {
+     StringBuilder joined = new StringBuilder();
+     String separator = "";
+     for (Object id : objectIds) {
+       joined.append(separator).append(id.toString());
+       separator = ",";
+     }
+     return joined.toString();
+   }
+ 
+   /**
+    * Drops the last character (the trailing ',') from a non-empty builder, in place,
+    * and returns the builder's resulting contents. An empty builder is left untouched.
+    */
+   private static String trimCommaList(StringBuilder sb) {
+     int length = sb.length();
+     if (length == 0) {
+       return sb.toString();
+     }
+     sb.deleteCharAt(length - 1);
+     return sb.toString();
+   }
+ 
+   /**
+    * Callback used by loopJoinOrderedResult to merge one query result row into the object
+    * whose ID matches that row.
+    * @param <T> the type of object being populated
+    */
+   private abstract class ApplyFunc<T> {
+     /**
+      * Applies one result row to its target object.
+      * @param target the object whose ID matches the row's key column
+      * @param row the raw column values of the row
+      * @throws MetaException if an implementation cannot convert the row
+      */
+     public abstract void apply(T target, Object[] row) throws MetaException;
+   }
+ 
+   /**
+    * Merges the result of a PM SQL query into a map of objects keyed by ID.
+    * Essentially it's an object join. DN could do this for us, but it issues queries
+    * separately for every object, which is suboptimal.
+    * <p>
+    * Both the tree and the query rows must be ordered by ascending ID; they are consumed in a
+    * single merge pass. A row whose ID is not a key of the tree (and is below the largest key)
+    * causes a MetaException; rows with IDs above the largest key are ignored. The query is
+    * always closed, even when processing a row fails.
+    * @param tree The object tree, by ID.
+    * @param queryText The query text.
+    * @param keyIndex Index of the Long column corresponding to the map ID in query result rows.
+    * @param func The function that is called on each (object,row) pair with the same id.
+    * @return the count of results returned from the query.
+    * @throws MetaException e.g. if a row has a null or unknown ID, or if func fails
+    */
+   private <T> int loopJoinOrderedResult(TreeMap<Long, T> tree,
+       String queryText, int keyIndex, ApplyFunc<T> func) throws MetaException {
+     boolean doTrace = LOG.isDebugEnabled();
+     long start = doTrace ? System.nanoTime() : 0;
+     Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+     long queryTime;
+     int rv;
+     try {
+       Object result = query.execute();
+       queryTime = doTrace ? System.nanoTime() : 0;
+       if (result == null) {
+         return 0;
+       }
+       List<Object[]> list = ensureList(result);
+       Iterator<Object[]> iter = list.iterator();
+       // Row read from the iterator but not yet applied (it belongs to a later tree entry).
+       Object[] fields = null;
+       for (Map.Entry<Long, T> entry : tree.entrySet()) {
+         if (fields == null && !iter.hasNext()) break;
+         long id = entry.getKey();
+         while (fields != null || iter.hasNext()) {
+           if (fields == null) {
+             fields = iter.next();
+           }
+           Long nestedId = extractSqlLong(fields[keyIndex]);
+           if (nestedId == null) {
+             throw new MetaException("Null ID in column " + keyIndex + " of query result");
+           }
+           if (nestedId < id) throw new MetaException("Found entries for unknown ID " + nestedId);
+           if (nestedId > id) break; // fields belong to one of the next entries
+           func.apply(entry.getValue(), fields);
+           fields = null;
+         }
+         Deadline.checkTimeout();
+       }
+       rv = list.size();
+     } finally {
+       // Release the JDO query even if a row fails to convert or the deadline expires.
+       query.closeAll();
+     }
+     timingTrace(doTrace, queryText, start, queryTime);
+     return rv;
+   }
+ 
+   /**
+    * Tree visitor that converts a partition-filter {@link ExpressionTree} into a SQL boolean
+    * expression over PARTITION_KEY_VALS. While visiting it appends bind values to
+    * {@code params} and the per-partition-column joins it needs to {@code joins}.
+    */
+   private static class PartitionFilterGenerator extends TreeVisitor {
+     private final Table table;
+     private final FilterBuilder filterBuffer;
+     private final List<Object> params;
+     private final List<String> joins;
+     private final boolean dbHasJoinCastBug;
+     private final String defaultPartName;
+     private final DatabaseProduct dbType;
+     private final String PARTITION_KEY_VALS, PARTITIONS, DBS, TBLS;
+ 
+     private PartitionFilterGenerator(Table table, List<Object> params, List<String> joins,
+         boolean dbHasJoinCastBug, String defaultPartName, DatabaseProduct dbType, String schema) {
+       this.table = table;
+       this.params = params;
+       this.joins = joins;
+       this.dbHasJoinCastBug = dbHasJoinCastBug;
+       this.filterBuffer = new FilterBuilder(false);
+       this.defaultPartName = defaultPartName;
+       this.dbType = dbType;
+       // Metastore table names qualified with the configured schema.
+       this.PARTITION_KEY_VALS = getFullyQualifiedName(schema, "PARTITION_KEY_VALS");
+       this.PARTITIONS = getFullyQualifiedName(schema, "PARTITIONS");
+       this.DBS = getFullyQualifiedName(schema, "DBS");
+       this.TBLS = getFullyQualifiedName(schema, "TBLS");
+     }
+ 
+     /**
+      * Generate the ANSI SQL92 filter for the given expression tree
+      * @param table the table being queried
+      * @param tree the parsed partition filter; may be null
+      * @param params the ordered parameters for the resulting expression (filled in by this call)
+      * @param joins the joins necessary for the resulting expression (filled in by this call)
+      * @param dbHasJoinCastBug whether to repeat the join conditions inside the cast guard
+      *        (workaround for DERBY-6358 and an Oracle bug)
+      * @param defaultPartName the default partition name, mapped to NULL in non-string compares
+      * @param dbType the backing database product, used for dialect-specific date conversion
+      * @param schema the schema used to qualify metastore table names
+      * @return the parenthesized filter; "" if the tree has no root; null if the tree is null
+      *         or the filter cannot be pushed down
+      * @throws MetaException if visiting the tree fails
+      */
+     private static String generateSqlFilter(Table table, ExpressionTree tree, List<Object> params,
+         List<String> joins, boolean dbHasJoinCastBug, String defaultPartName,
+         DatabaseProduct dbType, String schema) throws MetaException {
+       assert table != null;
+       if (tree == null) {
+         // consistent with other APIs like makeExpressionTree, null is returned to indicate that
+         // the filter could not pushed down due to parsing issue etc
+         return null;
+       }
+       if (tree.getRoot() == null) {
+         return "";
+       }
+       PartitionFilterGenerator visitor = new PartitionFilterGenerator(
+           table, params, joins, dbHasJoinCastBug, defaultPartName, dbType, schema);
+       tree.accept(visitor);
+       if (visitor.filterBuffer.hasError()) {
+         LOG.info("Unable to push down SQL filter: " + visitor.filterBuffer.getErrorMessage());
+         return null;
+       }
+ 
+       // visit(LeafNode) pre-fills one null slot per partition column; drop the unused ones.
+       joins.removeIf(j -> j == null);
+       return "(" + visitor.filterBuffer.getFilter() + ")";
+     }
+ 
+     @Override
+     protected void beginTreeNode(TreeNode node) throws MetaException {
+       filterBuffer.append(" (");
+     }
+ 
+     @Override
+     protected void midTreeNode(TreeNode node) throws MetaException {
+       filterBuffer.append((node.getAndOr() == LogicalOperator.AND) ? " and " : " or ");
+     }
+ 
+     @Override
+     protected void endTreeNode(TreeNode node) throws MetaException {
+       filterBuffer.append(") ");
+     }
+ 
+     @Override
+     protected boolean shouldStop() {
+       return filterBuffer.hasError();
+     }
+ 
+     /** Kinds of partition column / filter value that can be pushed down. */
+     private static enum FilterType {
+       Integral,
+       String,
+       Date,
+ 
+       Invalid;
+ 
+       static FilterType fromType(String colTypeStr) {
+         if (colTypeStr.equals(ColumnType.STRING_TYPE_NAME)) {
+           return FilterType.String;
+         } else if (colTypeStr.equals(ColumnType.DATE_TYPE_NAME)) {
+           return FilterType.Date;
+         } else if (ColumnType.IntegralTypes.contains(colTypeStr)) {
+           return FilterType.Integral;
+         }
+         return FilterType.Invalid;
+       }
+ 
+       public static FilterType fromClass(Object value) {
+         if (value instanceof String) {
+           return FilterType.String;
+         } else if (value instanceof Long) {
+           return FilterType.Integral;
+         } else if (value instanceof java.sql.Date) {
+           return FilterType.Date;
+         }
+         return FilterType.Invalid;
+       }
+     }
+ 
+     /**
+      * Translates one comparison leaf into SQL. If the operator, column type or value type
+      * cannot be pushed down, an error is recorded on the filter buffer (so shouldStop()
+      * then returns true) and nothing is appended.
+      */
+     @Override
+     public void visit(LeafNode node) throws MetaException {
+       if (node.operator == Operator.LIKE) {
+         filterBuffer.setError("LIKE is not supported for SQL filter pushdown");
+         return;
+       }
+       int partColCount = table.getPartitionKeys().size();
+       int partColIndex = node.getPartColIndexForFilter(table, filterBuffer);
+       if (filterBuffer.hasError()) return;
+ 
+       // We skipped 'like', other ops should all work as long as the types are right.
+       String colTypeStr = table.getPartitionKeys().get(partColIndex).getType();
+       FilterType colType = FilterType.fromType(colTypeStr);
+       if (colType == FilterType.Invalid) {
+         filterBuffer.setError("Filter pushdown not supported for type " + colTypeStr);
+         return;
+       }
+       FilterType valType = FilterType.fromClass(node.value);
+       Object nodeValue = node.value;
+       if (valType == FilterType.Invalid) {
+         // node.value may be null here (fromClass maps null to Invalid); don't NPE on it.
+         filterBuffer.setError("Filter pushdown not supported for value "
+             + (node.value == null ? "null" : node.value.getClass()));
+         return;
+       }
+ 
+       // if Filter.g does date parsing for quoted strings, we'd need to verify there's no
+       // type mismatch when string col is filtered by a string that looks like date.
+       if (colType == FilterType.Date && valType == FilterType.String) {
+         // Filter.g cannot parse a quoted date; try to parse date here too.
+         try {
+           nodeValue = MetaStoreUtils.PARTITION_DATE_FORMAT.get().parse((String)nodeValue);
+           valType = FilterType.Date;
+         } catch (ParseException pe) { // do nothing, handled below - types will mismatch
+         }
+       }
+ 
+       // We format it so we are sure we are getting the right value
+       if (valType == FilterType.Date) {
+         // Format
+         nodeValue = MetaStoreUtils.PARTITION_DATE_FORMAT.get().format(nodeValue);
+       }
+ 
+       if (colType != valType) {
+         // It's not clear how filtering for e.g. "stringCol > 5" should work (which side is
+         // to be coerced?). Let the expression evaluation sort this one out, not metastore.
+         // Report the original value's class: nodeValue may already be a formatted date string.
+         filterBuffer.setError("Cannot push down filter for "
+             + colTypeStr + " column and value " + node.value.getClass());
+         return;
+       }
+ 
+       if (joins.isEmpty()) {
+         // There's a fixed number of partition cols that we might have filters on. To avoid
+         // joining multiple times for one column (if there are several filters on it), we will
+         // keep numCols elements in the list, one for each column; we will fill it with nulls,
+         // put each join at a corresponding index when necessary, and remove nulls in the end.
+         for (int i = 0; i < partColCount; ++i) {
+           joins.add(null);
+         }
+       }
+       if (joins.get(partColIndex) == null) {
+         joins.set(partColIndex, "inner join " + PARTITION_KEY_VALS + " \"FILTER" + partColIndex
+             + "\" on \"FILTER"  + partColIndex + "\".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+             + " and \"FILTER" + partColIndex + "\".\"INTEGER_IDX\" = " + partColIndex);
+       }
+ 
+       // Build the filter and add parameters linearly; we are traversing leaf nodes LTR.
+       String tableValue = "\"FILTER" + partColIndex + "\".\"PART_KEY_VAL\"";
+ 
+       if (node.isReverseOrder) {
+         params.add(nodeValue);
+       }
+       String tableColumn = tableValue;
+       if (colType != FilterType.String) {
+         // The underlying database field is varchar, we need to compare numbers.
+         if (colType == FilterType.Integral) {
+           tableValue = "cast(" + tableValue + " as decimal(21,0))";
+         } else if (colType == FilterType.Date) {
+           if (dbType == DatabaseProduct.ORACLE) {
+             // Oracle requires special treatment... as usual.
+             tableValue = "TO_DATE(" + tableValue + ", 'YYYY-MM-DD')";
+           } else {
+             tableValue = "cast(" + tableValue + " as date)";
+           }
+         }
+ 
+         // Workaround for HIVE_DEFAULT_PARTITION - ignore it like JDO does, for now.
+         String tableValue0 = tableValue;
+         tableValue = "(case when " + tableColumn + " <> ?";
+         params.add(defaultPartName);
+ 
+         if (dbHasJoinCastBug) {
+           // This is a workaround for DERBY-6358 and Oracle bug; it is pretty horrible.
+           tableValue += (" and " + TBLS + ".\"TBL_NAME\" = ? and " + DBS + ".\"NAME\" = ? and "
+               + DBS + ".\"CTLG_NAME\" = ? and "
+               + "\"FILTER" + partColIndex + "\".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\" and "
+                 + "\"FILTER" + partColIndex + "\".\"INTEGER_IDX\" = " + partColIndex);
+           params.add(table.getTableName().toLowerCase());
+           params.add(table.getDbName().toLowerCase());
+           params.add(table.getCatName().toLowerCase());
+         }
+         tableValue += " then " + tableValue0 + " else null end)";
+       }
+       if (!node.isReverseOrder) {
+         params.add(nodeValue);
+       }
+ 
+       filterBuffer.append(node.isReverseOrder
+           ? "(? " + node.operator.getSqlOp() + " " + tableValue + ")"
+           : "(" + tableValue + " " + node.operator.getSqlOp() + " ?)");
+     }
+   }
+ 
+   /**
+    * Retrieve the column statistics for the specified columns of the table. NULL
+    * is returned if the columns are not provided or no statistics rows are found.
+    * @param catName     the catalog name of the table
+    * @param dbName      the database name of the table
+    * @param tableName   the table name
+    * @param colNames    the list of the column names
+    * @param enableBitVector  forwarded to getStatsList when building the select list
+    *                         (presumably controls whether bit vectors are fetched)
+    * @return            the column statistics for the specified columns, or null
+    * @throws MetaException
+    */
+   public ColumnStatistics getTableStats(final String catName, final String dbName,
+                                         final String tableName, List<String> colNames,
+                                         boolean enableBitVector) throws MetaException {
+     if (colNames == null || colNames.isEmpty()) {
+       return null;
+     }
+     final boolean doTrace = LOG.isDebugEnabled();
+     final String queryText0 = "select " + getStatsList(enableBitVector) + " from " + TAB_COL_STATS
+           + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? and \"COLUMN_NAME\" in (";
+     Batchable<String, Object[]> b = new Batchable<String, Object[]>() {
+       @Override
+       public List<Object[]> run(List<String> input) throws MetaException {
+         String queryText = queryText0 + makeParams(input.size()) + ")";
+         Object[] params = new Object[input.size() + 3];
+         params[0] = catName;
+         params[1] = dbName;
+         params[2] = tableName;
+         for (int i = 0; i < input.size(); ++i) {
+           params[i + 3] = input.get(i);
+         }
+         long start = doTrace ? System.nanoTime() : 0;
+         Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+         Object qResult = executeWithArray(query, params, queryText);
+         timingTrace(doTrace, queryText0 + "...)", start, (doTrace ? System.nanoTime() : 0));
+         if (qResult == null) {
+           query.closeAll();
+           return null;
+         }
+         // Keep the query open until its rows are consumed; closed via closeAllQueries().
+         addQueryAfterUse(query);
+         return ensureList(qResult);
+       }
+     };
+     try {
+       List<Object[]> list = Batchable.runBatched(batchSize, colNames, b);
+       if (list.isEmpty()) {
+         return null;
+       }
+       ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName);
+       csd.setCatName(catName);
+       return makeColumnStats(list, csd, 0);
+     } finally {
+       // Close the batch queries on every path (including the empty-result return and
+       // exceptions), but only after makeColumnStats has consumed the rows.
+       b.closeAllQueries();
+     }
+   }
+ 
+   public AggrStats aggrColStatsForPartitions(String catName, String dbName, String tableName,
+       List<String> partNames, List<String> colNames, boolean useDensityFunctionForNDVEstimation,
+       double ndvTuner, boolean enableBitVector) throws MetaException {
+     if (colNames.isEmpty() || partNames.isEmpty()) {
+       LOG.debug("Columns is empty or partNames is empty : Short-circuiting stats eval");
+       return new AggrStats(Collections.<ColumnStatisticsObj>emptyList(), 0); // Nothing to aggregate
+     }
+     long partsFound = 0;
+     List<ColumnStatisticsObj> colStatsList;
+     // Try to read from the cache first
+     if (isAggregateStatsCacheEnabled
+         && (partNames.size() < aggrStatsCache.getMaxPartsPerCacheNode())) {
+       AggrColStats colStatsAggrCached;
+       List<ColumnStatisticsObj> colStatsAggrFromDB;
+       int maxPartsPerCacheNode = aggrStatsCache.getMaxPartsPerCacheNode();
+       double fpp = aggrStatsCache.getFalsePositiveProbability();
+       colStatsList = new ArrayList<ColumnStatisticsObj>();
+       // Bloom filter for the new node that we will eventually add to the cache
+       BloomFilter bloomFilter = createPartsBloomFilter(maxPartsPerCacheNode, fpp, partNames);
+       boolean computePartsFound = true;
+       for (String colName : colNames) {
+         // Check the cache first
+         colStatsAggrCached = aggrStatsCache.get(catName, dbName, tableName, colName, partNames);
+         if (colStatsAggrCached != null) {
+           colStatsList.add(colStatsAggrCached.getColStats());
+           partsFound = colStatsAggrCached.getNumPartsCached();
+         } else {
+           if (computePartsFound) {
+             partsFound = partsFoundForPartitions(catName, dbName, tableName, partNames, colNames);
+             computePartsFound = false;
+           }
+           List<String> colNamesForDB = new ArrayList<>();
+           colNamesForDB.add(colName);
+           // Read aggregated stats for one column
+           colStatsAggrFromDB =
+               columnStatisticsObjForPartitions(catName, dbName, tableName, partNames, colNamesForDB,
+                   partsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector);
+           if (!colStatsAggrFromDB.isEmpty()) {
+             ColumnStatisticsObj colStatsAggr = colStatsAggrFromDB.get(0);
+             colStatsList.add(colStatsAggr);
+             // Update the cache to add this new aggregate node
+             aggrStatsCache.add(catName, dbName, tableName, colName, partsFound, colStatsAggr, bloomFilter);
+           }
+         }
+       }
+     } else {
+       partsFound = partsFoundForPartitions(catName, dbName, tableName, partNames, colNames);
+       colStatsList =
+           columnStatisticsObjForPartitions(catName, dbName, tableName, partNames, colNames, partsFound,
+               useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector);
+     }
+     LOG.info("useDensityFunctionForNDVEstimation = " + useDensityFunctionForNDVEstimation
+         + "\npartsFound = " + partsFound + "\nColumnStatisticsObj = "
+         + Arrays.toString(colStatsList.toArray()));
+     return new AggrStats(colStatsList, partsFound);
+   }
+ 
+   private BloomFilter createPartsBloomFilter(int maxPartsPerCacheNode, double fpp,
+       List<String> partNames) {
+     BloomFilter bloomFilter = new BloomFilter(maxPartsPerCacheNode, fpp);
+     for (String partName : partNames) {
+       bloomFilter.add(partName.getBytes());
+     }
+     return bloomFilter;
+   }
+ 
+   /**
+    * Counts how many of {@code partNames} have column statistics stored for every column in
+    * {@code colNames}.
+    * <p>
+    * Both lists may be split into batches of {@code batchSize}. A partition's columns can be spread
+    * over several column batches, so the per-batch column counts are first summed per partition.
+    * A partition is counted only when its total equals {@code colNames.size()}. Summing the
+    * per-batch "fully covered" counts instead would count one partition once per column batch.
+    * @param catName   catalog name of the table
+    * @param dbName    database name of the table
+    * @param tableName table name
+    * @param partNames partition names to check; must not be empty
+    * @param colNames  column names that must all have stats; must not be empty
+    * @return number of partitions having stats for all requested columns
+    * @throws MetaException if a metastore query fails
+    */
+   private long partsFoundForPartitions(
+       final String catName, final String dbName, final String tableName,
+       final List<String> partNames, List<String> colNames) throws MetaException {
+     assert !colNames.isEmpty() && !partNames.isEmpty();
+     final boolean doTrace = LOG.isDebugEnabled();
+     final String queryText0  = "select \"PARTITION_NAME\", count(\"COLUMN_NAME\") from "
+         + PART_COL_STATS
+         + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
+         + " and \"COLUMN_NAME\" in (%1$s) and \"PARTITION_NAME\" in (%2$s)"
+         + " group by \"PARTITION_NAME\"";
+     // Each element is {partition name (String), columns with stats within one column batch (Long)}.
+     List<Object[]> batchCounts = Batchable.runBatched(batchSize, colNames, new Batchable<String, Object[]>() {
+       @Override
+       public List<Object[]> run(final List<String> inputColNames) throws MetaException {
+         return Batchable.runBatched(batchSize, partNames, new Batchable<String, Object[]>() {
+           @Override
+           public List<Object[]> run(List<String> inputPartNames) throws MetaException {
+             String queryText = String.format(queryText0,
+                 makeParams(inputColNames.size()), makeParams(inputPartNames.size()));
+             long start = doTrace ? System.nanoTime() : 0;
+             Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+             try {
+               Object qResult = executeWithArray(query, prepareParams(
+                   catName, dbName, tableName, inputPartNames, inputColNames), queryText);
+               long end = doTrace ? System.nanoTime() : 0;
+               timingTrace(doTrace, queryText, start, end);
+               List<Object[]> counts = new ArrayList<Object[]>();
+               if (qResult == null) {
+                 return counts;
+               }
+               List<Object[]> rows = ensureList(qResult);
+               for (Object[] row : rows) {
+                 // Copy the values out now, because the query is closed in the finally clause.
+                 counts.add(new Object[] { (String) row[0], extractSqlLong(row[1]) });
+               }
+               return counts;
+             } finally {
+               query.closeAll();
+             }
+           }
+         });
+       }
+     });
+     // Total number of requested columns with stats, per partition, across all column batches.
+     Map<String, Long> colsWithStatsPerPart = new HashMap<String, Long>();
+     for (Object[] batchCount : batchCounts) {
+       String partName = (String) batchCount[0];
+       long count = (Long) batchCount[1];
+       Long soFar = colsWithStatsPerPart.get(partName);
+       colsWithStatsPerPart.put(partName, soFar == null ? count : soFar + count);
+     }
+     long partsFound = 0;
+     for (long total : colsWithStatsPerPart.values()) {
+       if (total == colNames.size()) {
+         partsFound++;
+       }
+     }
+     return partsFound;
+   }
+ 
+   private List<ColumnStatisticsObj> columnStatisticsObjForPartitions(
+       final String catName, final String dbName,
+     final String tableName, final List<String> partNames, List<String> colNames, long partsFound,
+     final boolean useDensityFunctionForNDVEstimation, final double ndvTuner, final boolean enableBitVector) throws MetaException {
+     final boolean areAllPartsFound = (partsFound == partNames.size());
+     return Batchable.runBatched(batchSize, colNames, new Batchable<String, ColumnStatisticsObj>() {
+       @Override
+       public List<ColumnStatisticsObj> run(final List<String> inputColNames) throws MetaException {
+         return Batchable.runBatched(batchSize, partNames, new Batchable<String, ColumnStatisticsObj>() {
+           @Override
+           public List<ColumnStatisticsObj> run(List<String> inputPartNames) throws MetaException {
+             return columnStatisticsObjForPartitionsBatch(catName, dbName, tableName, inputPartNames,
+                 inputColNames, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector);
+           }
+         });
+       }
+     });
+   }
+ 
+   /**
+    * Retrieves the stored partition column statistics of every table in a database.
+    * @param catName         catalog name
+    * @param dbName          database name
+    * @param enableBitVector passed to {@code getStatsList} to choose which stats columns are selected
+    * @return one entry per statistics row, tagged with its catalog, database, table and partition;
+    *         empty if the query returns no result
+    * @throws MetaException if the metastore query fails
+    */
+   public List<ColStatsObjWithSourceInfo> getColStatsForAllTablePartitions(String catName, String dbName,
+       boolean enableBitVector) throws MetaException {
+     String queryText = "select \"TABLE_NAME\", \"PARTITION_NAME\", " + getStatsList(enableBitVector)
+         + " from " + " " + PART_COL_STATS + " where \"DB_NAME\" = ? and \"CAT_NAME\" = ?";
+     boolean doTrace = LOG.isDebugEnabled();
+     long start = doTrace ? System.nanoTime() : 0;
+     List<ColStatsObjWithSourceInfo> colStatsForDB = new ArrayList<ColStatsObjWithSourceInfo>();
+     // Create the query before the try block. If newQuery() failed inside it, the finally clause
+     // would throw a NullPointerException that hides the real error.
+     Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+     try {
+       Object qResult = executeWithArray(query, new Object[] { dbName, catName }, queryText);
+       if (qResult == null) {
+         // The finally clause closes the query, so it is not closed here as well.
+         return colStatsForDB;
+       }
+       long end = doTrace ? System.nanoTime() : 0;
+       timingTrace(doTrace, queryText, start, end);
+       List<Object[]> list = ensureList(qResult);
+       for (Object[] row : list) {
+         String tblName = (String) row[0];
+         String partName = (String) row[1];
+         // Stats columns follow TABLE_NAME and PARTITION_NAME, hence offset 2.
+         ColumnStatisticsObj colStatObj = prepareCSObj(row, 2);
+         colStatsForDB.add(new ColStatsObjWithSourceInfo(colStatObj, catName, dbName, tblName, partName));
+         Deadline.checkTimeout();
+       }
+     } finally {
+       query.closeAll();
+     }
+     return colStatsForDB;
+   }
+ 
+   /**
+    * Aggregates the statistics of one batch of columns over one batch of partitions.
+    * Should be called with the list short enough to not trip up Oracle/etc.
+    * <p>
+    * With {@code enableBitVector}, the per-partition statistics are fetched and merged in Java
+    * ({@link #aggrStatsUseJava}). Otherwise the aggregation is done in SQL ({@link #aggrStatsUseDB}).
+    */
+   private List<ColumnStatisticsObj> columnStatisticsObjForPartitionsBatch(String catName, String dbName,
+       String tableName, List<String> partNames, List<String> colNames, boolean areAllPartsFound,
+       boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean enableBitVector)
+       throws MetaException {
+     return enableBitVector
+         ? aggrStatsUseJava(catName, dbName, tableName, partNames, colNames, areAllPartsFound,
+             useDensityFunctionForNDVEstimation, ndvTuner)
+         : aggrStatsUseDB(catName, dbName, tableName, partNames, colNames, areAllPartsFound,
+             useDensityFunctionForNDVEstimation, ndvTuner);
+   }
+ 
+   /**
+    * Aggregates column statistics in Java. It loads the per-partition statistics of
+    * {@code colNames} for {@code partNames} and merges them with
+    * {@code MetaStoreUtils.aggrPartitionStats}.
+    */
+   private List<ColumnStatisticsObj> aggrStatsUseJava(String catName, String dbName, String tableName,
+       List<String> partNames, List<String> colNames, boolean areAllPartsFound,
+       boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
+     // The trailing `true` presumably asks getPartitionStats for bit vectors; confirm at its definition.
+     return MetaStoreUtils.aggrPartitionStats(
+         getPartitionStats(catName, dbName, tableName, partNames, colNames, true),
+         catName, dbName, tableName, partNames, colNames,
+         areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner);
+   }
+ 
+   private List<ColumnStatisticsObj> aggrStatsUseDB(String catName, String dbName,
+       String tableName, List<String> partNames, List<String> colNames, boolean areAllPartsFound,
+       boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
+     // TODO: all the extrapolation logic should be moved out of this class,
+     // only mechanical data retrieval should remain here.
+     String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
+         + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
+         + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
+         + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
+         + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), "
+         // The following data is used to compute a partitioned table's NDV based
+         // on the partitions' NDVs when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be
+         // accurately derived from partition NDVs, because the column-value domains of two partitions
+         // can overlap. If there is no overlap, the global NDV is just the sum
+         // of the partition NDVs (UpperBound). But if there is some overlap, the
+         // global NDV can be anywhere between the sum of the partition NDVs (no overlap)
+         // and the NDV of a single partition (when the column-value domains of all other
+         // partitions are subsets of the domain of that one partition)
+         // (LowerBound). Under a uniform distribution, however, we can roughly estimate the global
+         // NDV by leveraging the min/max values.
+         // We also guarantee that the estimate makes sense by comparing it to the
+         // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
+         // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
+         + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
+         + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+         + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
+         + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + ""
+         + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? ";
+     String queryText = null;
+     long start = 0;
+     long end = 0;
+     Query query = null;
+     boolean doTrace = LOG.isDebugEnabled();
+     Object qResult = null;
+     ForwardQueryResult<?> fqr = null;
+     // Check if the status of all the columns of all the partitions exists
+     // Extrapolation is not needed.
+     if (areAllPartsFound) {
+       queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
+           + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+           + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+       start = doTrace ? System.nanoTime() : 0;
+       query = pm.newQuery("javax.jdo.query.SQL", queryText);
+       qResult = executeWithArray(query, prepareParams(catName, dbName, tableName, partNames, colNames),
+           queryText);
+       if (qResult == null) {
+         query.closeAll();
+         return Collections.emptyList();
+       }
+       end = doTrace ? System.nanoTime() : 0;
+       timingTrace(doTrace, queryText, start, end);
+       List<Object[]> list = ensureList(qResult);
+       List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(list.size());
+       for (Object[] row : list) {
+         colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
+         Deadline.checkTimeout();
+       }
+       query.closeAll();
+       return colStats;
+     } else {
+       // Extrapolation is needed for some columns.
+       // In this case, at least a column status for a partition is missing.
+       // We need to extrapolate this partition based on the other partitions
+       List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
+       queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", count(\"PARTITION_NAME\") "
+           + " from " + PART_COL_STATS
+           + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
+           + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
+           + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+           + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+       start = doTrace ? System.nanoTime() : 0;
+       query = pm.newQuery("javax.jdo.query.SQL", queryText);
+       qResult = executeWithArray(query, prepareParams(catName, dbName, tableName, partNames, colNames),
+           queryText);
+       end = doTrace ? System.nanoTime() : 0;
+       timingTrace(doTrace, queryText, start, end);
+       if (qResult == null) {
+         query.closeAll();
+         return Collections.emptyList();
+       }
+       List<String> noExtraColumnNames = new ArrayList<String>();
+       Map<String, String[]> extraColumnNameTypeParts = new HashMap<String, String[]>();
+       List<Object[]> list = ensureList(qResult);
+       for (Object[] row : list) {
+         String colName = (String) row[0];
+         String colType = (String) row[1];
+         // Extrapolation is not needed for this column if
+         // count(\"PARTITION_NAME\")==partNames.size()
+         // Or, extrapolation is not possible for this column if
+         // count(\"PARTITION_NAME\")<2
+         Long count = extractSqlLong(row[2]);
+         if (count == partNames.size() || count < 2) {
+           noExtraColumnNames.add(colName);
+         } else {
+           extraColumnNameTypeParts.put(colName, new String[] { colType, String.valueOf(count) });
+         }
+         Deadline.checkTimeout();
+       }
+       query.closeAll();
+       // Extrapolation is not needed for columns noExtraColumnNames
+       if (noExtraColumnNames.size() != 0) {
+         queryText = commonPrefix + " and \"COLUMN_NAME\" in ("
+             + makeParams(noExtraColumnNames.size()) + ")" + " and \"PARTITION_NAME\" in ("
+             + makeParams(partNames.size()) + ")" + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+         start = doTrace ? System.nanoTime() : 0;
+         query = pm.newQuery("javax.jdo.query.SQL", queryText);
+         qResult = executeWithArray(query,
+             prepareParams(catName, dbName, tableName, partNames, noExtraColumnNames), queryText);
+         if (qResult == null) {
+           query.closeAll();
+           return Collections.emptyList();
+         }
+         list = ensureList(qResult);
+         for (Object[] row : list) {
+           colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
+           Deadline.checkTimeout();
+         }
+         end = doTrace ? System.nanoTime() : 0;
+         timingTrace(doTrace, queryText, start, end);
+         query.closeAll();
+       }
+       // Extrapolation is needed for extraColumnNames.
+       // give a sequence number for all the partitions
+       if (extraColumnNameTypeParts.size() != 0) {
+         Map<String, Integer> indexMap = new HashMap<String, Integer>();
+         for (int index = 0; index < partNames.size(); index++) {
+           indexMap.put(partNames.get(index), index);
+         }
+         // get sum for all columns to reduce the number of queries
+         Map<String, Map<Integer, Object>> sumMap = new HashMap<String, Map<Integer, Object>>();
+         queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")"
+             + " from " + PART_COL_STATS + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
+             + " and \"COLUMN_NAME\" in (" + makeParams(extraColumnNameTypeParts.size())
+             + ") and \"PARTITION_NAME\" in (" + makeParams(partNames.size())
+             + ") group by \"COLUMN_NAME\"";
+         start = doTrace ? System.nanoTime() : 0;
+         query = pm.newQuery("javax.jdo.query.SQL", queryText);
+         List<String> extraColumnNames = new ArrayList<String>();
+         extraColumnNames.addAll(extraColumnNameTypeParts.keySet());
+         qResult = executeWithArray(query,
+             prepareParams(catName, dbName, tableName, partNames, extraColumnNames), queryText);
+         if (qResult == null) {
+           query.closeAll();
+           return Collections.emptyList();
+         }
+         list = ensureList(qResult);
+         // see the indexes for colstats in IExtrapolatePartStatus
+         Integer[] sumIndex = new Integer[] { 6, 10, 11, 15 };
+         for (Object[] row : list) {
+           Map<Integer, Object> indexToObject = new HashMap<Integer, Object>();
+           for (int ind = 1; ind < row.length; ind++) {
+             indexToObject.put(sumIndex[ind - 1], row[ind]);
+           }
+           // row[0] is the column name
+           sumMap.put((String) row[0], indexToObject);
+           Deadline.checkTimeout();
+         }
+         end = doTrace ? System.nanoTime() : 0;
+         timingTrace(doTrace, queryText, start, end);
+         query.closeAll();
+         for (Map.Entry<String, String[]> entry : extraColumnNameTypeParts.entrySet()) {
+           Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length + 2];
+           String colName = entry.getKey();
+           String colType = entry.getValue()[0];
+           Long sumVal = Long.parseLong(entry.getValue()[1]);
+           // fill in colname
+           row[0] = colName;
+           // fill in coltype
+           row[1] = colType;
+           // use linear extrapolation. more complicated one can be added in the
+           // future.
+           IExtrapolatePartStatus extrapolateMethod = new LinearExtrapolatePartStatus();
+           // fill in colstatus
+           Integer[] index = null;
+           boolean decimal = false;
+           if (colType.toLowerCase().startsWith("decimal")) {
+             index = IExtrapolatePartStatus.indexMaps.get("decimal");
+             decimal = true;
+           } else {
+             index = IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase());
+           }
+           // if the colType is not the known type, long, double, etc, then get
+           // all index.
+           if (index == null) {
+             index = IExtrapolatePartStatus.indexMaps.get("default");
+           }
+           for (int colStatIndex : index) {
+             String colStatName = IExtrapolatePartStatus.colStatNames[colStatIndex];
+             // if the aggregation type is sum, we do a scale-up
+             if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Sum) {
+               Object o = sumMap.get(colName).get(colStatIndex);
+               if (o == null) {
+                 row[2 + colStatIndex] = null;
+               } else {
+                 Long val = extractSqlLong(o);
+                 row[2 + colStatIndex] = val / sumVal * (partNames.size());
+               }
+             } else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Min
+                 || IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Max) {
+               // if the aggregation type is min/max, we extrapolate from the
+               // left/right borders
+               if (!decimal) {
+                 queryText = "select \"" + colStatName
+                     + "\",\"PARTITION_NAME\" from " + PART_COL_STATS
+                     + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + " and \"COLUMN_NAME\" = ?"
+                     + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+                     + " order by \"" + colStatName + "\"";
+               } else {
+                 queryText = "select \"" + colStatName
+                     + "\",\"PARTITION_NAME\" from " + PART_COL_STATS
+                     + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + " and \"COLUMN_NAME\" = ?"
+                     + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+                     + " order by cast(\"" + colStatName + "\" as decimal)";
+               }
+               start = doTrace ? System.nanoTime() : 0;
+               query = pm.newQuery("javax.jdo.query.SQL", queryText);
+               qResult = executeWithArray(query,
+                   prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName)), queryText);
+               if (qResult == null) {
+                 query.closeAll();
+                 return Collections.emptyList();
+               }
+               fqr = (ForwardQueryResult<?>) qResult;
+               Object[] min = (Object[]) (fqr.get(0));
+               Object[] max = (Object[]) (fqr.get(fqr.size() - 1));
+               end = doTrace ? System.nanoTime() : 0;
+               timingTrace(doTrace, queryText, start, end);
+               query.closeAll();
+               if (min[0] == null || max[0] == null) {
+                 row[2 + colStatIndex] = null;
+               } else {
+                 row[2 + colStatIndex] = extrapolateMethod.extrapolate(min, max, colStatIndex,
+                     indexMap);
+               }
+             } else {
+               // if the aggregation type is avg, we use the average on the existing ones.
+               queryText = "select "
+                   + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
+                   + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+                   + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")"
+                   + " from " + PART_COL_STATS + "" + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ?"
+                   + " and \"COLUMN_NAME\" = ?" + " and \"PARTITION_NAME\" in ("
+                   + makeParams(partNames.size()) + ")" + " group by \"COLUMN_NAME\"";
+               start = doTrace ? System.nanoTime() : 0;
+               query = pm.newQuery("javax.jdo.query.SQL", queryText);
+               qResult = executeWithArray(query,
+                   prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName)), queryText);
+               if (qResult == null) {
+                 query.closeAll();
+                 return Collections.emptyList();
+               }
+               fqr = (Forward

<TRUNCATED>