You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vi...@apache.org on 2018/10/07 22:47:42 UTC

[06/13] hive git commit: HIVE-20306 : Implement projection spec for fetching only requested fields from partitions (Vihang Karajgaonkar, reviewed by Aihua Xu, Andrew Sherman and Alexander Kolbasov)

http://git-wip-us.apache.org/repos/asf/hive/blob/44ef91a6/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 0a25b77..af75793 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -23,8 +23,7 @@ import static org.apache.commons.lang.StringUtils.normalizeSpace;
 import static org.apache.commons.lang.StringUtils.repeat;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
 
-import java.sql.Blob;
-import java.sql.Clob;
+import java.net.URL;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -45,7 +44,7 @@ import javax.jdo.Query;
 import javax.jdo.Transaction;
 import javax.jdo.datastore.JDOConnection;
 
-import org.apache.commons.lang.BooleanUtils;
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.AggregateStatsCache.AggrColStats;
@@ -135,8 +134,22 @@ class MetaStoreDirectSql {
    */
   private final boolean isCompatibleDatastore;
   private final boolean isAggregateStatsCacheEnabled;
+  private final ImmutableMap<String, String> fieldnameToTableName;
   private AggregateStatsCache aggrStatsCache;
 
+  /**
+   * This method returns a comma separated string consisting of String values of a given list.
+   * This is used for preparing "SOMETHING_ID in (...)" to use in SQL queries.
+   * @param objectIds the objectId collection
+   * @return The concatenated list
+   * @throws MetaException If the list contains wrong data
+   */
+  public static <T> String getIdListForIn(List<T> objectIds) throws MetaException {
+    return objectIds.stream()
+               .map(i -> i.toString())
+               .collect(Collectors.joining(","));
+  }
+
   @java.lang.annotation.Target(java.lang.annotation.ElementType.FIELD)
   @java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
   private @interface TableName {}
@@ -166,11 +179,15 @@ class MetaStoreDirectSql {
       batchSize = DatabaseProduct.needsInBatching(dbType) ? 1000 : NO_BATCHING;
     }
     this.batchSize = batchSize;
+    ImmutableMap.Builder<String, String> fieldNameToTableNameBuilder =
+        new ImmutableMap.Builder<>();
 
     for (java.lang.reflect.Field f : this.getClass().getDeclaredFields()) {
       if (f.getAnnotation(TableName.class) == null) continue;
       try {
-        f.set(this, getFullyQualifiedName(schema, f.getName()));
+        String value = getFullyQualifiedName(schema, f.getName());
+        f.set(this, value);
+        fieldNameToTableNameBuilder.put(f.getName(), value);
       } catch (IllegalArgumentException | IllegalAccessException e) {
         throw new RuntimeException("Internal error, cannot set " + f.getName());
       }
@@ -198,6 +215,27 @@ class MetaStoreDirectSql {
     if (isAggregateStatsCacheEnabled) {
       aggrStatsCache = AggregateStatsCache.getInstance(conf);
     }
+
+    // now use the table names to create the mapping
+    // note that some of the optional single-valued fields are not present
+    fieldnameToTableName =
+        fieldNameToTableNameBuilder
+            .put("createTime", PARTITIONS + ".\"CREATE_TIME\"")
+            .put("lastAccessTime", PARTITIONS + ".\"LAST_ACCESS_TIME\"")
+            .put("writeId", PARTITIONS + ".\"WRITE_ID\"")
+            .put("sd.location", SDS + ".\"LOCATION\"")
+            .put("sd.inputFormat", SDS + ".\"INPUT_FORMAT\"")
+            .put("sd.outputFormat", SDS + ".\"OUTPUT_FORMAT\"")
+            .put("sd.storedAsSubDirectories", SDS + ".\"IS_STOREDASSUBDIRECTORIES\"")
+            .put("sd.compressed", SDS + ".\"IS_COMPRESSED\"")
+            .put("sd.numBuckets", SDS + ".\"NUM_BUCKETS\"")
+            .put("sd.serdeInfo.name", SERDES + ".\"NAME\"")
+            .put("sd.serdeInfo.serializationLib", SERDES + ".\"SLIB\"")
+            .put("PART_ID", PARTITIONS + ".\"PART_ID\"")
+            .put("SD_ID", SDS + ".\"SD_ID\"")
+            .put("SERDE_ID", SERDES + ".\"SERDE_ID\"")
+            .put("CD_ID", SDS + ".\"CD_ID\"")
+            .build();
   }
 
   private static String getFullyQualifiedName(String schema, String tblName) {
@@ -314,7 +352,7 @@ class MetaStoreDirectSql {
       long start = doTrace ? System.nanoTime() : 0;
       statement = ((Connection)jdoConn.getNativeConnection()).createStatement();
       statement.execute(queryText);
-      timingTrace(doTrace, queryText, start, doTrace ? System.nanoTime() : 0);
+      MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, doTrace ? System.nanoTime() : 0);
     } finally {
       if(statement != null){
           statement.close();
@@ -355,7 +393,7 @@ class MetaStoreDirectSql {
       }
 
       Object[] dbline = sqlResult.get(0);
-      Long dbid = extractSqlLong(dbline[0]);
+      Long dbid = MetastoreDirectSqlUtils.extractSqlLong(dbline[0]);
 
       String queryTextDbParams = "select \"PARAM_KEY\", \"PARAM_VALUE\" "
           + " from " + DATABASE_PARAMS + " "
@@ -369,22 +407,23 @@ class MetaStoreDirectSql {
       }
 
       Map<String,String> dbParams = new HashMap<String,String>();
-      List<Object[]> sqlResult2 = ensureList(executeWithArray(
+      List<Object[]> sqlResult2 = MetastoreDirectSqlUtils.ensureList(executeWithArray(
           queryDbParams, params, queryTextDbParams));
       if (!sqlResult2.isEmpty()) {
         for (Object[] line : sqlResult2) {
-          dbParams.put(extractSqlString(line[0]), extractSqlString(line[1]));
+          dbParams.put(MetastoreDirectSqlUtils.extractSqlString(line[0]), MetastoreDirectSqlUtils
+              .extractSqlString(line[1]));
         }
       }
       Database db = new Database();
-      db.setName(extractSqlString(dbline[1]));
-      db.setLocationUri(extractSqlString(dbline[2]));
-      db.setDescription(extractSqlString(dbline[3]));
-      db.setOwnerName(extractSqlString(dbline[4]));
-      String type = extractSqlString(dbline[5]);
+      db.setName(MetastoreDirectSqlUtils.extractSqlString(dbline[1]));
+      db.setLocationUri(MetastoreDirectSqlUtils.extractSqlString(dbline[2]));
+      db.setDescription(MetastoreDirectSqlUtils.extractSqlString(dbline[3]));
+      db.setOwnerName(MetastoreDirectSqlUtils.extractSqlString(dbline[4]));
+      String type = MetastoreDirectSqlUtils.extractSqlString(dbline[5]);
       db.setOwnerType(
           (null == type || type.trim().isEmpty()) ? null : PrincipalType.valueOf(type));
-      db.setCatalogName(extractSqlString(dbline[6]));
+      db.setCatalogName(MetastoreDirectSqlUtils.extractSqlString(dbline[6]));
       db.setParameters(MetaStoreServerUtils.trimMapNulls(dbParams,convertMapNullsToEmptyStrings));
       if (LOG.isDebugEnabled()){
         LOG.debug("getDatabase: directsql returning db " + db.getName()
@@ -468,12 +507,12 @@ class MetaStoreDirectSql {
       @Override
       public List<Partition> run(List<String> input) throws MetaException {
         String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")";
-        List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, tblName,
+        List<Long> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, tblName,
             filter, input, Collections.<String>emptyList(), null);
         if (partitionIds.isEmpty()) {
           return Collections.emptyList(); // no partitions, bail early.
         }
-        return getPartitionsFromPartitionIds(catName, dbName, tblName, null, partitionIds);
+        return getPartitionsFromPartitionIds(catName, dbName, tblName, null, partitionIds, Collections.emptyList());
       }
     });
   }
@@ -489,17 +528,70 @@ class MetaStoreDirectSql {
     Boolean isViewTable = isViewTable(filter.table);
     String catName = filter.table.isSetCatName() ? filter.table.getCatName() :
         DEFAULT_CATALOG_NAME;
-    List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName,
+    List<Long> partitionIds = getPartitionIdsViaSqlFilter(catName,
         filter.table.getDbName(), filter.table.getTableName(), filter.filter, filter.params,
         filter.joins, max);
     if (partitionIds.isEmpty()) {
       return Collections.emptyList(); // no partitions, bail early.
     }
-    return Batchable.runBatched(batchSize, partitionIds, new Batchable<Object, Partition>() {
+    return Batchable.runBatched(batchSize, partitionIds, new Batchable<Long, Partition>() {
       @Override
-      public List<Partition> run(List<Object> input) throws MetaException {
+      public List<Partition> run(List<Long> input) throws MetaException {
         return getPartitionsFromPartitionIds(catName, filter.table.getDbName(),
-            filter.table.getTableName(), isViewTable, input);
+            filter.table.getTableName(), isViewTable, input, Collections.emptyList());
+      }
+    });
+  }
+
+  /**
+   * This method can be used to return "partially-filled" partitions when clients are only interested in
+   * some fields of the Partition objects. The partitionFields parameter is a list of dot separated
+   * partition field names. For example, if a client is interested in only partition location,
+   * serializationLib, values and parameters it can specify sd.location, sd.serdeInfo.serializationLib,
+   * values, parameters in the partitionFields list. In such a case all the returned partitions will have
+   * only the requested fields set and the rest of the fields will remain unset. The implementation of this method
+   * runs queries only for the fields which are requested and pushes down the projection to the database to improve
+   * performance.
+   *
+   * @param tbl                    Table whose partitions are being requested
+   * @param partitionFields        List of dot separated field names. Each dot separated string represents nested levels. For
+   *                               instance sd.serdeInfo.serializationLib represents the serializationLib field of the StorageDescriptor
+   *                               of the partition
+   * @param includeParamKeyPattern The SQL LIKE pattern which is used to include the parameter keys. Can include _ or %
+   *                               When this pattern is set, only the partition parameter key-value pairs where the key matches
+   *                               the pattern will be returned. This is applied in conjunction with excludeParamKeyPattern if it is set.
+   * @param excludeParamKeyPattern The SQL LIKE pattern which is used to exclude the parameter keys. Can include _ or %
+   *                               When this pattern is set, all the partition parameters where key is NOT LIKE the pattern
+   *                               are returned. This is applied in conjunction with the includeParamKeyPattern if it is set.
+   * @return the partitions with only the requested fields set; an empty list if no partition ids are found
+   * @throws MetaException
+   */
+  public List<Partition> getPartitionSpecsUsingProjection(Table tbl,
+      final List<String> partitionFields, final String includeParamKeyPattern, final String excludeParamKeyPattern)
+      throws MetaException {
+    final String tblName = tbl.getTableName();
+    final String dbName = tbl.getDbName();
+    final String catName = tbl.getCatName();
+    //TODO add support for filter
+    List<Long> partitionIds =
+        getPartitionIdsViaSqlFilter(catName, dbName, tblName, null, Collections.<String>emptyList(),
+            Collections.<String>emptyList(), null);
+    if (partitionIds.isEmpty()) {
+      return Collections.emptyList();
+    }
+    // check if table object has table type as view
+    Boolean isView = isViewTable(tbl);
+    if (isView == null) {
+      isView = isViewTable(catName, dbName, tblName);
+    }
+    PartitionProjectionEvaluator projectionEvaluator =
+        new PartitionProjectionEvaluator(pm, fieldnameToTableName, partitionFields,
+            convertMapNullsToEmptyStrings, isView, includeParamKeyPattern, excludeParamKeyPattern);
+    // Get the projected partition objects. For Oracle/etc. do it in batches.
+    return Batchable.runBatched(batchSize, partitionIds, new Batchable<Long, Partition>() {
+      @Override
+      public List<Partition> run(List<Long> input) throws MetaException {
+        return projectionEvaluator.getPartitionsUsingProjectionList(input);
       }
     });
   }
@@ -537,17 +629,17 @@ class MetaStoreDirectSql {
    */
   public List<Partition> getPartitions(String catName,
       String dbName, String tblName, Integer max) throws MetaException {
-    List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName,
+    List<Long> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName,
         tblName, null, Collections.<String>emptyList(), Collections.<String>emptyList(), max);
     if (partitionIds.isEmpty()) {
       return Collections.emptyList(); // no partitions, bail early.
     }
 
     // Get full objects. For Oracle/etc. do it in batches.
-    List<Partition> result = Batchable.runBatched(batchSize, partitionIds, new Batchable<Object, Partition>() {
+    List<Partition> result = Batchable.runBatched(batchSize, partitionIds, new Batchable<Long, Partition>() {
       @Override
-      public List<Partition> run(List<Object> input) throws MetaException {
-        return getPartitionsFromPartitionIds(catName, dbName, tblName, null, input);
+      public List<Partition> run(List<Long> input) throws MetaException {
+        return getPartitionsFromPartitionIds(catName, dbName, tblName, null, input, Collections.emptyList());
       }
     });
     return result;
@@ -589,7 +681,7 @@ class MetaStoreDirectSql {
    * @param max The maximum number of partitions to return.
    * @return List of partition objects.
    */
-  private List<Object> getPartitionIdsViaSqlFilter(
+  private List<Long> getPartitionIdsViaSqlFilter(
       String catName, String dbName, String tblName, String sqlFilter,
       List<? extends Object> paramsForFilter, List<String> joinsForFilter, Integer max)
       throws MetaException {
@@ -625,14 +717,14 @@ class MetaStoreDirectSql {
     }
     List<Object> sqlResult = executeWithArray(query, params, queryText);
     long queryTime = doTrace ? System.nanoTime() : 0;
-    timingTrace(doTrace, queryText, start, queryTime);
+    MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, queryTime);
     if (sqlResult.isEmpty()) {
       return Collections.emptyList(); // no partitions, bail early.
     }
 
-    List<Object> result = new ArrayList<Object>(sqlResult.size());
+    List<Long> result = new ArrayList<>(sqlResult.size());
     for (Object fields : sqlResult) {
-      result.add(extractSqlLong(fields));
+      result.add(MetastoreDirectSqlUtils.extractSqlLong(fields));
     }
     query.closeAll();
     return result;
@@ -640,26 +732,27 @@ class MetaStoreDirectSql {
 
   /** Should be called with the list short enough to not trip up Oracle/etc. */
   private List<Partition> getPartitionsFromPartitionIds(String catName, String dbName, String tblName,
-      Boolean isView, List<Object> partIdList) throws MetaException {
+      Boolean isView, List<Long> partIdList, List<String> projectionFields) throws MetaException {
+
     boolean doTrace = LOG.isDebugEnabled();
 
     int idStringWidth = (int)Math.ceil(Math.log10(partIdList.size())) + 1; // 1 for comma
     int sbCapacity = partIdList.size() * idStringWidth;
-
-    String partIds = getIdListForIn(partIdList);
-
     // Get most of the fields for the IDs provided.
     // Assume db and table names are the same for all partition, as provided in arguments.
+    String partIds = getIdListForIn(partIdList);
     String queryText =
-      "select " + PARTITIONS + ".\"PART_ID\", " + SDS + ".\"SD_ID\", " + SDS + ".\"CD_ID\","
-    + " " + SERDES + ".\"SERDE_ID\", " + PARTITIONS + ".\"CREATE_TIME\","
-    + " " + PARTITIONS + ".\"LAST_ACCESS_TIME\", " + SDS + ".\"INPUT_FORMAT\", " + SDS + ".\"IS_COMPRESSED\","
-    + " " + SDS + ".\"IS_STOREDASSUBDIRECTORIES\", " + SDS + ".\"LOCATION\", " + SDS + ".\"NUM_BUCKETS\","
-    + " " + SDS + ".\"OUTPUT_FORMAT\", " + SERDES + ".\"NAME\", " + SERDES + ".\"SLIB\", " + PARTITIONS
-    + ".\"WRITE_ID\"" + " from " + PARTITIONS + ""
-    + "  left outer join " + SDS + " on " + PARTITIONS + ".\"SD_ID\" = " + SDS + ".\"SD_ID\" "
-    + "  left outer join " + SERDES + " on " + SDS + ".\"SERDE_ID\" = " + SERDES + ".\"SERDE_ID\" "
-    + "where \"PART_ID\" in (" + partIds + ") order by \"PART_NAME\" asc";
+        "select " + PARTITIONS + ".\"PART_ID\", " + SDS + ".\"SD_ID\", " + SDS + ".\"CD_ID\"," + " "
+            + SERDES + ".\"SERDE_ID\", " + PARTITIONS + ".\"CREATE_TIME\"," + " " + PARTITIONS
+            + ".\"LAST_ACCESS_TIME\", " + SDS + ".\"INPUT_FORMAT\", " + SDS + ".\"IS_COMPRESSED\","
+            + " " + SDS + ".\"IS_STOREDASSUBDIRECTORIES\", " + SDS + ".\"LOCATION\", " + SDS
+            + ".\"NUM_BUCKETS\"," + " " + SDS + ".\"OUTPUT_FORMAT\", " + SERDES + ".\"NAME\", "
+            + SERDES + ".\"SLIB\", " + PARTITIONS + ".\"WRITE_ID\"" + " from " + PARTITIONS + ""
+            + "  left outer join " + SDS + " on " + PARTITIONS + ".\"SD_ID\" = " + SDS
+            + ".\"SD_ID\" " + "  left outer join " + SERDES + " on " + SDS + ".\"SERDE_ID\" = "
+            + SERDES + ".\"SERDE_ID\" " + "where \"PART_ID\" in (" + partIds
+            + ") order by \"PART_NAME\" asc";
+
     long start = doTrace ? System.nanoTime() : 0;
     Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
     List<Object[]> sqlResult = executeWithArray(query, null, queryText);
@@ -680,12 +773,13 @@ class MetaStoreDirectSql {
     tblName = tblName.toLowerCase();
     dbName = dbName.toLowerCase();
     catName = normalizeSpace(catName).toLowerCase();
+    partitions.navigableKeySet();
     for (Object[] fields : sqlResult) {
       // Here comes the ugly part...
-      long partitionId = extractSqlLong(fields[0]);
-      Long sdId = extractSqlLong(fields[1]);
-      Long colId = extractSqlLong(fields[2]);
-      Long serdeId = extractSqlLong(fields[3]);
+      long partitionId = MetastoreDirectSqlUtils.extractSqlLong(fields[0]);
+      Long sdId = MetastoreDirectSqlUtils.extractSqlLong(fields[1]);
+      Long colId = MetastoreDirectSqlUtils.extractSqlLong(fields[2]);
+      Long serdeId = MetastoreDirectSqlUtils.extractSqlLong(fields[3]);
       // A partition must have at least sdId and serdeId set, or nothing set if it's a view.
       if (sdId == null || serdeId == null) {
         if (isView == null) {
@@ -693,7 +787,7 @@ class MetaStoreDirectSql {
         }
         if ((sdId != null || colId != null || serdeId != null) || !isView) {
           throw new MetaException("Unexpected null for one of the IDs, SD " + sdId +
-                  ", serde " + serdeId + " for a " + (isView ? "" : "non-") + " view");
+              ", serde " + serdeId + " for a " + (isView ? "" : "non-") + " view");
         }
       }
 
@@ -705,9 +799,9 @@ class MetaStoreDirectSql {
       part.setCatName(catName);
       part.setDbName(dbName);
       part.setTableName(tblName);
-      if (fields[4] != null) part.setCreateTime(extractSqlInt(fields[4]));
-      if (fields[5] != null) part.setLastAccessTime(extractSqlInt(fields[5]));
-      Long writeId = extractSqlLong(fields[14]);
+      if (fields[4] != null) part.setCreateTime(MetastoreDirectSqlUtils.extractSqlInt(fields[4]));
+      if (fields[5] != null) part.setLastAccessTime(MetastoreDirectSqlUtils.extractSqlInt(fields[5]));
+      Long writeId = MetastoreDirectSqlUtils.extractSqlLong(fields[14]);
       if (writeId != null) {
         part.setWriteId(writeId);
       }
@@ -730,12 +824,12 @@ class MetaStoreDirectSql {
       sd.setSkewedInfo(new SkewedInfo(new ArrayList<String>(),
           new ArrayList<List<String>>(), new HashMap<List<String>, String>()));
       sd.setInputFormat((String)fields[6]);
-      Boolean tmpBoolean = extractSqlBoolean(fields[7]);
+      Boolean tmpBoolean = MetastoreDirectSqlUtils.extractSqlBoolean(fields[7]);
       if (tmpBoolean != null) sd.setCompressed(tmpBoolean);
-      tmpBoolean = extractSqlBoolean(fields[8]);
+      tmpBoolean = MetastoreDirectSqlUtils.extractSqlBoolean(fields[8]);
       if (tmpBoolean != null) sd.setStoredAsSubDirectories(tmpBoolean);
       sd.setLocation((String)fields[9]);
-      if (fields[10] != null) sd.setNumBuckets(extractSqlInt(fields[10]));
+      if (fields[10] != null) sd.setNumBuckets(MetastoreDirectSqlUtils.extractSqlInt(fields[10]));
       sd.setOutputFormat((String)fields[11]);
       sdSb.append(sdId).append(",");
       part.setSd(sd);
@@ -766,30 +860,13 @@ class MetaStoreDirectSql {
       Deadline.checkTimeout();
     }
     query.closeAll();
-    timingTrace(doTrace, queryText, start, queryTime);
+    MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, queryTime);
 
     // Now get all the one-to-many things. Start with partitions.
-    queryText = "select \"PART_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + PARTITION_PARAMS + ""
-        + " where \"PART_ID\" in (" + partIds + ") and \"PARAM_KEY\" is not null"
-        + " order by \"PART_ID\" asc";
-    loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() {
-      @Override
-      public void apply(Partition t, Object[] fields) {
-        t.putToParameters((String)fields[1], extractSqlClob(fields[2]));
-      }});
-    // Perform conversion of null map values
-    for (Partition t : partitions.values()) {
-      t.setParameters(MetaStoreServerUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings));
-    }
-
-    queryText = "select \"PART_ID\", \"PART_KEY_VAL\" from " + PARTITION_KEY_VALS + ""
-        + " where \"PART_ID\" in (" + partIds + ")"
-        + " order by \"PART_ID\" asc, \"INTEGER_IDX\" asc";
-    loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() {
-      @Override
-      public void apply(Partition t, Object[] fields) {
-        t.addToValues((String)fields[1]);
-      }});
+    MetastoreDirectSqlUtils
+        .setPartitionParameters(PARTITION_PARAMS, convertMapNullsToEmptyStrings, pm, partIds, partitions);
+
+    MetastoreDirectSqlUtils.setPartitionValues(PARTITION_KEY_VALS, pm, partIds, partitions);
 
     // Prepare IN (blah) lists for the following queries. Cut off the final ','s.
     if (sdSb.length() == 0) {
@@ -802,160 +879,36 @@ class MetaStoreDirectSql {
     String colIds = trimCommaList(colsSb);
 
     // Get all the stuff for SD. Don't do empty-list check - we expect partitions do have SDs.
-    queryText = "select \"SD_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + SD_PARAMS + ""
-        + " where \"SD_ID\" in (" + sdIds + ") and \"PARAM_KEY\" is not null"
-        + " order by \"SD_ID\" asc";
-    loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
-      @Override
-      public void apply(StorageDescriptor t, Object[] fields) {
-        t.putToParameters((String)fields[1], extractSqlClob(fields[2]));
-      }});
-    // Perform conversion of null map values
-    for (StorageDescriptor t : sds.values()) {
-      t.setParameters(MetaStoreServerUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings));
-    }
-
-    queryText = "select \"SD_ID\", \"COLUMN_NAME\", " + SORT_COLS + ".\"ORDER\""
-        + " from " + SORT_COLS + ""
-        + " where \"SD_ID\" in (" + sdIds + ")"
-        + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
-    loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
-      @Override
-      public void apply(StorageDescriptor t, Object[] fields) {
-        if (fields[2] == null) return;
-        t.addToSortCols(new Order((String)fields[1], extractSqlInt(fields[2])));
-      }});
-
-    queryText = "select \"SD_ID\", \"BUCKET_COL_NAME\" from " + BUCKETING_COLS + ""
-        + " where \"SD_ID\" in (" + sdIds + ")"
-        + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
-    loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
-      @Override
-      public void apply(StorageDescriptor t, Object[] fields) {
-        t.addToBucketCols((String)fields[1]);
-      }});
+    MetastoreDirectSqlUtils.setSDParameters(SD_PARAMS, convertMapNullsToEmptyStrings, pm, sds, sdIds);
+
+    MetastoreDirectSqlUtils.setSDSortCols(SORT_COLS, pm, sds, sdIds);
+
+    MetastoreDirectSqlUtils.setSDBucketCols(BUCKETING_COLS, pm, sds, sdIds);
 
     // Skewed columns stuff.
-    queryText = "select \"SD_ID\", \"SKEWED_COL_NAME\" from " + SKEWED_COL_NAMES + ""
-        + " where \"SD_ID\" in (" + sdIds + ")"
-        + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
-    boolean hasSkewedColumns =
-      loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
-        @Override
-        public void apply(StorageDescriptor t, Object[] fields) {
-          if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo());
-          t.getSkewedInfo().addToSkewedColNames((String)fields[1]);
-        }}) > 0;
+    boolean hasSkewedColumns = MetastoreDirectSqlUtils
+        .setSkewedColNames(SKEWED_COL_NAMES, pm, sds, sdIds);
 
     // Assume we don't need to fetch the rest of the skewed column data if we have no columns.
     if (hasSkewedColumns) {
       // We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless.
-      queryText =
-            "select " + SKEWED_VALUES + ".\"SD_ID_OID\","
-          + "  " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\","
-          + "  " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_VALUE\" "
-          + "from " + SKEWED_VALUES + " "
-          + "  left outer join " + SKEWED_STRING_LIST_VALUES + " on " + SKEWED_VALUES + "."
-          + "\"STRING_LIST_ID_EID\" = " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" "
-          + "where " + SKEWED_VALUES + ".\"SD_ID_OID\" in (" + sdIds + ") "
-          + "  and " + SKEWED_VALUES + ".\"STRING_LIST_ID_EID\" is not null "
-          + "  and " + SKEWED_VALUES + ".\"INTEGER_IDX\" >= 0 "
-          + "order by " + SKEWED_VALUES + ".\"SD_ID_OID\" asc, " + SKEWED_VALUES + ".\"INTEGER_IDX\" asc,"
-          + "  " + SKEWED_STRING_LIST_VALUES + ".\"INTEGER_IDX\" asc";
-      loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
-        private Long currentListId;
-        private List<String> currentList;
-        @Override
-        public void apply(StorageDescriptor t, Object[] fields) throws MetaException {
-          if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo());
-          // Note that this is not a typical list accumulator - there's no call to finalize
-          // the last list. Instead we add list to SD first, as well as locally to add elements.
-          if (fields[1] == null) {
-            currentList = null; // left outer join produced a list with no values
-            currentListId = null;
-            t.getSkewedInfo().addToSkewedColValues(Collections.<String>emptyList());
-          } else {
-            long fieldsListId = extractSqlLong(fields[1]);
-            if (currentListId == null || fieldsListId != currentListId) {
-              currentList = new ArrayList<String>();
-              currentListId = fieldsListId;
-              t.getSkewedInfo().addToSkewedColValues(currentList);
-            }
-            currentList.add((String)fields[2]);
-          }
-        }});
+      MetastoreDirectSqlUtils
+          .setSkewedColValues(SKEWED_STRING_LIST_VALUES, SKEWED_VALUES, pm, sds, sdIds);
 
       // We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless.
-      queryText =
-            "select " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\","
-          + " " + SKEWED_STRING_LIST_VALUES + ".STRING_LIST_ID,"
-          + " " + SKEWED_COL_VALUE_LOC_MAP + ".\"LOCATION\","
-          + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_VALUE\" "
-          + "from " + SKEWED_COL_VALUE_LOC_MAP + ""
-          + "  left outer join " + SKEWED_STRING_LIST_VALUES + " on " + SKEWED_COL_VALUE_LOC_MAP + "."
-          + "\"STRING_LIST_ID_KID\" = " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" "
-          + "where " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\" in (" + sdIds + ")"
-          + "  and " + SKEWED_COL_VALUE_LOC_MAP + ".\"STRING_LIST_ID_KID\" is not null "
-          + "order by " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\" asc,"
-          + "  " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" asc,"
-          + "  " + SKEWED_STRING_LIST_VALUES + ".\"INTEGER_IDX\" asc";
-
-      loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
-        private Long currentListId;
-        private List<String> currentList;
-        @Override
-        public void apply(StorageDescriptor t, Object[] fields) throws MetaException {
-          if (!t.isSetSkewedInfo()) {
-            SkewedInfo skewedInfo = new SkewedInfo();
-            skewedInfo.setSkewedColValueLocationMaps(new HashMap<List<String>, String>());
-            t.setSkewedInfo(skewedInfo);
-          }
-          Map<List<String>, String> skewMap = t.getSkewedInfo().getSkewedColValueLocationMaps();
-          // Note that this is not a typical list accumulator - there's no call to finalize
-          // the last list. Instead we add list to SD first, as well as locally to add elements.
-          if (fields[1] == null) {
-            currentList = new ArrayList<String>(); // left outer join produced a list with no values
-            currentListId = null;
-          } else {
-            long fieldsListId = extractSqlLong(fields[1]);
-            if (currentListId == null || fieldsListId != currentListId) {
-              currentList = new ArrayList<String>();
-              currentListId = fieldsListId;
-            } else {
-              skewMap.remove(currentList); // value based compare.. remove first
-            }
-            currentList.add((String)fields[3]);
-          }
-          skewMap.put(currentList, (String)fields[2]);
-        }});
+      MetastoreDirectSqlUtils
+          .setSkewedColLocationMaps(SKEWED_COL_VALUE_LOC_MAP, SKEWED_STRING_LIST_VALUES, pm, sds, sdIds);
     } // if (hasSkewedColumns)
 
     // Get FieldSchema stuff if any.
     if (!colss.isEmpty()) {
       // We are skipping the CDS table here, as it seems to be totally useless.
-      queryText = "select \"CD_ID\", \"COMMENT\", \"COLUMN_NAME\", \"TYPE_NAME\""
-          + " from " + COLUMNS_V2 + " where \"CD_ID\" in (" + colIds + ")"
-          + " order by \"CD_ID\" asc, \"INTEGER_IDX\" asc";
-      loopJoinOrderedResult(colss, queryText, 0, new ApplyFunc<List<FieldSchema>>() {
-        @Override
-        public void apply(List<FieldSchema> t, Object[] fields) {
-          t.add(new FieldSchema((String)fields[2], extractSqlClob(fields[3]), (String)fields[1]));
-        }});
+      MetastoreDirectSqlUtils.setSDCols(COLUMNS_V2, pm, colss, colIds);
     }
 
     // Finally, get all the stuff for serdes - just the params.
-    queryText = "select \"SERDE_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + SERDE_PARAMS + ""
-        + " where \"SERDE_ID\" in (" + serdeIds + ") and \"PARAM_KEY\" is not null"
-        + " order by \"SERDE_ID\" asc";
-    loopJoinOrderedResult(serdes, queryText, 0, new ApplyFunc<SerDeInfo>() {
-      @Override
-      public void apply(SerDeInfo t, Object[] fields) {
-        t.putToParameters((String)fields[1], extractSqlClob(fields[2]));
-      }});
-    // Perform conversion of null map values
-    for (SerDeInfo t : serdes.values()) {
-      t.setParameters(MetaStoreServerUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings));
-    }
+    MetastoreDirectSqlUtils
+        .setSerdeParams(SERDE_PARAMS, convertMapNullsToEmptyStrings, pm, serdes, serdeIds);
 
     return orderedResult;
   }
@@ -987,124 +940,12 @@ class MetaStoreDirectSql {
     long start = doTrace ? System.nanoTime() : 0;
     Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
     query.setUnique(true);
-    int sqlResult = extractSqlInt(query.executeWithArray(params));
+    int sqlResult = MetastoreDirectSqlUtils.extractSqlInt(query.executeWithArray(params));
     long queryTime = doTrace ? System.nanoTime() : 0;
-    timingTrace(doTrace, queryText, start, queryTime);
+    MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, queryTime);
     return sqlResult;
   }
 
-
-  private void timingTrace(boolean doTrace, String queryText, long start, long queryTime) {
-    if (!doTrace) return;
-    LOG.debug("Direct SQL query in " + (queryTime - start) / 1000000.0 + "ms + " +
-        (System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [" + queryText + "]");
-  }
-
-  static Long extractSqlLong(Object obj) throws MetaException {
-    if (obj == null) return null;
-    if (!(obj instanceof Number)) {
-      throw new MetaException("Expected numeric type but got " + obj.getClass().getName());
-    }
-    return ((Number)obj).longValue();
-  }
-
-  /**
-   * Convert a boolean value returned from the RDBMS to a Java Boolean object.
-   * MySQL has booleans, but e.g. Derby uses 'Y'/'N' mapping.
-   *
-   * @param value
-   *          column value from the database
-   * @return The Boolean value of the database column value, null if the column
-   *         value is null
-   * @throws MetaException
-   *           if the column value cannot be converted into a Boolean object
-   */
-  private static Boolean extractSqlBoolean(Object value) throws MetaException {
-    if (value == null) {
-      return null;
-    }
-    if (value instanceof Boolean) {
-      return (Boolean)value;
-    }
-    if (value instanceof String) {
-      try {
-        return BooleanUtils.toBooleanObject((String) value, "Y", "N", null);
-      } catch (IllegalArgumentException iae) {
-        // NOOP
-      }
-    }
-    throw new MetaException("Cannot extract boolean from column value " + value);
-  }
-
-  private int extractSqlInt(Object field) {
-    return ((Number)field).intValue();
-  }
-
-  private String extractSqlString(Object value) {
-    if (value == null) return null;
-    return value.toString();
-  }
-
-  static Double extractSqlDouble(Object obj) throws MetaException {
-    if (obj == null)
-      return null;
-    if (!(obj instanceof Number)) {
-      throw new MetaException("Expected numeric type but got " + obj.getClass().getName());
-    }
-    return ((Number) obj).doubleValue();
-  }
-
-  private String extractSqlClob(Object value) {
-    if (value == null) return null;
-    try {
-      if (value instanceof Clob) {
-        // we trim the Clob value to a max length an int can hold
-        int maxLength = (((Clob)value).length() < Integer.MAX_VALUE - 2) ? (int)((Clob)value).length() : Integer.MAX_VALUE - 2;
-        return ((Clob)value).getSubString(1L, maxLength);
-      } else {
-        return value.toString();
-      }
-    } catch (SQLException sqle) {
-      return null;
-    }
-  }
-
-  static byte[] extractSqlBlob(Object value) throws MetaException {
-    if (value == null)
-      return null;
-    if (value instanceof Blob) {
-      //derby, oracle
-      try {
-        // getBytes function says: pos the ordinal position of the first byte in
-        // the BLOB value to be extracted; the first byte is at position 1
-        return ((Blob) value).getBytes(1, (int) ((Blob) value).length());
-      } catch (SQLException e) {
-        throw new MetaException("Encounter error while processing blob.");
-      }
-    }
-    else if (value instanceof byte[]) {
-      // mysql, postgres, sql server
-      return (byte[]) value;
-    }
-	else {
-      // this may happen when enablebitvector is false
-      LOG.debug("Expected blob type but got " + value.getClass().getName());
-      return null;
-    }
-  }
-
-  /**
-   * Helper method for preparing for "SOMETHING_ID in (...)" to use in future queries.
-   * @param objectIds the objectId collection
-   * @return The concatenated list
-   * @throws MetaException If the list contains wrong data
-   */
-  private static String getIdListForIn(List<Object> objectIds) throws MetaException {
-    return objectIds.stream()
-               .map(i -> i.toString())
-               .collect(Collectors.joining(","));
-  }
-
   private static String trimCommaList(StringBuilder sb) {
     if (sb.length() > 0) {
       sb.setLength(sb.length() - 1);
@@ -1112,55 +953,6 @@ class MetaStoreDirectSql {
     return sb.toString();
   }
 
-  private abstract class ApplyFunc<Target> {
-    public abstract void apply(Target t, Object[] fields) throws MetaException;
-  }
-
-  /**
-   * Merges applies the result of a PM SQL query into a tree of object.
-   * Essentially it's an object join. DN could do this for us, but it issues queries
-   * separately for every object, which is suboptimal.
-   * @param tree The object tree, by ID.
-   * @param queryText The query text.
-   * @param keyIndex Index of the Long column corresponding to the map ID in query result rows.
-   * @param func The function that is called on each (object,row) pair with the same id.
-   * @return the count of results returned from the query.
-   */
-  private <T> int loopJoinOrderedResult(TreeMap<Long, T> tree,
-      String queryText, int keyIndex, ApplyFunc<T> func) throws MetaException {
-    boolean doTrace = LOG.isDebugEnabled();
-    long start = doTrace ? System.nanoTime() : 0;
-    Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
-    Object result = query.execute();
-    long queryTime = doTrace ? System.nanoTime() : 0;
-    if (result == null) {
-      query.closeAll();
-      return 0;
-    }
-    List<Object[]> list = ensureList(result);
-    Iterator<Object[]> iter = list.iterator();
-    Object[] fields = null;
-    for (Map.Entry<Long, T> entry : tree.entrySet()) {
-      if (fields == null && !iter.hasNext()) break;
-      long id = entry.getKey();
-      while (fields != null || iter.hasNext()) {
-        if (fields == null) {
-          fields = iter.next();
-        }
-        long nestedId = extractSqlLong(fields[keyIndex]);
-        if (nestedId < id) throw new MetaException("Found entries for unknown ID " + nestedId);
-        if (nestedId > id) break; // fields belong to one of the next entries
-        func.apply(entry.getValue(), fields);
-        fields = null;
-      }
-      Deadline.checkTimeout();
-    }
-    int rv = list.size();
-    query.closeAll();
-    timingTrace(doTrace, queryText, start, queryTime);
-    return rv;
-  }
-
   private static class PartitionFilterGenerator extends TreeVisitor {
     private final Table table;
     private final FilterBuilder filterBuffer;
@@ -1416,13 +1208,13 @@ class MetaStoreDirectSql {
         long start = doTrace ? System.nanoTime() : 0;
         Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
         Object qResult = executeWithArray(query, params, queryText);
-        timingTrace(doTrace, queryText0 + "...)", start, (doTrace ? System.nanoTime() : 0));
+        MetastoreDirectSqlUtils.timingTrace(doTrace, queryText0 + "...)", start, (doTrace ? System.nanoTime() : 0));
         if (qResult == null) {
           query.closeAll();
           return null;
         }
         addQueryAfterUse(query);
-        return ensureList(qResult);
+        return MetastoreDirectSqlUtils.ensureList(qResult);
       }
     };
     List<Object[]> list = Batchable.runBatched(batchSize, colNames, b);
@@ -1526,11 +1318,11 @@ class MetaStoreDirectSql {
               Object qResult = executeWithArray(query, prepareParams(
                   catName, dbName, tableName, inputPartNames, inputColName), queryText);
               long end = doTrace ? System.nanoTime() : 0;
-              timingTrace(doTrace, queryText, start, end);
+              MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
               ForwardQueryResult<?> fqr = (ForwardQueryResult<?>) qResult;
               Iterator<?> iter = fqr.iterator();
               while (iter.hasNext()) {
-                if (extractSqlLong(iter.next()) == inputColName.size()) {
+                if (MetastoreDirectSqlUtils.extractSqlLong(iter.next()) == inputColName.size()) {
                   partsFound++;
                 }
               }
@@ -1587,8 +1379,8 @@ class MetaStoreDirectSql {
         return colStatsForDB;
       }
       end = doTrace ? System.nanoTime() : 0;
-      timingTrace(doTrace, queryText, start, end);
-      List<Object[]> list = ensureList(qResult);
+      MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
+      List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
       for (Object[] row : list) {
         String tblName = (String) row[0];
         String partName = (String) row[1];
@@ -1677,8 +1469,8 @@ class MetaStoreDirectSql {
         return Collections.emptyList();
       }
       end = doTrace ? System.nanoTime() : 0;
-      timingTrace(doTrace, queryText, start, end);
-      List<Object[]> list = ensureList(qResult);
+      MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
+      List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
       List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(list.size());
       for (Object[] row : list) {
         colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
@@ -1702,14 +1494,14 @@ class MetaStoreDirectSql {
       qResult = executeWithArray(query, prepareParams(catName, dbName, tableName, partNames, colNames),
           queryText);
       end = doTrace ? System.nanoTime() : 0;
-      timingTrace(doTrace, queryText, start, end);
+      MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
       if (qResult == null) {
         query.closeAll();
         return Collections.emptyList();
       }
       List<String> noExtraColumnNames = new ArrayList<String>();
       Map<String, String[]> extraColumnNameTypeParts = new HashMap<String, String[]>();
-      List<Object[]> list = ensureList(qResult);
+      List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
       for (Object[] row : list) {
         String colName = (String) row[0];
         String colType = (String) row[1];
@@ -1717,7 +1509,7 @@ class MetaStoreDirectSql {
         // count(\"PARTITION_NAME\")==partNames.size()
         // Or, extrapolation is not possible for this column if
         // count(\"PARTITION_NAME\")<2
-        Long count = extractSqlLong(row[2]);
+        Long count = MetastoreDirectSqlUtils.extractSqlLong(row[2]);
         if (count == partNames.size() || count < 2) {
           noExtraColumnNames.add(colName);
         } else {
@@ -1739,13 +1531,13 @@ class MetaStoreDirectSql {
           query.closeAll();
           return Collections.emptyList();
         }
-        list = ensureList(qResult);
+        list = MetastoreDirectSqlUtils.ensureList(qResult);
         for (Object[] row : list) {
           colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
           Deadline.checkTimeout();
         }
         end = doTrace ? System.nanoTime() : 0;
-        timingTrace(doTrace, queryText, start, end);
+        MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
         query.closeAll();
       }
       // Extrapolation is needed for extraColumnNames.
@@ -1772,7 +1564,7 @@ class MetaStoreDirectSql {
           query.closeAll();
           return Collections.emptyList();
         }
-        list = ensureList(qResult);
+        list = MetastoreDirectSqlUtils.ensureList(qResult);
         // see the indexes for colstats in IExtrapolatePartStatus
         Integer[] sumIndex = new Integer[] { 6, 10, 11, 15 };
         for (Object[] row : list) {
@@ -1785,7 +1577,7 @@ class MetaStoreDirectSql {
           Deadline.checkTimeout();
         }
         end = doTrace ? System.nanoTime() : 0;
-        timingTrace(doTrace, queryText, start, end);
+        MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
         query.closeAll();
         for (Map.Entry<String, String[]> entry : extraColumnNameTypeParts.entrySet()) {
           Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length + 2];
@@ -1821,7 +1613,7 @@ class MetaStoreDirectSql {
               if (o == null) {
                 row[2 + colStatIndex] = null;
               } else {
-                Long val = extractSqlLong(o);
+                Long val = MetastoreDirectSqlUtils.extractSqlLong(o);
                 row[2 + colStatIndex] = val / sumVal * (partNames.size());
               }
             } else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Min
@@ -1853,7 +1645,7 @@ class MetaStoreDirectSql {
               Object[] min = (Object[]) (fqr.get(0));
               Object[] max = (Object[]) (fqr.get(fqr.size() - 1));
               end = doTrace ? System.nanoTime() : 0;
-              timingTrace(doTrace, queryText, start, end);
+              MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
               query.closeAll();
               if (min[0] == null || max[0] == null) {
                 row[2 + colStatIndex] = null;
@@ -1884,7 +1676,7 @@ class MetaStoreDirectSql {
               // "AVG_DECIMAL"
               row[2 + colStatIndex] = avg[colStatIndex - 12];
               end = doTrace ? System.nanoTime() : 0;
-              timingTrace(doTrace, queryText, start, end);
+              MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
               query.closeAll();
             }
           }
@@ -1959,13 +1751,13 @@ class MetaStoreDirectSql {
             Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
             Object qResult = executeWithArray(query, prepareParams(
                 catName, dbName, tableName, inputPartNames, inputColNames), queryText);
-            timingTrace(doTrace, queryText0, start, (doTrace ? System.nanoTime() : 0));
+            MetastoreDirectSqlUtils.timingTrace(doTrace, queryText0, start, (doTrace ? System.nanoTime() : 0));
             if (qResult == null) {
               query.closeAll();
               return Collections.emptyList();
             }
             addQueryAfterUse(query);
-            return ensureList(qResult);
+            return MetastoreDirectSqlUtils.ensureList(qResult);
           }
         };
         try {
@@ -2018,8 +1810,9 @@ class MetaStoreDirectSql {
       // LastAnalyzed is stored per column but thrift has it per several;
       // get the lowest for now as nobody actually uses this field.
       Object laObj = row[offset + 15];
-      if (laObj != null && (!csd.isSetLastAnalyzed() || csd.getLastAnalyzed() > extractSqlLong(laObj))) {
-        csd.setLastAnalyzed(extractSqlLong(laObj));
+      if (laObj != null && (!csd.isSetLastAnalyzed() || csd.getLastAnalyzed() > MetastoreDirectSqlUtils
+          .extractSqlLong(laObj))) {
+        csd.setLastAnalyzed(MetastoreDirectSqlUtils.extractSqlLong(laObj));
       }
       csos.add(prepareCSObj(row, offset));
       Deadline.checkTimeout();
@@ -2028,14 +1821,6 @@ class MetaStoreDirectSql {
     return result;
   }
 
-  @SuppressWarnings("unchecked")
-  private List<Object[]> ensureList(Object result) throws MetaException {
-    if (!(result instanceof List<?>)) {
-      throw new MetaException("Wrong result type " + result.getClass());
-    }
-    return (List<Object[]>)result;
-  }
-
   private String makeParams(int size) {
     // W/ size 0, query will fail, but at least we'd get to see the query in debug output.
     return (size == 0) ? "" : repeat(",?", size).substring(1);
@@ -2043,21 +1828,7 @@ class MetaStoreDirectSql {
 
   @SuppressWarnings("unchecked")
   private <T> T executeWithArray(Query query, Object[] params, String sql) throws MetaException {
-    try {
-      return (T)((params == null) ? query.execute() : query.executeWithArray(params));
-    } catch (Exception ex) {
-      String error = "Failed to execute [" + sql + "] with parameters [";
-      if (params != null) {
-        boolean isFirst = true;
-        for (Object param : params) {
-          error += (isFirst ? "" : ", ") + param;
-          isFirst = false;
-        }
-      }
-      LOG.warn(error + "]", ex);
-      // We just logged an exception with (in case of JDO) a humongous callstack. Make a new one.
-      throw new MetaException("See previous errors; " + ex.getMessage());
-    }
+    return MetastoreDirectSqlUtils.executeWithArray(query, params, sql);
   }
 
   /**
@@ -2135,27 +1906,27 @@ class MetaStoreDirectSql {
     }
 
     Query queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
-      List<Object[]> sqlResult = ensureList(executeWithArray(
+      List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray(
         queryParams, pms.toArray(), queryText));
 
     if (!sqlResult.isEmpty()) {
       for (Object[] line : sqlResult) {
-        int enableValidateRely = extractSqlInt(line[11]);
+        int enableValidateRely = MetastoreDirectSqlUtils.extractSqlInt(line[11]);
         boolean enable = (enableValidateRely & 4) != 0;
         boolean validate = (enableValidateRely & 2) != 0;
         boolean rely = (enableValidateRely & 1) != 0;
         SQLForeignKey currKey = new SQLForeignKey(
-          extractSqlString(line[0]),
-          extractSqlString(line[1]),
-          extractSqlString(line[2]),
-          extractSqlString(line[3]),
-          extractSqlString(line[4]),
-          extractSqlString(line[5]),
-          extractSqlInt(line[6]),
-          extractSqlInt(line[7]),
-          extractSqlInt(line[8]),
-          extractSqlString(line[9]),
-          extractSqlString(line[10]),
+          MetastoreDirectSqlUtils.extractSqlString(line[0]),
+          MetastoreDirectSqlUtils.extractSqlString(line[1]),
+          MetastoreDirectSqlUtils.extractSqlString(line[2]),
+          MetastoreDirectSqlUtils.extractSqlString(line[3]),
+          MetastoreDirectSqlUtils.extractSqlString(line[4]),
+          MetastoreDirectSqlUtils.extractSqlString(line[5]),
+          MetastoreDirectSqlUtils.extractSqlInt(line[6]),
+          MetastoreDirectSqlUtils.extractSqlInt(line[7]),
+          MetastoreDirectSqlUtils.extractSqlInt(line[8]),
+          MetastoreDirectSqlUtils.extractSqlString(line[9]),
+          MetastoreDirectSqlUtils.extractSqlString(line[10]),
           enable,
           validate,
           rely
@@ -2202,24 +1973,24 @@ class MetaStoreDirectSql {
     }
 
     Query queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
-      List<Object[]> sqlResult = ensureList(executeWithArray(
+      List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray(
         queryParams, pms.toArray(), queryText));
 
     if (!sqlResult.isEmpty()) {
       for (Object[] line : sqlResult) {
-          int enableValidateRely = extractSqlInt(line[5]);
+          int enableValidateRely = MetastoreDirectSqlUtils.extractSqlInt(line[5]);
           boolean enable = (enableValidateRely & 4) != 0;
           boolean validate = (enableValidateRely & 2) != 0;
           boolean rely = (enableValidateRely & 1) != 0;
         SQLPrimaryKey currKey = new SQLPrimaryKey(
-          extractSqlString(line[0]),
-          extractSqlString(line[1]),
-          extractSqlString(line[2]),
-          extractSqlInt(line[3]), extractSqlString(line[4]),
+          MetastoreDirectSqlUtils.extractSqlString(line[0]),
+          MetastoreDirectSqlUtils.extractSqlString(line[1]),
+          MetastoreDirectSqlUtils.extractSqlString(line[2]),
+          MetastoreDirectSqlUtils.extractSqlInt(line[3]), MetastoreDirectSqlUtils.extractSqlString(line[4]),
           enable,
           validate,
           rely);
-        currKey.setCatName(extractSqlString(line[6]));
+        currKey.setCatName(MetastoreDirectSqlUtils.extractSqlString(line[6]));
         ret.add(currKey);
       }
     }
@@ -2260,21 +2031,21 @@ class MetaStoreDirectSql {
     }
 
     Query queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
-      List<Object[]> sqlResult = ensureList(executeWithArray(
+      List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray(
         queryParams, pms.toArray(), queryText));
 
     if (!sqlResult.isEmpty()) {
       for (Object[] line : sqlResult) {
-          int enableValidateRely = extractSqlInt(line[5]);
+          int enableValidateRely = MetastoreDirectSqlUtils.extractSqlInt(line[5]);
           boolean enable = (enableValidateRely & 4) != 0;
           boolean validate = (enableValidateRely & 2) != 0;
           boolean rely = (enableValidateRely & 1) != 0;
         ret.add(new SQLUniqueConstraint(
             catName,
-            extractSqlString(line[0]),
-            extractSqlString(line[1]),
-            extractSqlString(line[2]),
-            extractSqlInt(line[3]), extractSqlString(line[4]),
+            MetastoreDirectSqlUtils.extractSqlString(line[0]),
+            MetastoreDirectSqlUtils.extractSqlString(line[1]),
+            MetastoreDirectSqlUtils.extractSqlString(line[2]),
+            MetastoreDirectSqlUtils.extractSqlInt(line[3]), MetastoreDirectSqlUtils.extractSqlString(line[4]),
             enable,
             validate,
             rely));
@@ -2317,21 +2088,21 @@ class MetaStoreDirectSql {
     }
 
     Query queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
-      List<Object[]> sqlResult = ensureList(executeWithArray(
+      List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray(
         queryParams, pms.toArray(), queryText));
 
     if (!sqlResult.isEmpty()) {
       for (Object[] line : sqlResult) {
-          int enableValidateRely = extractSqlInt(line[4]);
+          int enableValidateRely = MetastoreDirectSqlUtils.extractSqlInt(line[4]);
           boolean enable = (enableValidateRely & 4) != 0;
           boolean validate = (enableValidateRely & 2) != 0;
           boolean rely = (enableValidateRely & 1) != 0;
         ret.add(new SQLNotNullConstraint(
             catName,
-            extractSqlString(line[0]),
-            extractSqlString(line[1]),
-            extractSqlString(line[2]),
-            extractSqlString(line[3]),
+            MetastoreDirectSqlUtils.extractSqlString(line[0]),
+            MetastoreDirectSqlUtils.extractSqlString(line[1]),
+            MetastoreDirectSqlUtils.extractSqlString(line[2]),
+            MetastoreDirectSqlUtils.extractSqlString(line[3]),
             enable,
             validate,
             rely));
@@ -2378,22 +2149,22 @@ class MetaStoreDirectSql {
     }
 
     Query queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
-    List<Object[]> sqlResult = ensureList(executeWithArray(
+    List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray(
         queryParams, pms.toArray(), queryText));
 
     if (!sqlResult.isEmpty()) {
       for (Object[] line : sqlResult) {
-        int enableValidateRely = extractSqlInt(line[4]);
+        int enableValidateRely = MetastoreDirectSqlUtils.extractSqlInt(line[4]);
         boolean enable = (enableValidateRely & 4) != 0;
         boolean validate = (enableValidateRely & 2) != 0;
         boolean rely = (enableValidateRely & 1) != 0;
         SQLDefaultConstraint currConstraint = new SQLDefaultConstraint(
             catName,
-            extractSqlString(line[0]),
-            extractSqlString(line[1]),
-            extractSqlString(line[2]),
-            extractSqlString(line[5]),
-            extractSqlString(line[3]),
+            MetastoreDirectSqlUtils.extractSqlString(line[0]),
+            MetastoreDirectSqlUtils.extractSqlString(line[1]),
+            MetastoreDirectSqlUtils.extractSqlString(line[2]),
+            MetastoreDirectSqlUtils.extractSqlString(line[5]),
+            MetastoreDirectSqlUtils.extractSqlString(line[3]),
             enable,
             validate,
             rely);
@@ -2441,22 +2212,22 @@ class MetaStoreDirectSql {
     }
 
     Query queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
-    List<Object[]> sqlResult = ensureList(executeWithArray(
+    List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray(
         queryParams, pms.toArray(), queryText));
 
     if (!sqlResult.isEmpty()) {
       for (Object[] line : sqlResult) {
-        int enableValidateRely = extractSqlInt(line[4]);
+        int enableValidateRely = MetastoreDirectSqlUtils.extractSqlInt(line[4]);
         boolean enable = (enableValidateRely & 4) != 0;
         boolean validate = (enableValidateRely & 2) != 0;
         boolean rely = (enableValidateRely & 1) != 0;
         SQLCheckConstraint currConstraint = new SQLCheckConstraint(
             catName,
-            extractSqlString(line[0]),
-            extractSqlString(line[1]),
-            extractSqlString(line[2]),
-            extractSqlString(line[5]),
-            extractSqlString(line[3]),
+            MetastoreDirectSqlUtils.extractSqlString(line[0]),
+            MetastoreDirectSqlUtils.extractSqlString(line[1]),
+            MetastoreDirectSqlUtils.extractSqlString(line[2]),
+            MetastoreDirectSqlUtils.extractSqlString(line[5]),
+            MetastoreDirectSqlUtils.extractSqlString(line[3]),
             enable,
             validate,
             rely);
@@ -2486,7 +2257,7 @@ class MetaStoreDirectSql {
       public List<Void> run(List<String> input) throws MetaException {
         String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")";
         // Get partition ids
-        List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, tblName,
+        List<Long> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, tblName,
             filter, input, Collections.<String>emptyList(), null);
         if (partitionIds.isEmpty()) {
           return Collections.emptyList(); // no partitions, bail early.
@@ -2504,11 +2275,12 @@ class MetaStoreDirectSql {
    * @throws MetaException If there is an SQL exception during the execution it converted to
    * MetaException
    */
-  private void dropPartitionsByPartitionIds(List<Object> partitionIdList) throws MetaException {
+  private void dropPartitionsByPartitionIds(List<Long> partitionIdList) throws MetaException {
     String queryText;
     if (partitionIdList.isEmpty()) {
       return;
     }
+
     String partitionIds = getIdListForIn(partitionIdList);
 
     // Get the corresponding SD_ID-s, CD_ID-s, SERDE_ID-s
@@ -2519,7 +2291,8 @@ class MetaStoreDirectSql {
             + "WHERE " + PARTITIONS + ".\"PART_ID\" in (" + partitionIds + ")";
 
     Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
-    List<Object[]> sqlResult = ensureList(executeWithArray(query, null, queryText));
+    List<Object[]> sqlResult = MetastoreDirectSqlUtils
+        .ensureList(executeWithArray(query, null, queryText));
 
     List<Object> sdIdList = new ArrayList<>(partitionIdList.size());
     List<Object> columnDescriptorIdList = new ArrayList<>(1);
@@ -2527,12 +2300,12 @@ class MetaStoreDirectSql {
 
     if (!sqlResult.isEmpty()) {
       for (Object[] fields : sqlResult) {
-        sdIdList.add(extractSqlLong(fields[0]));
-        Long colId = extractSqlLong(fields[1]);
+        sdIdList.add(MetastoreDirectSqlUtils.extractSqlLong(fields[0]));
+        Long colId = MetastoreDirectSqlUtils.extractSqlLong(fields[1]);
         if (!columnDescriptorIdList.contains(colId)) {
           columnDescriptorIdList.add(colId);
         }
-        serdeIdList.add(extractSqlLong(fields[2]));
+        serdeIdList.add(MetastoreDirectSqlUtils.extractSqlLong(fields[2]));
       }
     }
     query.closeAll();
@@ -2602,13 +2375,14 @@ class MetaStoreDirectSql {
             + "WHERE " + SKEWED_VALUES + ".\"SD_ID_OID\" in  (" + sdIds + ")";
 
     Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
-    List<Object[]> sqlResult = ensureList(executeWithArray(query, null, queryText));
+    List<Object[]> sqlResult = MetastoreDirectSqlUtils
+        .ensureList(executeWithArray(query, null, queryText));
 
     List<Object> skewedStringListIdList = new ArrayList<>(0);
 
     if (!sqlResult.isEmpty()) {
       for (Object[] fields : sqlResult) {
-        skewedStringListIdList.add(extractSqlLong(fields[0]));
+        skewedStringListIdList.add(MetastoreDirectSqlUtils.extractSqlLong(fields[0]));
       }
     }
     query.closeAll();
@@ -2721,13 +2495,14 @@ class MetaStoreDirectSql {
             + "WHERE " + SDS + ".\"CD_ID\" in (" + colIds + ") "
             + "GROUP BY " + SDS + ".\"CD_ID\"";
     Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
-    List<Object[]> sqlResult = ensureList(executeWithArray(query, null, queryText));
+    List<Object[]> sqlResult = MetastoreDirectSqlUtils
+        .ensureList(executeWithArray(query, null, queryText));
 
     List<Object> danglingColumnDescriptorIdList = new ArrayList<>(columnDescriptorIdList.size());
     if (!sqlResult.isEmpty()) {
       for (Object[] fields : sqlResult) {
-        if (extractSqlInt(fields[1]) == 0) {
-          danglingColumnDescriptorIdList.add(extractSqlLong(fields[0]));
+        if (MetastoreDirectSqlUtils.extractSqlInt(fields[1]) == 0) {
+          danglingColumnDescriptorIdList.add(MetastoreDirectSqlUtils.extractSqlLong(fields[0]));
         }
       }
     }
@@ -2792,14 +2567,14 @@ class MetaStoreDirectSql {
     LOG.debug("Running {}", queryText);
     Query<?> query = pm.newQuery("javax.jdo.query.SQL", queryText);
     try {
-      List<Object[]> sqlResult = ensureList(executeWithArray(
+      List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray(
           query, new Object[] { dbName, catName, tableName }, queryText));
       Map<String, List<String>> result = new HashMap<>();
       String lastPartName = null;
       List<String> cols = null;
       for (Object[] line : sqlResult) {
-        String col = extractSqlString(line[1]);
-        String part = extractSqlString(line[0]);
+        String col = MetastoreDirectSqlUtils.extractSqlString(line[1]);
+        String part = MetastoreDirectSqlUtils.extractSqlString(line[0]);
         if (!part.equals(lastPartName)) {
           if (lastPartName != null) {
             result.put(lastPartName, cols);
@@ -2833,10 +2608,12 @@ class MetaStoreDirectSql {
     LOG.debug("Running {}", queryText);
     Query<?> query = pm.newQuery("javax.jdo.query.SQL", queryText);
     try {
-      List<Object[]> sqlResult = ensureList(executeWithArray(query, STATS_TABLE_TYPES, queryText));
+      List<Object[]> sqlResult = MetastoreDirectSqlUtils
+          .ensureList(executeWithArray(query, STATS_TABLE_TYPES, queryText));
       for (Object[] line : sqlResult) {
         result.add(new org.apache.hadoop.hive.common.TableName(
-            extractSqlString(line[2]), extractSqlString(line[1]), extractSqlString(line[0])));
+            MetastoreDirectSqlUtils.extractSqlString(line[2]), MetastoreDirectSqlUtils
+            .extractSqlString(line[1]), MetastoreDirectSqlUtils.extractSqlString(line[0])));
       }
     } finally {
       query.closeAll();