You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vi...@apache.org on 2018/10/07 22:47:40 UTC

[04/13] hive git commit: HIVE-20306 : Implement projection spec for fetching only requested fields from partitions (Vihang Karajgaonkar, reviewed by Aihua Xu, Andrew Sherman and Alexander Kolbasov)

http://git-wip-us.apache.org/repos/asf/hive/blob/44ef91a6/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
new file mode 100644
index 0000000..3905b9e
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
@@ -0,0 +1,571 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.Query;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Helper utilities used by DirectSQL code in HiveMetastore.
+ */
+class MetastoreDirectSqlUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(MetastoreDirectSqlUtils.class);
+  private MetastoreDirectSqlUtils() {
+
+  }
+  @SuppressWarnings("unchecked")
+  static <T> T executeWithArray(Query query, Object[] params, String sql) throws MetaException {
+    try {
+      return (T)((params == null) ? query.execute() : query.executeWithArray(params));
+    } catch (Exception ex) {
+      StringBuilder errorBuilder = new StringBuilder("Failed to execute [" + sql + "] with parameters [");
+      if (params != null) {
+        boolean isFirst = true;
+        for (Object param : params) {
+          errorBuilder.append((isFirst ? "" : ", ") + param);
+          isFirst = false;
+        }
+      }
+      LOG.warn(errorBuilder.toString() + "]", ex);
+      // We just logged an exception with (in case of JDO) a humongous callstack. Make a new one.
+      throw new MetaException("See previous errors; " + ex.getMessage());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  static List<Object[]> ensureList(Object result) throws MetaException {
+    if (!(result instanceof List<?>)) {
+      throw new MetaException("Wrong result type " + result.getClass());
+    }
+    return (List<Object[]>)result;
+  }
+
+  static Long extractSqlLong(Object obj) throws MetaException {
+    if (obj == null) return null;
+    if (!(obj instanceof Number)) {
+      throw new MetaException("Expected numeric type but got " + obj.getClass().getName());
+    }
+    return ((Number)obj).longValue();
+  }
+
+  static void timingTrace(boolean doTrace, String queryText, long start, long queryTime) {
+    if (!doTrace) return;
+    LOG.debug("Direct SQL query in " + (queryTime - start) / 1000000.0 + "ms + " +
+        (System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [" + queryText + "]");
+  }
+
+  static <T> int loopJoinOrderedResult(PersistenceManager pm, TreeMap<Long, T> tree,
+      String queryText, int keyIndex, ApplyFunc<T> func) throws MetaException {
+    return loopJoinOrderedResult(pm, tree, queryText, null, keyIndex, func);
+  }
+  /**
+   * Merges applies the result of a PM SQL query into a tree of object.
+   * Essentially it's an object join. DN could do this for us, but it issues queries
+   * separately for every object, which is suboptimal.
+   * @param pm
+   * @param tree The object tree, by ID.
+   * @param queryText The query text.
+   * @param keyIndex Index of the Long column corresponding to the map ID in query result rows.
+   * @param func The function that is called on each (object,row) pair with the same id.
+   * @return the count of results returned from the query.
+   */
+  static <T> int loopJoinOrderedResult(PersistenceManager pm, TreeMap<Long, T> tree,
+      String queryText, Object[] parameters, int keyIndex, ApplyFunc<T> func) throws MetaException {
+    boolean doTrace = LOG.isDebugEnabled();
+    long start = doTrace ? System.nanoTime() : 0;
+    Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+    Object result = null;
+    if (parameters == null || parameters.length == 0) {
+      result = query.execute();
+    } else {
+      result = query.executeWithArray(parameters);
+    }
+    long queryTime = doTrace ? System.nanoTime() : 0;
+    if (result == null) {
+      query.closeAll();
+      return 0;
+    }
+    List<Object[]> list = ensureList(result);
+    Iterator<Object[]> iter = list.iterator();
+    Object[] fields = null;
+    for (Map.Entry<Long, T> entry : tree.entrySet()) {
+      if (fields == null && !iter.hasNext()) break;
+      long id = entry.getKey();
+      while (fields != null || iter.hasNext()) {
+        if (fields == null) {
+          fields = iter.next();
+        }
+        long nestedId = extractSqlLong(fields[keyIndex]);
+        if (nestedId < id) throw new MetaException("Found entries for unknown ID " + nestedId);
+        if (nestedId > id) break; // fields belong to one of the next entries
+        func.apply(entry.getValue(), fields);
+        fields = null;
+      }
+      Deadline.checkTimeout();
+    }
+    int rv = list.size();
+    query.closeAll();
+    timingTrace(doTrace, queryText, start, queryTime);
+    return rv;
+  }
+
+  static void setPartitionParameters(String PARTITION_PARAMS, boolean convertMapNullsToEmptyStrings,
+      PersistenceManager pm, String partIds, TreeMap<Long, Partition> partitions)
+      throws MetaException {
+    String queryText;
+    queryText = "select \"PART_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + PARTITION_PARAMS + ""
+        + " where \"PART_ID\" in (" + partIds + ") and \"PARAM_KEY\" is not null"
+        + " order by \"PART_ID\" asc";
+    loopJoinOrderedResult(pm, partitions, queryText, 0, new ApplyFunc<Partition>() {
+      @Override
+      public void apply(Partition t, Object[] fields) {
+        t.putToParameters((String)fields[1], (String)fields[2]);
+      }});
+    // Perform conversion of null map values
+    for (Partition t : partitions.values()) {
+      t.setParameters(MetaStoreServerUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings));
+    }
+  }
+
+  static void setPartitionParametersWithFilter(String PARTITION_PARAMS,
+      boolean convertMapNullsToEmptyStrings, PersistenceManager pm, String partIds,
+      TreeMap<Long, Partition> partitions, String includeParamKeyPattern, String excludeParamKeyPattern)
+      throws MetaException {
+    StringBuilder queryTextBuilder = new StringBuilder("select \"PART_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from ")
+        .append(PARTITION_PARAMS)
+        .append(" where \"PART_ID\" in (")
+        .append(partIds)
+        .append(") and \"PARAM_KEY\" is not null");
+    List<Object> queryParams = new ArrayList<>(2);;
+    if (includeParamKeyPattern != null && !includeParamKeyPattern.isEmpty()) {
+      queryTextBuilder.append(" and \"PARAM_KEY\" LIKE (?)");
+      queryParams.add(includeParamKeyPattern);
+    }
+    if (excludeParamKeyPattern != null && !excludeParamKeyPattern.isEmpty()) {
+      queryTextBuilder.append(" and \"PARAM_KEY\" NOT LIKE (?)");
+      queryParams.add(excludeParamKeyPattern);
+    }
+
+    queryTextBuilder.append(" order by \"PART_ID\" asc");
+    String queryText = queryTextBuilder.toString();
+    loopJoinOrderedResult(pm, partitions, queryText, queryParams.toArray(), 0, new ApplyFunc<Partition>() {
+      @Override
+      public void apply(Partition t, Object[] fields) {
+        t.putToParameters((String) fields[1], (String) fields[2]);
+      }
+    });
+    // Perform conversion of null map values
+    for (Partition t : partitions.values()) {
+      t.setParameters(MetaStoreServerUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings));
+    }
+  }
+
+  static void setPartitionValues(String PARTITION_KEY_VALS, PersistenceManager pm, String partIds,
+      TreeMap<Long, Partition> partitions)
+      throws MetaException {
+    String queryText;
+    queryText = "select \"PART_ID\", \"PART_KEY_VAL\" from " + PARTITION_KEY_VALS + ""
+        + " where \"PART_ID\" in (" + partIds + ")"
+        + " order by \"PART_ID\" asc, \"INTEGER_IDX\" asc";
+    loopJoinOrderedResult(pm, partitions, queryText, 0, new ApplyFunc<Partition>() {
+      @Override
+      public void apply(Partition t, Object[] fields) {
+        t.addToValues((String)fields[1]);
+      }});
+  }
+
+  static String extractSqlClob(Object value) {
+    if (value == null) return null;
+    try {
+      if (value instanceof Clob) {
+        // we trim the Clob value to a max length an int can hold
+        int maxLength = (((Clob)value).length() < Integer.MAX_VALUE - 2) ? (int)((Clob)value).length() : Integer.MAX_VALUE - 2;
+        return ((Clob)value).getSubString(1L, maxLength);
+      } else {
+        return value.toString();
+      }
+    } catch (SQLException sqle) {
+      return null;
+    }
+  }
+
+  static void setSDParameters(String SD_PARAMS, boolean convertMapNullsToEmptyStrings,
+      PersistenceManager pm, TreeMap<Long, StorageDescriptor> sds, String sdIds)
+      throws MetaException {
+    String queryText;
+    queryText = "select \"SD_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + SD_PARAMS + ""
+        + " where \"SD_ID\" in (" + sdIds + ") and \"PARAM_KEY\" is not null"
+        + " order by \"SD_ID\" asc";
+    loopJoinOrderedResult(pm, sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+      @Override
+      public void apply(StorageDescriptor t, Object[] fields) {
+        t.putToParameters((String)fields[1], extractSqlClob(fields[2]));
+      }});
+    // Perform conversion of null map values
+    for (StorageDescriptor t : sds.values()) {
+      t.setParameters(MetaStoreServerUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings));
+    }
+  }
+
+  static int extractSqlInt(Object field) {
+    return ((Number)field).intValue();
+  }
+
+  static void setSDSortCols(String SORT_COLS, List<String> columnNames, PersistenceManager pm,
+      TreeMap<Long, StorageDescriptor> sds, String sdIds)
+      throws MetaException {
+    StringBuilder queryTextBuilder = new StringBuilder("select \"SD_ID\"");
+    int counter = 0;
+    if (columnNames.contains("col")) {
+      counter++;
+      queryTextBuilder.append(", \"COLUMN_NAME\"");
+    }
+    if (columnNames.contains("order")) {
+      counter++;
+      queryTextBuilder.append(", \"ORDER\"");
+    }
+    queryTextBuilder
+        .append(" from ")
+        .append(SORT_COLS)
+        .append(" where \"SD_ID\" in (")
+        .append(sdIds)
+        .append(") order by \"SD_ID\" asc, \"INTEGER_IDX\" asc");
+    String queryText = queryTextBuilder.toString();
+    final int finalCounter = counter;
+    loopJoinOrderedResult(pm, sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+      @Override
+      public void apply(StorageDescriptor t, Object[] fields) {
+        if (finalCounter > 1 && fields[2] == null) {
+          return;
+        }
+        Order order = new Order();
+        if (finalCounter > 0) {
+          order.setCol((String) fields[1]);
+        }
+        if (finalCounter > 1) {
+          order.setOrder(extractSqlInt(fields[2]));
+        }
+        t.addToSortCols(order);
+      }});
+  }
+
+  static void setSDSortCols(String SORT_COLS, PersistenceManager pm,
+      TreeMap<Long, StorageDescriptor> sds, String sdIds)
+      throws MetaException {
+    String queryText;
+    queryText = "select \"SD_ID\", \"COLUMN_NAME\", " + SORT_COLS + ".\"ORDER\""
+        + " from " + SORT_COLS + ""
+        + " where \"SD_ID\" in (" + sdIds + ")"
+        + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
+    loopJoinOrderedResult(pm, sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+      @Override
+      public void apply(StorageDescriptor t, Object[] fields) {
+        if (fields[2] == null) return;
+        t.addToSortCols(new Order((String)fields[1], extractSqlInt(fields[2])));
+      }});
+  }
+
+  static void setSDBucketCols(String BUCKETING_COLS, PersistenceManager pm,
+      TreeMap<Long, StorageDescriptor> sds, String sdIds)
+      throws MetaException {
+    String queryText;
+    queryText = "select \"SD_ID\", \"BUCKET_COL_NAME\" from " + BUCKETING_COLS + ""
+        + " where \"SD_ID\" in (" + sdIds + ")"
+        + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
+    loopJoinOrderedResult(pm, sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+      @Override
+      public void apply(StorageDescriptor t, Object[] fields) {
+        t.addToBucketCols((String)fields[1]);
+      }});
+  }
+
+  static boolean setSkewedColNames(String SKEWED_COL_NAMES, PersistenceManager pm,
+      TreeMap<Long, StorageDescriptor> sds, String sdIds)
+      throws MetaException {
+    String queryText;
+    queryText = "select \"SD_ID\", \"SKEWED_COL_NAME\" from " + SKEWED_COL_NAMES + ""
+        + " where \"SD_ID\" in (" + sdIds + ")"
+        + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
+    return loopJoinOrderedResult(pm, sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+      @Override
+      public void apply(StorageDescriptor t, Object[] fields) {
+        if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo());
+        t.getSkewedInfo().addToSkewedColNames((String)fields[1]);
+      }}) > 0;
+  }
+
+  static void setSkewedColValues(String SKEWED_STRING_LIST_VALUES, String SKEWED_VALUES,
+      PersistenceManager pm, TreeMap<Long, StorageDescriptor> sds, String sdIds)
+      throws MetaException {
+    String queryText;
+    queryText =
+          "select " + SKEWED_VALUES + ".\"SD_ID_OID\","
+        + "  " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\","
+        + "  " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_VALUE\" "
+        + "from " + SKEWED_VALUES + " "
+        + "  left outer join " + SKEWED_STRING_LIST_VALUES + " on " + SKEWED_VALUES + "."
+        + "\"STRING_LIST_ID_EID\" = " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" "
+        + "where " + SKEWED_VALUES + ".\"SD_ID_OID\" in (" + sdIds + ") "
+        + "  and " + SKEWED_VALUES + ".\"STRING_LIST_ID_EID\" is not null "
+        + "  and " + SKEWED_VALUES + ".\"INTEGER_IDX\" >= 0 "
+        + "order by " + SKEWED_VALUES + ".\"SD_ID_OID\" asc, " + SKEWED_VALUES + ".\"INTEGER_IDX\" asc,"
+        + "  " + SKEWED_STRING_LIST_VALUES + ".\"INTEGER_IDX\" asc";
+    loopJoinOrderedResult(pm, sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+      private Long currentListId;
+      private List<String> currentList;
+      @Override
+      public void apply(StorageDescriptor t, Object[] fields) throws MetaException {
+        if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo());
+        // Note that this is not a typical list accumulator - there's no call to finalize
+        // the last list. Instead we add list to SD first, as well as locally to add elements.
+        if (fields[1] == null) {
+          currentList = null; // left outer join produced a list with no values
+          currentListId = null;
+          t.getSkewedInfo().addToSkewedColValues(Collections.<String>emptyList());
+        } else {
+          long fieldsListId = extractSqlLong(fields[1]);
+          if (currentListId == null || fieldsListId != currentListId) {
+            currentList = new ArrayList<String>();
+            currentListId = fieldsListId;
+            t.getSkewedInfo().addToSkewedColValues(currentList);
+          }
+          currentList.add((String)fields[2]);
+        }
+      }});
+  }
+
+  static void setSkewedColLocationMaps(String SKEWED_COL_VALUE_LOC_MAP,
+      String SKEWED_STRING_LIST_VALUES, PersistenceManager pm, TreeMap<Long, StorageDescriptor> sds,
+      String sdIds)
+      throws MetaException {
+    String queryText;
+    queryText =
+          "select " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\","
+        + " " + SKEWED_STRING_LIST_VALUES + ".STRING_LIST_ID,"
+        + " " + SKEWED_COL_VALUE_LOC_MAP + ".\"LOCATION\","
+        + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_VALUE\" "
+        + "from " + SKEWED_COL_VALUE_LOC_MAP + ""
+        + "  left outer join " + SKEWED_STRING_LIST_VALUES + " on " + SKEWED_COL_VALUE_LOC_MAP + "."
+        + "\"STRING_LIST_ID_KID\" = " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" "
+        + "where " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\" in (" + sdIds + ")"
+        + "  and " + SKEWED_COL_VALUE_LOC_MAP + ".\"STRING_LIST_ID_KID\" is not null "
+        + "order by " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\" asc,"
+        + "  " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" asc,"
+        + "  " + SKEWED_STRING_LIST_VALUES + ".\"INTEGER_IDX\" asc";
+
+    loopJoinOrderedResult(pm, sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
+      private Long currentListId;
+      private List<String> currentList;
+      @Override
+      public void apply(StorageDescriptor t, Object[] fields) throws MetaException {
+        if (!t.isSetSkewedInfo()) {
+          SkewedInfo skewedInfo = new SkewedInfo();
+          skewedInfo.setSkewedColValueLocationMaps(new HashMap<List<String>, String>());
+          t.setSkewedInfo(skewedInfo);
+        }
+        Map<List<String>, String> skewMap = t.getSkewedInfo().getSkewedColValueLocationMaps();
+        // Note that this is not a typical list accumulator - there's no call to finalize
+        // the last list. Instead we add list to SD first, as well as locally to add elements.
+        if (fields[1] == null) {
+          currentList = new ArrayList<String>(); // left outer join produced a list with no values
+          currentListId = null;
+        } else {
+          long fieldsListId = extractSqlLong(fields[1]);
+          if (currentListId == null || fieldsListId != currentListId) {
+            currentList = new ArrayList<String>();
+            currentListId = fieldsListId;
+          } else {
+            skewMap.remove(currentList); // value based compare.. remove first
+          }
+          currentList.add((String)fields[3]);
+        }
+        skewMap.put(currentList, (String)fields[2]);
+      }});
+  }
+
+  static void setSDCols(String COLUMNS_V2, List<String> columnNames, PersistenceManager pm,
+      TreeMap<Long, List<FieldSchema>> colss, String colIds)
+      throws MetaException {
+    StringBuilder queryTextBuilder = new StringBuilder("select \"CD_ID\"");
+    int counter = 0;
+    if (columnNames.contains("name")) {
+      counter++;
+      queryTextBuilder.append(", \"COLUMN_NAME\"");
+    }
+    if (columnNames.contains("type")) {
+      counter++;
+      queryTextBuilder.append(", \"TYPE_NAME\"");
+    }
+    if (columnNames.contains("comment")) {
+      counter++;
+      queryTextBuilder.append(", \"COMMENT\"");
+    }
+    queryTextBuilder
+        .append(" from ")
+        .append(COLUMNS_V2)
+        .append(" where \"CD_ID\" in (")
+        .append(colIds)
+        .append(") order by \"CD_ID\" asc, \"INTEGER_IDX\" asc");
+    String queryText = queryTextBuilder.toString();
+    int finalCounter = counter;
+    loopJoinOrderedResult(pm, colss, queryText, 0, new ApplyFunc<List<FieldSchema>>() {
+      @Override
+      public void apply(List<FieldSchema> t, Object[] fields) {
+        FieldSchema fieldSchema = new FieldSchema();
+        if (finalCounter > 0) {
+          fieldSchema.setName((String) fields[1]);
+        }
+        if (finalCounter > 1) {
+          fieldSchema.setType(extractSqlClob(fields[2]));
+        }
+        if (finalCounter > 2) {
+          fieldSchema.setComment((String) fields[3]);
+        }
+        t.add(fieldSchema);
+      }});
+  }
+
+  static void setSDCols(String COLUMNS_V2, PersistenceManager pm,
+      TreeMap<Long, List<FieldSchema>> colss, String colIds)
+      throws MetaException {
+    String queryText;
+    queryText = "select \"CD_ID\", \"COMMENT\", \"COLUMN_NAME\", \"TYPE_NAME\""
+        + " from " + COLUMNS_V2 + " where \"CD_ID\" in (" + colIds + ")"
+        + " order by \"CD_ID\" asc, \"INTEGER_IDX\" asc";
+    loopJoinOrderedResult(pm, colss, queryText, 0, new ApplyFunc<List<FieldSchema>>() {
+      @Override
+      public void apply(List<FieldSchema> t, Object[] fields) {
+        t.add(new FieldSchema((String)fields[2], extractSqlClob(fields[3]), (String)fields[1]));
+      }});
+  }
+
+  static void setSerdeParams(String SERDE_PARAMS, boolean convertMapNullsToEmptyStrings,
+      PersistenceManager pm, TreeMap<Long, SerDeInfo> serdes, String serdeIds) throws MetaException {
+    String queryText;
+    queryText = "select \"SERDE_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + SERDE_PARAMS + ""
+        + " where \"SERDE_ID\" in (" + serdeIds + ") and \"PARAM_KEY\" is not null"
+        + " order by \"SERDE_ID\" asc";
+    loopJoinOrderedResult(pm, serdes, queryText, 0, new ApplyFunc<SerDeInfo>() {
+      @Override
+      public void apply(SerDeInfo t, Object[] fields) {
+        t.putToParameters((String)fields[1], extractSqlClob(fields[2]));
+      }});
+    // Perform conversion of null map values
+    for (SerDeInfo t : serdes.values()) {
+      t.setParameters(MetaStoreServerUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings));
+    }
+  }
+
+  /**
+   * Convert a boolean value returned from the RDBMS to a Java Boolean object.
+   * MySQL has booleans, but e.g. Derby uses 'Y'/'N' mapping.
+   *
+   * @param value
+   *          column value from the database
+   * @return The Boolean value of the database column value, null if the column
+   *         value is null
+   * @throws MetaException
+   *           if the column value cannot be converted into a Boolean object
+   */
+  static Boolean extractSqlBoolean(Object value) throws MetaException {
+    if (value == null) {
+      return null;
+    }
+    if (value instanceof Boolean) {
+      return (Boolean)value;
+    }
+    if (value instanceof String) {
+      try {
+        return BooleanUtils.toBooleanObject((String) value, "Y", "N", null);
+      } catch (IllegalArgumentException iae) {
+        // NOOP
+      }
+    }
+    throw new MetaException("Cannot extract boolean from column value " + value);
+  }
+
+  static String extractSqlString(Object value) {
+    if (value == null) return null;
+    return value.toString();
+  }
+
+  static Double extractSqlDouble(Object obj) throws MetaException {
+    if (obj == null)
+      return null;
+    if (!(obj instanceof Number)) {
+      throw new MetaException("Expected numeric type but got " + obj.getClass().getName());
+    }
+    return ((Number) obj).doubleValue();
+  }
+
+  static byte[] extractSqlBlob(Object value) throws MetaException {
+    if (value == null)
+      return null;
+    if (value instanceof Blob) {
+      //derby, oracle
+      try {
+        // getBytes function says: pos the ordinal position of the first byte in
+        // the BLOB value to be extracted; the first byte is at position 1
+        return ((Blob) value).getBytes(1, (int) ((Blob) value).length());
+      } catch (SQLException e) {
+        throw new MetaException("Encounter error while processing blob.");
+      }
+    }
+    else if (value instanceof byte[]) {
+      // mysql, postgres, sql server
+      return (byte[]) value;
+    }
+	else {
+      // this may happen when enablebitvector is false
+      LOG.debug("Expected blob type but got " + value.getClass().getName());
+      return null;
+    }
+  }
+
+  @FunctionalInterface
+  static interface ApplyFunc<Target> {
+    void apply(Target t, Object[] fields) throws MetaException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/44ef91a6/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 5372714..66977d7 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -69,6 +69,7 @@ import javax.jdo.datastore.JDOConnection;
 import javax.jdo.identity.IntIdentity;
 import javax.sql.DataSource;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -2031,8 +2032,11 @@ public class ObjectStore implements RawStore, Configurable {
     return keys;
   }
 
-  private SerDeInfo convertToSerDeInfo(MSerDeInfo ms) throws MetaException {
+  private SerDeInfo convertToSerDeInfo(MSerDeInfo ms, boolean allowNull) throws MetaException {
     if (ms == null) {
+      if (allowNull) {
+        return null;
+      }
       throw new MetaException("Invalid SerDeInfo object");
     }
     SerDeInfo serde =
@@ -2086,7 +2090,7 @@ public class ObjectStore implements RawStore, Configurable {
     StorageDescriptor sd = new StorageDescriptor(noFS ? null : convertToFieldSchemas(mFieldSchemas),
         msd.getLocation(), msd.getInputFormat(), msd.getOutputFormat(), msd
         .isCompressed(), msd.getNumBuckets(), convertToSerDeInfo(msd
-        .getSerDeInfo()), convertList(msd.getBucketCols()), convertToOrders(msd
+        .getSerDeInfo(), true), convertList(msd.getBucketCols()), convertToOrders(msd
         .getSortCols()), convertMap(msd.getParameters()));
     SkewedInfo skewedInfo = new SkewedInfo(convertList(msd.getSkewedColNames()),
         convertToSkewedValues(msd.getSkewedColValues()),
@@ -2607,11 +2611,17 @@ public class ObjectStore implements RawStore, Configurable {
     if (mpart == null) {
       return null;
     }
-    Partition p = new Partition(convertList(mpart.getValues()), mpart.getTable().getDatabase()
-        .getName(), mpart.getTable().getTableName(), mpart.getCreateTime(),
+    //its possible that MPartition is partially filled, do null checks to avoid NPE
+    MTable table = mpart.getTable();
+    String dbName =
+        table == null ? null : table.getDatabase() == null ? null : table.getDatabase().getName();
+    String tableName = table == null ? null : table.getTableName();
+    String catName = table == null ? null :
+        table.getDatabase() == null ? null : table.getDatabase().getCatalogName();
+    Partition p = new Partition(convertList(mpart.getValues()), dbName, tableName, mpart.getCreateTime(),
         mpart.getLastAccessTime(), convertToStorageDescriptor(mpart.getSd()),
         convertMap(mpart.getParameters()));
-    p.setCatName(mpart.getTable().getDatabase().getCatalogName());
+    p.setCatName(catName);
     p.setWriteId(mpart.getWriteId());
     return p;
   }
@@ -3349,6 +3359,64 @@ public class ObjectStore implements RawStore, Configurable {
     return mparts;
   }
 
+  // This code is only executed in JDO code path, not from direct SQL code path.
+  private List<MPartition> listMPartitionsWithProjection(String catName, String dbName, String tblName, int max,
+      QueryWrapper queryWrapper, List<String> fieldNames) throws MetaException {
+    boolean success = false;
+    List<MPartition> mparts = null;
+    try {
+      openTransaction();
+      LOG.debug("Executing listMPartitionsWithProjection");
+      dbName = normalizeIdentifier(dbName);
+      tblName = normalizeIdentifier(tblName);
+      Query query = queryWrapper.query = pm.newQuery(MPartition.class,
+          "table.tableName == t1 && table.database.name == t2 && table.database.catalogName == t3");
+      query.declareParameters("java.lang.String t1, java.lang.String t2, java.lang.String t3");
+      query.setOrdering("partitionName ascending");
+      if (max >= 0) {
+        query.setRange(0, max);
+      }
+      if (fieldNames == null || fieldNames.isEmpty()) {
+        // full fetch of partitions
+        mparts = (List<MPartition>) query.execute(tblName, dbName, catName);
+        pm.retrieveAll(mparts);
+      } else {
+        // fetch partially filled partitions using result clause
+        query.setResult(Joiner.on(',').join(fieldNames));
+        // if more than one fields are in the result class the return type is List<Object[]>
+        if (fieldNames.size() > 1) {
+          List<Object[]> results = (List<Object[]>) query.execute(tblName, dbName, catName);
+          mparts = new ArrayList<>(results.size());
+          for (Object[] row : results) {
+            MPartition mpart = new MPartition();
+            int i = 0;
+            for (Object val : row) {
+              MetaStoreServerUtils.setNestedProperty(mpart, fieldNames.get(i), val, true);
+              i++;
+            }
+            mparts.add(mpart);
+          }
+        } else {
+          // only one field is requested, return type is List<Object>
+          List<Object> results = (List<Object>) query.execute(tblName, dbName, catName);
+          mparts = new ArrayList<>(results.size());
+          for (Object row : results) {
+            MPartition mpart = new MPartition();
+            MetaStoreServerUtils.setNestedProperty(mpart, fieldNames.get(0), row, true);
+            mparts.add(mpart);
+          }
+        }
+      }
+      success = commitTransaction();
+      LOG.debug("Done retrieving {} objects for listMPartitionsWithProjection", mparts.size());
+    } finally {
+      if (!success) {
+        rollbackTransaction();
+      }
+    }
+    return mparts;
+  }
+
   @Override
   public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
       List<String> partNames) throws MetaException, NoSuchObjectException {
@@ -3492,7 +3560,6 @@ public class ObjectStore implements RawStore, Configurable {
     return results;
   }
 
-
   private Integer getNumPartitionsViaOrmFilter(Table table, ExpressionTree tree, boolean isValidatedFilter)
     throws MetaException {
     Map<String, Object> params = new HashMap<>();
@@ -3620,17 +3687,23 @@ public class ObjectStore implements RawStore, Configurable {
     private boolean doUseDirectSql;
     private long start;
     private Table table;
+    protected final List<String> partitionFields;
     protected final String catName, dbName, tblName;
     private boolean success = false;
     protected T results = null;
 
     public GetHelper(String catalogName, String dbName, String tblName,
-                     boolean allowSql, boolean allowJdo)
-        throws MetaException {
+        boolean allowSql, boolean allowJdo) throws MetaException {
+      this(catalogName, dbName, tblName, null, allowSql, allowJdo);
+    }
+
+    public GetHelper(String catalogName, String dbName, String tblName,
+        List<String> fields, boolean allowSql, boolean allowJdo) throws MetaException {
       assert allowSql || allowJdo;
       this.allowJdo = allowJdo;
       this.catName = (catalogName != null) ? normalizeIdentifier(catalogName) : null;
       this.dbName = (dbName != null) ? normalizeIdentifier(dbName) : null;
+      this.partitionFields = fields;
       if (tblName != null) {
         this.tblName = normalizeIdentifier(tblName);
       } else {
@@ -3813,7 +3886,12 @@ public class ObjectStore implements RawStore, Configurable {
   private abstract class GetListHelper<T> extends GetHelper<List<T>> {
     public GetListHelper(String catName, String dbName, String tblName, boolean allowSql,
                          boolean allowJdo) throws MetaException {
-      super(catName, dbName, tblName, allowSql, allowJdo);
+      super(catName, dbName, tblName, null, allowSql, allowJdo);
+    }
+
+    public GetListHelper(String catName, String dbName, String tblName, List<String> fields,
+        boolean allowSql, boolean allowJdo) throws MetaException {
+      super(catName, dbName, tblName, fields, allowSql, allowJdo);
     }
 
     @Override
@@ -3961,6 +4039,44 @@ public class ObjectStore implements RawStore, Configurable {
     }.run(true);
   }
 
+  @Override
+  public List<Partition> getPartitionSpecsByFilterAndProjection(String catName, String dbName,
+                                                                String tblName, List<String> fieldList,
+                                                                String includeParamKeyPattern,
+                                                                String excludeParamKeyPattern)
+      throws MetaException, NoSuchObjectException {
+    if (fieldList == null || fieldList.isEmpty()) {
+      // no fields are requested. Fallback to regular getPartitions implementation to return all the fields
+      return getPartitionsInternal(catName, dbName, tblName, -1, true, true);
+    }
+
+    return new GetListHelper<Partition>(catName, dbName, tblName,
+        fieldList, true, true) {
+
+      @Override
+      protected List<Partition> getSqlResult(GetHelper<List<Partition>> ctx) throws MetaException {
+        return directSql
+            .getPartitionSpecsUsingProjection(ctx.getTable(), ctx.partitionFields, includeParamKeyPattern,
+                excludeParamKeyPattern);
+      }
+
+      @Override
+      protected List<Partition> getJdoResult(
+          GetHelper<List<Partition>> ctx) throws MetaException {
+        // For single-valued fields we can use setResult() to implement projection of fields but
+        // JDO doesn't support multi-valued fields in setResult() so currently JDO implementation
+        // fallbacks to full-partition fetch if the requested fields contain multi-valued fields
+        // TODO: Add param filtering logic
+        List<String> fieldNames = PartitionProjectionEvaluator.getMPartitionFieldNames(ctx.partitionFields);
+        try (QueryWrapper queryWrapper = new QueryWrapper()) {
+          return convertToParts(
+              listMPartitionsWithProjection(catName, dbName, tblName, -1, queryWrapper, fieldNames));
+        }
+      }
+    }.run(true);
+
+  }
+
   /**
    * Gets the table object for a given table, throws if anything goes wrong.
    * @param dbName Database name.
@@ -4403,7 +4519,7 @@ public class ObjectStore implements RawStore, Configurable {
     }
 
     oldSd.setBucketCols(newSd.getBucketCols());
-    oldSd.setCompressed(newSd.isCompressed());
+    oldSd.setIsCompressed(newSd.isCompressed());
     oldSd.setInputFormat(newSd.getInputFormat());
     oldSd.setOutputFormat(newSd.getOutputFormat());
     oldSd.setNumBuckets(newSd.getNumBuckets());
@@ -11120,7 +11236,7 @@ public class ObjectStore implements RawStore, Configurable {
       if (mSerDeInfo == null) {
         throw new NoSuchObjectException("No SerDe named " + serDeName);
       }
-      SerDeInfo serde = convertToSerDeInfo(mSerDeInfo);
+      SerDeInfo serde = convertToSerDeInfo(mSerDeInfo, false);
       committed = commitTransaction();
       return serde;
     } finally {
@@ -11238,7 +11354,7 @@ public class ObjectStore implements RawStore, Configurable {
       schemaVersion.setName(mSchemaVersion.getName());
     }
     if (mSchemaVersion.getSerDe() != null) {
-      schemaVersion.setSerDe(convertToSerDeInfo(mSchemaVersion.getSerDe()));
+      schemaVersion.setSerDe(convertToSerDeInfo(mSchemaVersion.getSerDe(), false));
     }
     return schemaVersion;
   }