Posted to commits@hive.apache.org by vi...@apache.org on 2018/10/07 22:47:38 UTC

[02/13] hive git commit: HIVE-20306 : Implement projection spec for fetching only requested fields from partitions (Vihang Karajgaonkar, reviewed by Aihua Xu, Andrew Sherman and Alexander Kolbasov)

http://git-wip-us.apache.org/repos/asf/hive/blob/44ef91a6/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionProjectionEvaluator.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionProjectionEvaluator.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionProjectionEvaluator.java
new file mode 100644
index 0000000..e918a33
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionProjectionEvaluator.java
@@ -0,0 +1,889 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.Query;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.regex.Pattern;
+
+import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.extractSqlLong;
+
+/**
+ * Evaluator for partition projection specs, which use dot notation to name the parts of a
+ * partition that should be fetched.
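+ * A hypothetical projection list, shown only to illustrate the dot notation (the actual
+ * caller wiring lives elsewhere in the metastore server code):
+ * <pre>{@code
+ * // fetch only the partition values, the SD location and the column names
+ * List<String> projection = Arrays.asList("values", "sd.location", "sd.cols.name");
+ * }</pre>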
+ */
+public class PartitionProjectionEvaluator {
+  private static final Logger LOG = LoggerFactory.getLogger(PartitionProjectionEvaluator.class);
+  private final boolean convertMapNullsToEmptyStrings;
+  private final boolean isView;
+  private final String includeParamKeyPattern;
+  private final String excludeParamKeyPattern;
+  private Set<String> projectionFields;
+
+  interface PartitionFieldValueSetter<T> {
+    void setValue(T part, PartitionFieldNode node, Object value) throws MetaException;
+  }
+
+  private final ImmutableMap<String, MutivaluedFieldSetter> multiValuedFieldSetters =
+      new ImmutableMap.Builder<String, MutivaluedFieldSetter>()
+          .put("values", new PartitionValuesSetter())
+          .put("parameters", new PartitionParametersSetter())
+          .put("sd.cols", new PartitionSDColsSetter())
+          .put("sd.bucketCols", new PartitionSDBucketColsSetter())
+          .put("sd.sortCols", new PartitionSortColsSetter())
+          .put("sd.parameters", new PartitionSDParametersSetter())
+          .put("sd.serdeInfo.parameters", new PartitionSerdeInfoParametersSetter())
+          .put("sd.skewedInfo.skewedColNames", new PartitionSkewedColsNamesSetter())
+          .put("sd.skewedInfo.skewedColValues", new PartitionSkewedColsValuesSetter())
+          .put("sd.skewedInfo.skewedColValueLocationMaps",
+              new PartitionSkewedColValLocationMapSetter()).build();
+
+  private static final String PART_ID = "PART_ID";
+  private static final String SD_ID = "SD_ID";
+  private static final String SERDE_ID = "SERDE_ID";
+  private static final String CD_ID = "CD_ID";
+
+  private static final PartitionFieldNode partIdNode = new PartitionFieldNode(PART_ID);
+  private static final PartitionFieldNode sdIdNode = new PartitionFieldNode(SD_ID);
+  private static final PartitionFieldNode serdeIdNode = new PartitionFieldNode(SERDE_ID);
+  private static final PartitionFieldNode cdIdNode = new PartitionFieldNode(CD_ID);
+
+  private final ImmutableMap<String, String> fieldNameToTableName;
+  private final Set<PartitionFieldNode> roots;
+  private final String PARTITIONS;
+  private final String SDS;
+  private final String SERDES;
+  private final String PARTITION_PARAMS;
+  private final PersistenceManager pm;
+
+  @VisibleForTesting static final String SD_PATTERN = "sd|sd\\.";
+  @VisibleForTesting static final String SERDE_PATTERN = "sd\\.serdeInfo|sd\\.serdeInfo\\.";
+  @VisibleForTesting static final String CD_PATTERN = "sd\\.cols|sd\\.cols\\.";
+
+  private static final int SD_INDEX = 0;
+  private static final int CD_INDEX = 1;
+  private static final int SERDE_INDEX = 2;
+  private static final int PART_INDEX = 3;
+
+  // this map stores all the single valued fields in the Partition class and maps them to the corresponding
+  // single-valued fields from the MPartition class. This map is used to parse the given partition fields
+  // as well as to convert a given partition field list to a JDO setResult string when direct-SQL
+  // is disabled
+  private static final ImmutableMap<String, String> allPartitionSingleValuedFields = new ImmutableMap.Builder<String, String>()
+      .put("dbName", "table.database.name")
+      .put("tableName", "table.tableName")
+      .put("createTime", "createTime")
+      .put("lastAccessTime", "lastAccessTime")
+      .put("sd.location", "sd.location")
+      .put("sd.inputFormat", "sd.inputFormat")
+      .put("sd.outputFormat", "sd.outputFormat")
+      .put("sd.compressed", "sd.isCompressed")
+      .put("sd.numBuckets", "sd.numBuckets")
+      .put("sd.serdeInfo.name", "sd.serDeInfo.name")
+      .put("sd.serdeInfo.serializationLib", "sd.serDeInfo.serializationLib")
+      .put("sd.serdeInfo.description", "sd.serDeInfo.description")
+      .put("sd.serdeInfo.serializerClass", "sd.serDeInfo.serializerClass")
+      .put("sd.serdeInfo.deserializerClass", "sd.serDeInfo.deserializerClass")
+      .put("sd.serdeInfo.serdeType", "sd.serDeInfo.serdeType")
+      .put("catName", "table.database.catalogName")
+      .put("writeId", "writeId")
+      //TODO there is no mapping for isStatsCompliant to JDO MPartition
+      //.put("isStatsCompliant", "isStatsCompliant")
+      .build();
+
+  private static final ImmutableSet<String> allPartitionMultiValuedFields = new ImmutableSet.Builder<String>()
+      .add("values")
+      .add("sd.cols.name")
+      .add("sd.cols.type")
+      .add("sd.cols.comment")
+      .add("sd.serdeInfo.parameters")
+      .add("sd.bucketCols")
+      .add("sd.sortCols.col")
+      .add("sd.sortCols.order")
+      .add("sd.parameters")
+      .add("sd.skewedInfo.skewedColNames")
+      .add("sd.skewedInfo.skewedColValues")
+      .add("sd.skewedInfo.skewedColValueLocationMaps")
+      .add("parameters")
+      .add("privileges.userPrivileges")
+      .add("privileges.groupPrivileges")
+      .add("privileges.rolePrivileges")
+      .build();
+
+  private static final ImmutableSet<String> allPartitionFields = new ImmutableSet.Builder<String>()
+      .addAll(allPartitionSingleValuedFields.keySet())
+      .addAll(allPartitionMultiValuedFields)
+      .build();
+
+  public PartitionProjectionEvaluator(PersistenceManager pm,
+      ImmutableMap<String, String> fieldNameToTableName, List<String> projectionFields,
+      boolean convertMapNullsToEmptyStrings, boolean isView, String includeParamKeyPattern,
+      String excludeParamKeyPattern) throws MetaException {
+    this.pm = pm;
+    this.fieldNameToTableName = fieldNameToTableName;
+    this.convertMapNullsToEmptyStrings = convertMapNullsToEmptyStrings;
+    this.isView = isView;
+    this.includeParamKeyPattern = includeParamKeyPattern;
+    this.excludeParamKeyPattern = excludeParamKeyPattern;
+    this.PARTITIONS =
+        fieldNameToTableName.containsKey("PARTITIONS_TABLE_NAME") ? fieldNameToTableName
+            .get("PARTITIONS_TABLE_NAME") : "PARTITIONS";
+    this.SDS = fieldNameToTableName.containsKey("SDS_TABLE_NAME") ? fieldNameToTableName
+        .get("SDS_TABLE_NAME") : "SDS";
+    this.SERDES = fieldNameToTableName.containsKey("SERDES_TABLE_NAME") ? fieldNameToTableName
+        .get("SERDES_TABLE_NAME") : "SERDES";
+    this.PARTITION_PARAMS =
+        fieldNameToTableName.containsKey("PARTITION_PARAMS") ? fieldNameToTableName
+            .get("PARTITION_PARAMS") : "PARTITION_PARAMS";
+
+    roots = parse(projectionFields);
+
+    // we always query PART_ID
+    roots.add(partIdNode);
+    if (find(SD_PATTERN)) {
+      roots.add(sdIdNode);
+    }
+    if (find(SERDE_PATTERN)) {
+      roots.add(serdeIdNode);
+    }
+    if (find(CD_PATTERN)) {
+      roots.add(cdIdNode);
+    }
+  }
+
+  /**
+   * Given a Java regex string pattern, checks if the PartitionFieldNode tree
+   * has any node whose fieldName matches the given pattern.
+   * @param searchField the regex pattern to search for
+   * @return true if any node in the tree matches the given pattern, false otherwise
+   */
+  @VisibleForTesting
+  boolean find(String searchField) {
+    Pattern p = Pattern.compile(searchField);
+    for (PartitionFieldNode node : roots) {
+      if (find(node, p)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static boolean find(PartitionFieldNode root, Pattern p) {
+    if (root == null) {
+      return false;
+    }
+    if (p.matcher(root.fieldName).matches()) {
+      return true;
+    }
+    for (PartitionFieldNode child : root.children) {
+      if (find(child, p)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * If a top-level field name is given, expand it such that all the children
+   * of that node are added to the projection list. E.g. if only "sd" is provided in the projection
+   * list, it means all the nested fields of sd should be added to the projection fields.
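+   * <pre>{@code
+   * // illustrative input/output, based on the field names defined in this class:
+   * // expand(["sd.serdeInfo"]) returns ["sd.serdeInfo.name", "sd.serdeInfo.serializationLib", ...]
+   * }</pre>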
+   * @param projectionList the given projection fields, possibly containing top-level field names
+   * @return the expanded set of fully qualified projection field names
+   */
+  private static Set<String> expand(Collection<String> projectionList) throws MetaException {
+    Set<String> result = new HashSet<>();
+    for (String projectedField : projectionList) {
+      if (allPartitionFields.contains(projectedField)) {
+        result.add(projectedField);
+      } else {
+        boolean found = false;
+        for (String partitionField : allPartitionFields) {
+          if (partitionField.startsWith(projectedField)) {
+            LOG.debug("Found " + partitionField + " included within given projection field "
+                + projectedField);
+            result.add(partitionField);
+            found = true;
+          }
+        }
+        if (!found) {
+          throw new MetaException("Invalid field name " + projectedField);
+        }
+      }
+    }
+    return result;
+  }
+
+  @VisibleForTesting
+  Set<PartitionFieldNode> getRoots() {
+    return roots;
+  }
+
+  private static void validate(Collection<String> projectionFields) throws MetaException {
+    Set<String> verify = new HashSet<>(projectionFields);
+    verify.removeAll(allPartitionFields);
+    if (verify.size() > 0) {
+      throw new MetaException("Invalid partition fields in the projection spec" + Arrays
+          .toString(verify.toArray(new String[verify.size()])));
+    }
+  }
+
+  private Set<PartitionFieldNode> parse(List<String> inputProjectionFields) throws MetaException {
+    // in case of dbName and tableName we rely on table object to get their values
+    this.projectionFields = new HashSet<>(inputProjectionFields);
+    projectionFields.remove("dbName");
+    projectionFields.remove("tableName");
+    projectionFields.remove("catName");
+    if (isView) {
+      // if this is a view SDs are not set so can be skipped
+      projectionFields.removeIf(
+          s -> s.matches(SD_PATTERN) || s.matches(SERDE_PATTERN) || s.matches(CD_PATTERN));
+    }
+    // remove redundant fields
+    projectionFields = PartitionProjectionEvaluator.expand(projectionFields);
+    removeUnsupportedFields();
+    validate(projectionFields);
+
+    Map<String, PartitionFieldNode> nestedNodes = new HashMap<>();
+    Set<PartitionFieldNode> rootNodes = new HashSet<>();
+
+    for (String projectedField : projectionFields) {
+      String[] fields = projectedField.split("\\.");
+      if (fields.length == 0) {
+        LOG.warn("Invalid projected field {}. Ignoring ..", projectedField);
+        continue;
+      }
+      StringBuilder fieldNameBuilder = new StringBuilder(fields[0]);
+      PartitionFieldNode currentNode = createIfNotExists(nestedNodes, fieldNameBuilder.toString());
+      rootNodes.add(currentNode);
+      for (int level = 1; level < fields.length; level++) {
+        final String name = fieldNameBuilder.append(".").append(fields[level]).toString();
+        PartitionFieldNode childNode = createIfNotExists(nestedNodes, name);
+        // all the children of a multi-valued node are also multi-valued
+        if (currentNode.isMultiValued) {
+          childNode.setMultiValued();
+        }
+        currentNode.addChild(childNode);
+        currentNode = childNode;
+      }
+    }
+    return rootNodes;
+  }
+
+  // TODO some of the optional partition fields are never set by the DirectSQL implementation.
+  // Removing such fields to keep it consistent with the methods in the MetaStoreDirectSql class
+  private void removeUnsupportedFields() {
+    List<String> unsupportedFields = Arrays
+        .asList("sd.serdeInfo.serializerClass", "sd.serdeInfo.deserializerClass",
+            "sd.serdeInfo.serdeType", "sd.serdeInfo.description");
+    for (String unsupportedField : unsupportedFields) {
+      if (projectionFields.contains(unsupportedField)) {
+        LOG.warn("DirectSQL does not return partitions with the optional field" + unsupportedField
+            + " set. Removing it from the projection list");
+        projectionFields.remove(unsupportedField);
+      }
+    }
+  }
+
+  private PartitionFieldNode createIfNotExists(Map<String, PartitionFieldNode> nestedNodes,
+      String fieldName) {
+    PartitionFieldNode currentNode = nestedNodes.computeIfAbsent(fieldName, k -> {
+      if (multiValuedFieldSetters.containsKey(fieldName)) {
+        return new PartitionFieldNode(fieldName, true);
+      } else {
+        return new PartitionFieldNode(fieldName);
+      }
+    });
+    return currentNode;
+  }
+
+  /**
+   * Given a list of partition ids, returns the List of partially filled partitions based on the
+   * projection list used to instantiate this PartitionProjectionEvaluator.
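+   * <pre>{@code
+   * // hypothetical usage, assuming the evaluator was created with the projection
+   * // list ["values", "sd.location"]:
+   * List<Partition> parts = evaluator.getPartitionsUsingProjectionList(partIds);
+   * // each returned Partition has only its values and sd.location populated
+   * }</pre>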
+   * @param partitionIds List of partition ids corresponding to the Partition objects which are requested
+   * @return Partitions where each partition has only the projected fields set
+   * @throws MetaException in case of errors while fetching the partition fields
+   */
+  public List<Partition> getPartitionsUsingProjectionList(List<Long> partitionIds)
+      throws MetaException {
+    TreeMap<Long, StorageDescriptor> sds = new TreeMap<>();
+    TreeMap<Long, List<FieldSchema>> cds = new TreeMap<>();
+    TreeMap<Long, SerDeInfo> serdes = new TreeMap<>();
+    TreeMap<Long, Partition> partitions = new TreeMap<>();
+    List<Partition> results = setSingleValuedFields(partitionIds, partitions, sds, serdes, cds);
+    setMultivaluedFields(partitions, sds, serdes, cds);
+    return results;
+  }
+
+  private List<Partition> setSingleValuedFields(List<Long> partitionIds,
+      final TreeMap<Long, Partition> partitions, final TreeMap<Long, StorageDescriptor> sdIds,
+      final TreeMap<Long, SerDeInfo> serdeIds, final TreeMap<Long, List<FieldSchema>> cdIds)
+      throws MetaException {
+
+    StringBuilder queryTextBuilder = new StringBuilder();
+    int numColumns = buildQueryForSingleValuedFields(partitionIds, queryTextBuilder);
+    String queryText = queryTextBuilder.toString();
+
+    try (Query query = pm.newQuery("javax.jdo.query.SQL", queryText)) {
+
+      long start = LOG.isDebugEnabled() ? System.nanoTime() : 0;
+      List<Object> sqlResult = MetastoreDirectSqlUtils.executeWithArray(query, null, queryText);
+      long queryTime = LOG.isDebugEnabled() ? System.nanoTime() : 0;
+      MetastoreDirectSqlUtils.timingTrace(LOG.isDebugEnabled(), queryText, start, queryTime);
+      Deadline.checkTimeout();
+      final Long[] ids = new Long[4];
+      Object[] rowVals = new Object[1];
+      // Keep order by name, consistent with JDO.
+      ArrayList<Partition> orderedResult = new ArrayList<Partition>(partitionIds.size());
+      for (Object row : sqlResult) {
+        if (numColumns > 1) {
+          rowVals = (Object[])row;
+        } else {
+          // only one column is selected by the query. The result class will be Object
+          rowVals[0] = row;
+        }
+        Partition part = new Partition();
+        for (PartitionFieldNode root : roots) {
+          traverseAndSetValues(part, root, rowVals, new PartitionFieldValueSetter() {
+            @Override
+            public void setValue(Object partition, PartitionFieldNode node, Object value)
+                throws MetaException {
+              if (!node.isMultiValued) {
+                // in case of the id nodes (PART_ID, SD_ID, SERDE_ID, CD_ID) we just collect
+                // the ids for further processing
+                if (node.equals(sdIdNode)) {
+                  ids[SD_INDEX] = extractSqlLong(value);
+                } else if (node.equals(serdeIdNode)) {
+                  ids[SERDE_INDEX] = extractSqlLong(value);
+                } else if (node.equals(cdIdNode)) {
+                  ids[CD_INDEX] = extractSqlLong(value);
+                } else if (node.equals(partIdNode)) {
+                  ids[PART_INDEX] = extractSqlLong(value);
+                } else {
+                  // in case of sd.compressed and sd.storedAsSubDirectories we need special code
+                  // to convert the string to a boolean value
+                  if (node.fieldName.equals("sd.compressed") || node.fieldName.equals("sd.storedAsSubDirectories")) {
+                    value = MetastoreDirectSqlUtils.extractSqlBoolean(value);
+                  }
+                  MetaStoreServerUtils.setNestedProperty(partition, node.fieldName, value, true);
+                }
+              }
+            }
+          });
+        }
+        // PART_ID is always queried
+        if (ids[PART_INDEX] == null) {
+          throw new MetaException("Could not find PART_ID for partition " + part);
+        }
+        partitions.put(ids[PART_INDEX], part);
+        orderedResult.add(part);
+        ids[PART_INDEX] = null;
+
+        if (ids[SD_INDEX] != null) {
+          // sd object is initialized if any of the sd single-valued fields are in the projection
+          if (part.getSd() == null) {
+            part.setSd(new StorageDescriptor());
+          }
+          sdIds.put(ids[SD_INDEX], part.getSd());
+          ids[SD_INDEX] = null;
+        }
+
+        if (ids[SERDE_INDEX] != null) {
+          // serde object must have already been initialized above in the MetaStoreServerUtils.setNestedProperty call
+          if (part.getSd().getSerdeInfo() == null) {
+            part.getSd().setSerdeInfo(new SerDeInfo());
+          }
+          serdeIds.put(ids[SERDE_INDEX], part.getSd().getSerdeInfo());
+          ids[SERDE_INDEX] = null;
+        }
+
+        if (ids[CD_INDEX] != null) {
+          // the common case is that all the SDs reuse the same CD;
+          // allocate a List<FieldSchema> only when we see a new CD_ID
+          cdIds.putIfAbsent(ids[CD_INDEX], new ArrayList<>(5));
+          if (part.getSd().getCols() == null) {
+            part.getSd().setCols(cdIds.get(ids[CD_INDEX]));
+          }
+          ids[CD_INDEX] = null;
+        }
+        Deadline.checkTimeout();
+      }
+      return orderedResult;
+    } catch (Exception e) {
+      LOG.error("Exception received while getting partitions using projected fields", e);
+      throw new MetaException(e.getMessage());
+    }
+  }
+
+  private void setMultivaluedFields(TreeMap<Long, Partition> partitions,
+      TreeMap<Long, StorageDescriptor> sds, TreeMap<Long, SerDeInfo> serdes,
+      TreeMap<Long, List<FieldSchema>> cds) throws MetaException {
+    for (PartitionFieldNode root : roots) {
+      traverseAndSetMultiValuedFields(root, partitions, sds, serdes, cds);
+    }
+  }
+
+  private void traverseAndSetMultiValuedFields(PartitionFieldNode root,
+      TreeMap<Long, Partition> partitions, TreeMap<Long, StorageDescriptor> sds,
+      TreeMap<Long, SerDeInfo> serdes, TreeMap<Long, List<FieldSchema>> cds) throws MetaException {
+    if (root == null) {
+      return;
+    }
+    // if a multi-valued node is found, set its value using its value-setter.
+    // note that once a multi-valued node is found the method does not recurse further,
+    // because the multi-valued setter also sets the values of all its descendants
+    if (root.isMultiValued) {
+      MutivaluedFieldSetter multiValuedFieldSetter = multiValuedFieldSetters.get(root.fieldName);
+      if (multiValuedFieldSetter == null) {
+        throw new MetaException(
+            "Cannot find multi-valued field setter for field " + root.fieldName);
+      }
+      multiValuedFieldSetter.setValue(root, partitions, sds, serdes, cds);
+    } else {
+      for (PartitionFieldNode child : root.children) {
+        traverseAndSetMultiValuedFields(child, partitions, sds, serdes, cds);
+      }
+    }
+  }
+
+  private void traverseAndSetValues(Partition part, PartitionFieldNode root, Object[] row,
+      PartitionFieldValueSetter valueSetter) throws MetaException {
+    // if root is null or is multiValued, do not recurse further
+    // multi-valued fields are set separately in setMultiValuedFields method
+    if (root == null || root.isMultiValued()) {
+      return;
+    }
+    if (root.isLeafNode()) {
+      valueSetter.setValue(part, root, row[root.fieldIndex]);
+      return;
+    }
+    for (PartitionFieldNode child : root.children) {
+      traverseAndSetValues(part, child, row, valueSetter);
+    }
+  }
+
+  private static final String SPACE = " ";
+
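+  // Builds a query of the following shape (illustrative, assuming the default table names
+  // and a projection that touches both SD and SERDE fields):
+  //   select <projected columns> from PARTITIONS
+  //     left outer join SDS on PARTITIONS."SD_ID" = SDS."SD_ID"
+  //     left outer join SERDES on SDS."SERDE_ID" = SERDES."SERDE_ID"
+  //   where "PART_ID" in (<partition ids>) order by "PART_NAME" asc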
+  private int buildQueryForSingleValuedFields(List<Long> partitionIds, StringBuilder queryTextBuilder) {
+    queryTextBuilder.append("select ");
+    // build the projection columns using the projected fields
+    List<String> columnList = getSingleValuedColumnNames(roots);
+    queryTextBuilder.append(Joiner.on(',').join(columnList));
+    queryTextBuilder.append(SPACE);
+    queryTextBuilder.append("from " + PARTITIONS);
+    // if SD fields are selected add join clause with SDS
+    boolean foundSD = false;
+    if (find(SD_PATTERN)) {
+      queryTextBuilder.append(SPACE);
+      queryTextBuilder.append(
+          "left outer join " + SDS + " on " + PARTITIONS + ".\"SD_ID\" = " + SDS + ".\"SD_ID\"");
+      foundSD = true;
+    }
+    // if serde fields are selected add join clause on serdes
+    if (foundSD || find(SERDE_PATTERN)) {
+      queryTextBuilder.append(SPACE);
+      queryTextBuilder.append(
+          "  left outer join " + SERDES + " on " + SDS + ".\"SERDE_ID\" = " + SERDES
+              + ".\"SERDE_ID\"");
+    }
+    queryTextBuilder.append(SPACE);
+    //add where clause
+    queryTextBuilder.append("where \"PART_ID\" in (" + Joiner.on(',').join(partitionIds)
+        + ") order by \"PART_NAME\" asc");
+    return columnList.size();
+  }
+
+  private int getSingleValuedColumnName(PartitionFieldNode root, int fieldId,
+      final List<String> columnNames) {
+    if (root == null) {
+      return fieldId;
+    }
+    if (root.isLeafNode() && !root.isMultiValued) {
+      if (fieldNameToTableName.containsKey(root.fieldName)) {
+        columnNames.add(fieldNameToTableName.get(root.fieldName));
+        root.setFieldIndex(fieldId++);
+        return fieldId;
+      }
+      throw new RuntimeException(
+          "No column name mapping found for partition field " + root.fieldName);
+    }
+    for (PartitionFieldNode child : root.children) {
+      fieldId = getSingleValuedColumnName(child, fieldId, columnNames);
+    }
+    return fieldId;
+  }
+
+  private List<String> getSingleValuedColumnNames(Set<PartitionFieldNode> roots) {
+    List<String> columnNames = new ArrayList<>();
+    int fieldIndex = 0;
+    for (PartitionFieldNode node : roots) {
+      fieldIndex = getSingleValuedColumnName(node, fieldIndex, columnNames);
+    }
+    return columnNames;
+  }
+
+
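+  // Flattens a JSON structure into dot-separated field names. For example (illustrative),
+  // the node {"sd": {"cols": [{"name": ""}]}} yields the single field name "sd.cols.name".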
+  private static void getNestedFieldName(JsonNode jsonNode, String fieldName,
+      Collection<String> results) {
+    if (jsonNode instanceof ArrayNode) {
+      Iterator<JsonNode> elements = ((ArrayNode) jsonNode).elements();
+      if (!elements.hasNext()) {
+        results.add(fieldName);
+        return;
+      }
+      while (elements.hasNext()) {
+        JsonNode element = elements.next();
+        getNestedFieldName(element, fieldName, results);
+      }
+    } else {
+      Iterator<Entry<String, JsonNode>> fields = jsonNode.fields();
+      if (!fields.hasNext()) {
+        results.add(fieldName);
+        return;
+      }
+      while (fields.hasNext()) {
+        Entry<String, JsonNode> fieldKV = fields.next();
+        String key = fieldKV.getKey();
+        getNestedFieldName(fieldKV.getValue(), fieldName.length() == 0 ? key : fieldName + "." + key,
+            results);
+      }
+    }
+  }
+
+  static class PartitionFieldNode {
+    private String fieldName;
+    private Set<PartitionFieldNode> children = new HashSet<>(4);
+    private boolean isMultiValued;
+    private int fieldIndex;
+
+    PartitionFieldNode(String fieldName) {
+      this.fieldName = fieldName;
+      isMultiValued = false;
+    }
+
+    PartitionFieldNode(String fieldName, boolean isMultiValued) {
+      this.fieldName = fieldName;
+      this.isMultiValued = isMultiValued;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o)
+        return true;
+      if (o == null || getClass() != o.getClass())
+        return false;
+      PartitionFieldNode that = (PartitionFieldNode) o;
+      return Objects.equals(fieldName, that.fieldName);
+    }
+
+    boolean isLeafNode() {
+      return children == null || children.isEmpty();
+    }
+
+    void setFieldIndex(int fieldIndex) {
+      this.fieldIndex = fieldIndex;
+    }
+
+    @VisibleForTesting
+    void addChild(PartitionFieldNode child) {
+      children.add(child);
+    }
+
+    @VisibleForTesting
+    String getFieldName() {
+      return fieldName;
+    }
+
+    @VisibleForTesting
+    Set<PartitionFieldNode> getChildren() {
+      return new HashSet<>(children);
+    }
+
+    @VisibleForTesting
+    boolean isMultiValued() {
+      return isMultiValued;
+    }
+
+    @Override
+    public String toString() {
+      return fieldName;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(fieldName);
+    }
+
+    void setMultiValued() {
+      this.isMultiValued = true;
+    }
+  }
+
+  private interface MutivaluedFieldSetter {
+    void setValue(PartitionFieldNode root, TreeMap<Long, Partition> partitions, TreeMap<Long, StorageDescriptor> sds,
+        TreeMap<Long, SerDeInfo> serdes, TreeMap<Long, List<FieldSchema>> cds) throws MetaException;
+  }
+
+  private class PartitionValuesSetter implements MutivaluedFieldSetter {
+    private PartitionValuesSetter() {
+      //
+    }
+
+    @Override
+    public void setValue(PartitionFieldNode root, TreeMap<Long, Partition> partitions, TreeMap<Long, StorageDescriptor> sds,
+        TreeMap<Long, SerDeInfo> serdes, TreeMap<Long, List<FieldSchema>> cds)
+        throws MetaException {
+      final String tableName =
+          fieldNameToTableName.containsKey("PARTITION_KEY_VALS") ? fieldNameToTableName
+              .get("PARTITION_KEY_VALS") : "PARTITION_KEY_VALS";
+      MetastoreDirectSqlUtils
+          .setPartitionValues(tableName, pm, Joiner.on(',').join(partitions.keySet()), partitions);
+    }
+  }
+
+  private class PartitionParametersSetter implements MutivaluedFieldSetter {
+    private PartitionParametersSetter() {
+      //
+    }
+
+    @Override
+    public void setValue(PartitionFieldNode root, TreeMap<Long, Partition> partitions, TreeMap<Long, StorageDescriptor> sds,
+        TreeMap<Long, SerDeInfo> serdes, TreeMap<Long, List<FieldSchema>> cds)
+        throws MetaException {
+      MetastoreDirectSqlUtils
+          .setPartitionParametersWithFilter(PARTITION_PARAMS, convertMapNullsToEmptyStrings, pm,
+              Joiner.on(',').join(partitions.keySet()), partitions, includeParamKeyPattern,
+              excludeParamKeyPattern);
+    }
+  }
+
+  private class PartitionSDColsSetter implements MutivaluedFieldSetter {
+    private PartitionSDColsSetter() {
+      // prevent instantiation
+    }
+
+    @Override
+    public void setValue(PartitionFieldNode root, TreeMap<Long, Partition> partitions, TreeMap<Long, StorageDescriptor> sds,
+        TreeMap<Long, SerDeInfo> serdes, TreeMap<Long, List<FieldSchema>> cds)
+        throws MetaException {
+      // find the fields which are requested for sd.cols
+      // children field names would be sd.cols.name, sd.cols.type or sd.cols.comment
+      List<String> childFields = getChildrenFieldNames(root);
+      final String tableName = fieldNameToTableName.containsKey("COLUMNS_V2") ? fieldNameToTableName
+          .get("COLUMNS_V2") : "COLUMNS_V2";
+      MetastoreDirectSqlUtils
+          .setSDCols(tableName, childFields, pm, cds, Joiner.on(',').join(cds.keySet()));
+    }
+  }
+
+  private class PartitionSDBucketColsSetter implements MutivaluedFieldSetter {
+    private PartitionSDBucketColsSetter() {
+      //
+    }
+
+    @Override
+    public void setValue(PartitionFieldNode root, TreeMap<Long, Partition> partitions, TreeMap<Long, StorageDescriptor> sds,
+        TreeMap<Long, SerDeInfo> serdes, TreeMap<Long, List<FieldSchema>> cds)
+        throws MetaException {
+      final String tableName =
+          fieldNameToTableName.containsKey("BUCKETING_COLS") ? fieldNameToTableName
+              .get("BUCKETING_COLS") : "BUCKETING_COLS";
+      MetastoreDirectSqlUtils
+          .setSDBucketCols(tableName, pm, sds, Joiner.on(',').join(sds.keySet()));
+    }
+  }
+
+  private class PartitionSortColsSetter implements MutivaluedFieldSetter {
+    private PartitionSortColsSetter() {
+      //
+    }
+
+    @Override
+    public void setValue(PartitionFieldNode root, TreeMap<Long, Partition> partitions, TreeMap<Long, StorageDescriptor> sds,
+        TreeMap<Long, SerDeInfo> serdes, TreeMap<Long, List<FieldSchema>> cds)
+        throws MetaException {
+      List<String> childFieldNames = getChildrenFieldNames(root);
+      final String tableName = fieldNameToTableName.containsKey("SORT_COLS") ? fieldNameToTableName
+          .get("SORT_COLS") : "SORT_COLS";
+      MetastoreDirectSqlUtils
+          .setSDSortCols(tableName, childFieldNames, pm, sds, Joiner.on(',').join(sds.keySet()));
+    }
+  }
+
+  private List<String> getChildrenFieldNames(PartitionFieldNode root) throws MetaException {
+    List<String> childFields = new ArrayList<>(3);
+    for (PartitionFieldNode child : root.getChildren()) {
+      if (child.getFieldName().lastIndexOf(".") < 0) {
+        throw new MetaException("Error parsing multi-valued field name " + child.getFieldName());
+      }
+      childFields.add(child.getFieldName().substring(child.getFieldName().lastIndexOf(".") + 1));
+    }
+    return childFields;
+  }
+
+  private class PartitionSDParametersSetter implements MutivaluedFieldSetter {
+    private PartitionSDParametersSetter() {
+      //
+    }
+
+    @Override
+    public void setValue(PartitionFieldNode root, TreeMap<Long, Partition> partitions, TreeMap<Long, StorageDescriptor> sds,
+        TreeMap<Long, SerDeInfo> serdes, TreeMap<Long, List<FieldSchema>> cds)
+        throws MetaException {
+      final String tableName = fieldNameToTableName.containsKey("SD_PARAMS") ? fieldNameToTableName
+          .get("SD_PARAMS") : "SD_PARAMS";
+      MetastoreDirectSqlUtils.setSDParameters(tableName, convertMapNullsToEmptyStrings, pm, sds,
+          Joiner.on(',').join(sds.keySet()));
+    }
+  }
+
+  private class PartitionSerdeInfoParametersSetter implements MutivaluedFieldSetter {
+    private PartitionSerdeInfoParametersSetter() {
+      //
+    }
+
+    @Override
+    public void setValue(PartitionFieldNode root, TreeMap<Long, Partition> partitions, TreeMap<Long, StorageDescriptor> sds,
+        TreeMap<Long, SerDeInfo> serdes, TreeMap<Long, List<FieldSchema>> cds)
+        throws MetaException {
+      final String tableName =
+          fieldNameToTableName.containsKey("SERDE_PARAMS") ? fieldNameToTableName
+              .get("SERDE_PARAMS") : "SERDE_PARAMS";
+      MetastoreDirectSqlUtils.setSerdeParams(tableName, convertMapNullsToEmptyStrings, pm, serdes,
+          Joiner.on(',').join(serdes.keySet()));
+    }
+  }
+
+  private class PartitionSkewedColsNamesSetter implements MutivaluedFieldSetter {
+    private PartitionSkewedColsNamesSetter() {
+      //
+    }
+
+    @Override
+    public void setValue(PartitionFieldNode root, TreeMap<Long, Partition> partitions, TreeMap<Long, StorageDescriptor> sds,
+        TreeMap<Long, SerDeInfo> serdes, TreeMap<Long, List<FieldSchema>> cds)
+        throws MetaException {
+      final String tableName =
+          fieldNameToTableName.containsKey("SKEWED_COL_NAMES") ? fieldNameToTableName
+              .get("SKEWED_COL_NAMES") : "SKEWED_COL_NAMES";
+      MetastoreDirectSqlUtils
+          .setSkewedColNames(tableName, pm, sds, Joiner.on(',').join(sds.keySet()));
+    }
+  }
+
+  private class PartitionSkewedColsValuesSetter implements MutivaluedFieldSetter {
+    private PartitionSkewedColsValuesSetter() {
+      //
+    }
+
+    @Override
+    public void setValue(PartitionFieldNode root, TreeMap<Long, Partition> partitions, TreeMap<Long, StorageDescriptor> sds,
+        TreeMap<Long, SerDeInfo> serdes, TreeMap<Long, List<FieldSchema>> cds)
+        throws MetaException {
+      final String skewedStringListVals =
+          fieldNameToTableName.containsKey("SKEWED_STRING_LIST_VALUES") ? fieldNameToTableName
+              .get("SKEWED_STRING_LIST_VALUES") : "SKEWED_STRING_LIST_VALUES";
+      final String skewedVals =
+          fieldNameToTableName.containsKey("SKEWED_VALUES") ? fieldNameToTableName
+              .get("SKEWED_VALUES") : "SKEWED_VALUES";
+      MetastoreDirectSqlUtils.setSkewedColValues(skewedStringListVals, skewedVals, pm, sds,
+          Joiner.on(',').join(sds.keySet()));
+    }
+  }
+
+  private class PartitionSkewedColValLocationMapSetter implements MutivaluedFieldSetter {
+    private PartitionSkewedColValLocationMapSetter() {
+      //
+    }
+
+    @Override
+    public void setValue(PartitionFieldNode root, TreeMap<Long, Partition> partitions, TreeMap<Long, StorageDescriptor> sds,
+        TreeMap<Long, SerDeInfo> serdes, TreeMap<Long, List<FieldSchema>> cds)
+        throws MetaException {
+      final String skewedStringListVals =
+          fieldNameToTableName.containsKey("SKEWED_STRING_LIST_VALUES") ? fieldNameToTableName
+              .get("SKEWED_STRING_LIST_VALUES") : "SKEWED_STRING_LIST_VALUES";
+      final String skewedColValLocMap =
+          fieldNameToTableName.containsKey("SKEWED_COL_VALUE_LOC_MAP") ? fieldNameToTableName
+              .get("SKEWED_COL_VALUE_LOC_MAP") : "SKEWED_COL_VALUE_LOC_MAP";
+      MetastoreDirectSqlUtils
+          .setSkewedColLocationMaps(skewedColValLocMap, skewedStringListVals, pm, sds,
+              Joiner.on(',').join(sds.keySet()));
+    }
+  }
+
+  /**
+   * Given a list of partition fields, checks if all the requested fields are single-valued. If all
+   * the fields are single-valued, returns the list of equivalent MPartition field names
+   * which can be used in the setResult clause of a JDO query.
+   *
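+   * <pre>{@code
+   * // illustrative, based on the field map defined in this class:
+   * // getMPartitionFieldNames(["tableName", "sd.location"]) returns ["table.tableName", "sd.location"]
+   * // getMPartitionFieldNames(["parameters"]) returns null (parameters is multi-valued)
+   * }</pre>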
+   * @param partitionFields List of partitionFields in the projection
+   * @return List of JDO field names which can be used in setResult clause
+   * of a JDO query. Returns null if input partitionFields cannot be used in a setResult clause
+   */
+  public static List<String> getMPartitionFieldNames(List<String> partitionFields)
+      throws MetaException {
+    // if there are no partitionFields requested, it means all the fields are requested which include
+    // multi-valued fields.
+    if (partitionFields == null || partitionFields.isEmpty()) {
+      return null;
+    }
+    // throw exception if there are invalid field names
+    PartitionProjectionEvaluator.validate(partitionFields);
+    // else, check if all the fields are single-valued. If multi-valued fields are requested,
+    // return null since setResult in JDO doesn't support multi-valued fields
+    if (!allPartitionSingleValuedFields.keySet().containsAll(partitionFields)) {
+      return null;
+    }
+    List<String> jdoFields = new ArrayList<>(partitionFields.size());
+    for (String partitionField : partitionFields) {
+      jdoFields.add(allPartitionSingleValuedFields.get(partitionField));
+    }
+    return jdoFields;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/44ef91a6/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
index b61ee81..a6d9583 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -20,6 +20,12 @@ package org.apache.hadoop.hive.metastore;
 
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.CreationMetadata;
+import org.apache.hadoop.hive.metastore.api.ISchemaName;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
@@ -546,6 +552,38 @@ public interface RawStore extends Configurable {
       throws MetaException, NoSuchObjectException;
 
   /**
+   * Generic Partition request API, providing different kinds of filtering and controlling output.
+   *
+   * @param catName catalog name
+   * @param dbName database name
+   * @param tblName table name
+   * @param fieldList a list of dot-separated field names which must be returned.
+   *                  Any other field which is not in the fieldList may be unset in the returned
+   *                  partitions (it is up to the implementation to decide whether it chooses to
+   *                 include or exclude such fields). E.g. setting the field list to <em>sd.location</em>,
+   *                 <em>serdeInfo.name</em>, <em>sd.cols.name</em>, <em>sd.cols.type</em> will
+   *                 return partitions which will have the location field set in the storage descriptor.
+   *                 Also, the serdeInfo in the returned storage descriptor will only have the name
+   *                 field set. This applies to multi-valued fields as well, like sd.cols, so in the
+   *                 example above only the name and type fields will be set for <em>sd.cols</em>.
+   *                 If the <em>fieldList</em> is empty or not present, all the fields will be set.
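+   *                 A hypothetical call (the catalog, database and table names are illustrative):
+   *                 <pre>{@code
+   *                 store.getPartitionSpecsByFilterAndProjection("hive", "db1", "tbl1",
+   *                     Arrays.asList("sd.location", "sd.cols.name"), null, null);
+   *                 }</pre>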
+   * @param includeParamKeyPattern SQL-92 compliant pattern for param keys to be included.
+   *                               The _ and % wildcards are supported: '_' represents one
+   *                               character and '%' represents 0 or more characters.
+   * @param excludeParamKeyPattern SQL-92 compliant pattern for param keys to be excluded.
+   *                               The _ and % wildcards are supported: '_' represents one
+   *                               character and '%' represents 0 or more characters.
+   * @return List of matching partitions which may be partially filled according to fieldList.
+   * @throws MetaException in case of errors
+   * @throws NoSuchObjectException when catalog or database or table isn't found
+   */
+  List<Partition> getPartitionSpecsByFilterAndProjection(String catName, String dbName, String tblName,
+                                                         final List<String> fieldList,
+                                                         final String includeParamKeyPattern,
+                                                         final String excludeParamKeyPattern)
+      throws MetaException, NoSuchObjectException;
+
+  /**
    * Get partitions using an already parsed expression.
    * @param catName catalog name.
    * @param dbName database name

http://git-wip-us.apache.org/repos/asf/hive/blob/44ef91a6/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
index 7a0b21b..89f0db8 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.metastore;
 
 import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
@@ -501,74 +500,74 @@ public class StatObjectConverter {
     colType = colType.toLowerCase();
     if (colType.equals("boolean")) {
       BooleanColumnStatsData boolStats = new BooleanColumnStatsData();
-      boolStats.setNumFalses(MetaStoreDirectSql.extractSqlLong(falses));
-      boolStats.setNumTrues(MetaStoreDirectSql.extractSqlLong(trues));
-      boolStats.setNumNulls(MetaStoreDirectSql.extractSqlLong(nulls));
+      boolStats.setNumFalses(MetastoreDirectSqlUtils.extractSqlLong(falses));
+      boolStats.setNumTrues(MetastoreDirectSqlUtils.extractSqlLong(trues));
+      boolStats.setNumNulls(MetastoreDirectSqlUtils.extractSqlLong(nulls));
       data.setBooleanStats(boolStats);
     } else if (colType.equals("string") ||
         colType.startsWith("varchar") || colType.startsWith("char")) {
       StringColumnStatsDataInspector stringStats = new StringColumnStatsDataInspector();
-      stringStats.setNumNulls(MetaStoreDirectSql.extractSqlLong(nulls));
-      stringStats.setAvgColLen(MetaStoreDirectSql.extractSqlDouble(avglen));
-      stringStats.setMaxColLen(MetaStoreDirectSql.extractSqlLong(maxlen));
-      stringStats.setNumDVs(MetaStoreDirectSql.extractSqlLong(dist));
-      stringStats.setBitVectors(MetaStoreDirectSql.extractSqlBlob(bitVector));
+      stringStats.setNumNulls(MetastoreDirectSqlUtils.extractSqlLong(nulls));
+      stringStats.setAvgColLen(MetastoreDirectSqlUtils.extractSqlDouble(avglen));
+      stringStats.setMaxColLen(MetastoreDirectSqlUtils.extractSqlLong(maxlen));
+      stringStats.setNumDVs(MetastoreDirectSqlUtils.extractSqlLong(dist));
+      stringStats.setBitVectors(MetastoreDirectSqlUtils.extractSqlBlob(bitVector));
       data.setStringStats(stringStats);
     } else if (colType.equals("binary")) {
       BinaryColumnStatsData binaryStats = new BinaryColumnStatsData();
-      binaryStats.setNumNulls(MetaStoreDirectSql.extractSqlLong(nulls));
-      binaryStats.setAvgColLen(MetaStoreDirectSql.extractSqlDouble(avglen));
-      binaryStats.setMaxColLen(MetaStoreDirectSql.extractSqlLong(maxlen));
+      binaryStats.setNumNulls(MetastoreDirectSqlUtils.extractSqlLong(nulls));
+      binaryStats.setAvgColLen(MetastoreDirectSqlUtils.extractSqlDouble(avglen));
+      binaryStats.setMaxColLen(MetastoreDirectSqlUtils.extractSqlLong(maxlen));
       data.setBinaryStats(binaryStats);
     } else if (colType.equals("bigint") || colType.equals("int") ||
         colType.equals("smallint") || colType.equals("tinyint") ||
         colType.equals("timestamp")) {
       LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector();
-      longStats.setNumNulls(MetaStoreDirectSql.extractSqlLong(nulls));
+      longStats.setNumNulls(MetastoreDirectSqlUtils.extractSqlLong(nulls));
       if (lhigh != null) {
-        longStats.setHighValue(MetaStoreDirectSql.extractSqlLong(lhigh));
+        longStats.setHighValue(MetastoreDirectSqlUtils.extractSqlLong(lhigh));
       }
       if (llow != null) {
-        longStats.setLowValue(MetaStoreDirectSql.extractSqlLong(llow));
+        longStats.setLowValue(MetastoreDirectSqlUtils.extractSqlLong(llow));
       }
-      longStats.setNumDVs(MetaStoreDirectSql.extractSqlLong(dist));
-      longStats.setBitVectors(MetaStoreDirectSql.extractSqlBlob(bitVector));
+      longStats.setNumDVs(MetastoreDirectSqlUtils.extractSqlLong(dist));
+      longStats.setBitVectors(MetastoreDirectSqlUtils.extractSqlBlob(bitVector));
       data.setLongStats(longStats);
     } else if (colType.equals("double") || colType.equals("float")) {
       DoubleColumnStatsDataInspector doubleStats = new DoubleColumnStatsDataInspector();
-      doubleStats.setNumNulls(MetaStoreDirectSql.extractSqlLong(nulls));
+      doubleStats.setNumNulls(MetastoreDirectSqlUtils.extractSqlLong(nulls));
       if (dhigh != null) {
-        doubleStats.setHighValue(MetaStoreDirectSql.extractSqlDouble(dhigh));
+        doubleStats.setHighValue(MetastoreDirectSqlUtils.extractSqlDouble(dhigh));
       }
       if (dlow != null) {
-        doubleStats.setLowValue(MetaStoreDirectSql.extractSqlDouble(dlow));
+        doubleStats.setLowValue(MetastoreDirectSqlUtils.extractSqlDouble(dlow));
       }
-      doubleStats.setNumDVs(MetaStoreDirectSql.extractSqlLong(dist));
-      doubleStats.setBitVectors(MetaStoreDirectSql.extractSqlBlob(bitVector));
+      doubleStats.setNumDVs(MetastoreDirectSqlUtils.extractSqlLong(dist));
+      doubleStats.setBitVectors(MetastoreDirectSqlUtils.extractSqlBlob(bitVector));
       data.setDoubleStats(doubleStats);
     } else if (colType.startsWith("decimal")) {
       DecimalColumnStatsDataInspector decimalStats = new DecimalColumnStatsDataInspector();
-      decimalStats.setNumNulls(MetaStoreDirectSql.extractSqlLong(nulls));
+      decimalStats.setNumNulls(MetastoreDirectSqlUtils.extractSqlLong(nulls));
       if (dechigh != null) {
         decimalStats.setHighValue(DecimalUtils.createThriftDecimal((String)dechigh));
       }
       if (declow != null) {
         decimalStats.setLowValue(DecimalUtils.createThriftDecimal((String)declow));
       }
-      decimalStats.setNumDVs(MetaStoreDirectSql.extractSqlLong(dist));
-      decimalStats.setBitVectors(MetaStoreDirectSql.extractSqlBlob(bitVector));
+      decimalStats.setNumDVs(MetastoreDirectSqlUtils.extractSqlLong(dist));
+      decimalStats.setBitVectors(MetastoreDirectSqlUtils.extractSqlBlob(bitVector));
       data.setDecimalStats(decimalStats);
     } else if (colType.equals("date")) {
       DateColumnStatsDataInspector dateStats = new DateColumnStatsDataInspector();
-      dateStats.setNumNulls(MetaStoreDirectSql.extractSqlLong(nulls));
+      dateStats.setNumNulls(MetastoreDirectSqlUtils.extractSqlLong(nulls));
       if (lhigh != null) {
-        dateStats.setHighValue(new Date(MetaStoreDirectSql.extractSqlLong(lhigh)));
+        dateStats.setHighValue(new Date(MetastoreDirectSqlUtils.extractSqlLong(lhigh)));
       }
       if (llow != null) {
-        dateStats.setLowValue(new Date(MetaStoreDirectSql.extractSqlLong(llow)));
+        dateStats.setLowValue(new Date(MetastoreDirectSqlUtils.extractSqlLong(llow)));
       }
-      dateStats.setNumDVs(MetaStoreDirectSql.extractSqlLong(dist));
-      dateStats.setBitVectors(MetaStoreDirectSql.extractSqlBlob(bitVector));
+      dateStats.setNumDVs(MetastoreDirectSqlUtils.extractSqlLong(dist));
+      dateStats.setBitVectors(MetastoreDirectSqlUtils.extractSqlBlob(bitVector));
       data.setDateStats(dateStats);
     }
   }
@@ -582,49 +581,49 @@ public class StatObjectConverter {
     colType = colType.toLowerCase();
     if (colType.equals("boolean")) {
       BooleanColumnStatsData boolStats = new BooleanColumnStatsData();
-      boolStats.setNumFalses(MetaStoreDirectSql.extractSqlLong(falses));
-      boolStats.setNumTrues(MetaStoreDirectSql.extractSqlLong(trues));
-      boolStats.setNumNulls(MetaStoreDirectSql.extractSqlLong(nulls));
+      boolStats.setNumFalses(MetastoreDirectSqlUtils.extractSqlLong(falses));
+      boolStats.setNumTrues(MetastoreDirectSqlUtils.extractSqlLong(trues));
+      boolStats.setNumNulls(MetastoreDirectSqlUtils.extractSqlLong(nulls));
       data.setBooleanStats(boolStats);
     } else if (colType.equals("string") || colType.startsWith("varchar")
         || colType.startsWith("char")) {
       StringColumnStatsDataInspector stringStats = new StringColumnStatsDataInspector();
-      stringStats.setNumNulls(MetaStoreDirectSql.extractSqlLong(nulls));
-      stringStats.setAvgColLen(MetaStoreDirectSql.extractSqlDouble(avglen));
-      stringStats.setMaxColLen(MetaStoreDirectSql.extractSqlLong(maxlen));
-      stringStats.setNumDVs(MetaStoreDirectSql.extractSqlLong(dist));
+      stringStats.setNumNulls(MetastoreDirectSqlUtils.extractSqlLong(nulls));
+      stringStats.setAvgColLen(MetastoreDirectSqlUtils.extractSqlDouble(avglen));
+      stringStats.setMaxColLen(MetastoreDirectSqlUtils.extractSqlLong(maxlen));
+      stringStats.setNumDVs(MetastoreDirectSqlUtils.extractSqlLong(dist));
       data.setStringStats(stringStats);
     } else if (colType.equals("binary")) {
       BinaryColumnStatsData binaryStats = new BinaryColumnStatsData();
-      binaryStats.setNumNulls(MetaStoreDirectSql.extractSqlLong(nulls));
-      binaryStats.setAvgColLen(MetaStoreDirectSql.extractSqlDouble(avglen));
-      binaryStats.setMaxColLen(MetaStoreDirectSql.extractSqlLong(maxlen));
+      binaryStats.setNumNulls(MetastoreDirectSqlUtils.extractSqlLong(nulls));
+      binaryStats.setAvgColLen(MetastoreDirectSqlUtils.extractSqlDouble(avglen));
+      binaryStats.setMaxColLen(MetastoreDirectSqlUtils.extractSqlLong(maxlen));
       data.setBinaryStats(binaryStats);
     } else if (colType.equals("bigint") || colType.equals("int") || colType.equals("smallint")
         || colType.equals("tinyint") || colType.equals("timestamp")) {
       LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector();
-      longStats.setNumNulls(MetaStoreDirectSql.extractSqlLong(nulls));
+      longStats.setNumNulls(MetastoreDirectSqlUtils.extractSqlLong(nulls));
       if (lhigh != null) {
-        longStats.setHighValue(MetaStoreDirectSql.extractSqlLong(lhigh));
+        longStats.setHighValue(MetastoreDirectSqlUtils.extractSqlLong(lhigh));
       }
       if (llow != null) {
-        longStats.setLowValue(MetaStoreDirectSql.extractSqlLong(llow));
+        longStats.setLowValue(MetastoreDirectSqlUtils.extractSqlLong(llow));
       }
-      long lowerBound = MetaStoreDirectSql.extractSqlLong(dist);
-      long higherBound = MetaStoreDirectSql.extractSqlLong(sumDist);
+      long lowerBound = MetastoreDirectSqlUtils.extractSqlLong(dist);
+      long higherBound = MetastoreDirectSqlUtils.extractSqlLong(sumDist);
       long rangeBound = Long.MAX_VALUE;
       if (lhigh != null && llow != null) {
-        rangeBound = MetaStoreDirectSql.extractSqlLong(lhigh)
-            - MetaStoreDirectSql.extractSqlLong(llow) + 1;
+        rangeBound = MetastoreDirectSqlUtils.extractSqlLong(lhigh)
+            - MetastoreDirectSqlUtils.extractSqlLong(llow) + 1;
       }
       long estimation;
       if (useDensityFunctionForNDVEstimation && lhigh != null && llow != null && avgLong != null
-          && MetaStoreDirectSql.extractSqlDouble(avgLong) != 0.0) {
+          && MetastoreDirectSqlUtils.extractSqlDouble(avgLong) != 0.0) {
         // We have estimation, lowerbound and higherbound. We use estimation if
         // it is between lowerbound and higherbound.
-        estimation = MetaStoreDirectSql
-            .extractSqlLong((MetaStoreDirectSql.extractSqlLong(lhigh) - MetaStoreDirectSql
-                .extractSqlLong(llow)) / MetaStoreDirectSql.extractSqlDouble(avgLong));
+        estimation = MetastoreDirectSqlUtils
+            .extractSqlLong((MetastoreDirectSqlUtils.extractSqlLong(lhigh) - MetastoreDirectSqlUtils
+                .extractSqlLong(llow)) / MetastoreDirectSqlUtils.extractSqlDouble(avgLong));
         if (estimation < lowerBound) {
           estimation = lowerBound;
         } else if (estimation > higherBound) {
@@ -638,28 +637,28 @@ public class StatObjectConverter {
       data.setLongStats(longStats);
     } else if (colType.equals("date")) {
       DateColumnStatsDataInspector dateStats = new DateColumnStatsDataInspector();
-      dateStats.setNumNulls(MetaStoreDirectSql.extractSqlLong(nulls));
+      dateStats.setNumNulls(MetastoreDirectSqlUtils.extractSqlLong(nulls));
       if (lhigh != null) {
-        dateStats.setHighValue(new Date(MetaStoreDirectSql.extractSqlLong(lhigh)));
+        dateStats.setHighValue(new Date(MetastoreDirectSqlUtils.extractSqlLong(lhigh)));
       }
       if (llow != null) {
-        dateStats.setLowValue(new Date(MetaStoreDirectSql.extractSqlLong(llow)));
+        dateStats.setLowValue(new Date(MetastoreDirectSqlUtils.extractSqlLong(llow)));
       }
-      long lowerBound = MetaStoreDirectSql.extractSqlLong(dist);
-      long higherBound = MetaStoreDirectSql.extractSqlLong(sumDist);
+      long lowerBound = MetastoreDirectSqlUtils.extractSqlLong(dist);
+      long higherBound = MetastoreDirectSqlUtils.extractSqlLong(sumDist);
       long rangeBound = Long.MAX_VALUE;
       if (lhigh != null && llow != null) {
-        rangeBound = MetaStoreDirectSql.extractSqlLong(lhigh)
-            - MetaStoreDirectSql.extractSqlLong(llow) + 1;
+        rangeBound = MetastoreDirectSqlUtils.extractSqlLong(lhigh)
+            - MetastoreDirectSqlUtils.extractSqlLong(llow) + 1;
       }
       long estimation;
       if (useDensityFunctionForNDVEstimation && lhigh != null && llow != null && avgLong != null
-          && MetaStoreDirectSql.extractSqlDouble(avgLong) != 0.0) {
+          && MetastoreDirectSqlUtils.extractSqlDouble(avgLong) != 0.0) {
         // We have estimation, lowerbound and higherbound. We use estimation if
         // it is between lowerbound and higherbound.
-        estimation = MetaStoreDirectSql
-            .extractSqlLong((MetaStoreDirectSql.extractSqlLong(lhigh) - MetaStoreDirectSql
-                .extractSqlLong(llow)) / MetaStoreDirectSql.extractSqlDouble(avgLong));
+        estimation = MetastoreDirectSqlUtils
+            .extractSqlLong((MetastoreDirectSqlUtils.extractSqlLong(lhigh) - MetastoreDirectSqlUtils
+                .extractSqlLong(llow)) / MetastoreDirectSqlUtils.extractSqlDouble(avgLong));
         if (estimation < lowerBound) {
           estimation = lowerBound;
         } else if (estimation > higherBound) {
@@ -673,20 +672,20 @@ public class StatObjectConverter {
       data.setDateStats(dateStats);
     } else if (colType.equals("double") || colType.equals("float")) {
       DoubleColumnStatsDataInspector doubleStats = new DoubleColumnStatsDataInspector();
-      doubleStats.setNumNulls(MetaStoreDirectSql.extractSqlLong(nulls));
+      doubleStats.setNumNulls(MetastoreDirectSqlUtils.extractSqlLong(nulls));
       if (dhigh != null) {
-        doubleStats.setHighValue(MetaStoreDirectSql.extractSqlDouble(dhigh));
+        doubleStats.setHighValue(MetastoreDirectSqlUtils.extractSqlDouble(dhigh));
       }
       if (dlow != null) {
-        doubleStats.setLowValue(MetaStoreDirectSql.extractSqlDouble(dlow));
+        doubleStats.setLowValue(MetastoreDirectSqlUtils.extractSqlDouble(dlow));
       }
-      long lowerBound = MetaStoreDirectSql.extractSqlLong(dist);
-      long higherBound = MetaStoreDirectSql.extractSqlLong(sumDist);
+      long lowerBound = MetastoreDirectSqlUtils.extractSqlLong(dist);
+      long higherBound = MetastoreDirectSqlUtils.extractSqlLong(sumDist);
       if (useDensityFunctionForNDVEstimation && dhigh != null && dlow != null && avgDouble != null
-          && MetaStoreDirectSql.extractSqlDouble(avgDouble) != 0.0) {
-        long estimation = MetaStoreDirectSql
-            .extractSqlLong((MetaStoreDirectSql.extractSqlLong(dhigh) - MetaStoreDirectSql
-                .extractSqlLong(dlow)) / MetaStoreDirectSql.extractSqlDouble(avgDouble));
+          && MetastoreDirectSqlUtils.extractSqlDouble(avgDouble) != 0.0) {
+        long estimation = MetastoreDirectSqlUtils
+            .extractSqlLong((MetastoreDirectSqlUtils.extractSqlLong(dhigh) - MetastoreDirectSqlUtils
+                .extractSqlLong(dlow)) / MetastoreDirectSqlUtils.extractSqlDouble(avgDouble));
         if (estimation < lowerBound) {
           doubleStats.setNumDVs(lowerBound);
         } else if (estimation > higherBound) {
@@ -700,7 +699,7 @@ public class StatObjectConverter {
       data.setDoubleStats(doubleStats);
     } else if (colType.startsWith("decimal")) {
       DecimalColumnStatsDataInspector decimalStats = new DecimalColumnStatsDataInspector();
-      decimalStats.setNumNulls(MetaStoreDirectSql.extractSqlLong(nulls));
+      decimalStats.setNumNulls(MetastoreDirectSqlUtils.extractSqlLong(nulls));
       Decimal low = null;
       Decimal high = null;
       BigDecimal blow = null;
@@ -722,12 +721,12 @@ public class StatObjectConverter {
         low = DecimalUtils.createThriftDecimal((String) declow);
       }
       decimalStats.setLowValue(low);
-      long lowerBound = MetaStoreDirectSql.extractSqlLong(dist);
-      long higherBound = MetaStoreDirectSql.extractSqlLong(sumDist);
+      long lowerBound = MetastoreDirectSqlUtils.extractSqlLong(dist);
+      long higherBound = MetastoreDirectSqlUtils.extractSqlLong(sumDist);
       if (useDensityFunctionForNDVEstimation && dechigh != null && declow != null && avgDecimal != null
-          && MetaStoreDirectSql.extractSqlDouble(avgDecimal) != 0.0) {
-        long estimation = MetaStoreDirectSql.extractSqlLong(MetaStoreDirectSql.extractSqlLong(bhigh
-            .subtract(blow).floatValue() / MetaStoreDirectSql.extractSqlDouble(avgDecimal)));
+          && MetastoreDirectSqlUtils.extractSqlDouble(avgDecimal) != 0.0) {
+        long estimation = MetastoreDirectSqlUtils.extractSqlLong(MetastoreDirectSqlUtils.extractSqlLong(bhigh
+            .subtract(blow).floatValue() / MetastoreDirectSqlUtils.extractSqlDouble(avgDecimal)));
         if (estimation < lowerBound) {
           decimalStats.setNumDVs(lowerBound);
         } else if (estimation > higherBound) {

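The density-based NDV estimation above repeats one pattern across the long, date, double and decimal branches: estimate the number of distinct values as (high - low) / avgGap and clamp the result into the [lowerBound, higherBound] range taken from the aggregated per-partition stats (apparently the maximum and the sum of the per-partition NDVs). A minimal standalone sketch of that logic; the class, method and variable names are illustrative, not part of the patch:

public final class NdvEstimatorSketch {
  private NdvEstimatorSketch() {
  }

  static long estimateNumDistinctValues(long high, long low, double avgGap,
      long lowerBound, long higherBound) {
    if (avgGap == 0.0) {
      // no density information; fall back to the upper bound
      return higherBound;
    }
    long estimation = (long) ((high - low) / avgGap);
    // clamp into the range implied by the per-partition NDV aggregates
    if (estimation < lowerBound) {
      return lowerBound;
    }
    if (estimation > higherBound) {
      return higherBound;
    }
    return estimation;
  }

  public static void main(String[] args) {
    // values spanning [0, 1000] with an average gap of 2.5 => ~400 distinct values,
    // which lies within the [100, 500] bounds, so the estimate is used as-is
    System.out.println(estimateNumDistinctValues(1000L, 0L, 2.5, 100L, 500L)); // prints 400
  }
}
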
http://git-wip-us.apache.org/repos/asf/hive/blob/44ef91a6/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index b9a5458..70490f0 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -1271,6 +1271,19 @@ public class CachedStore implements RawStore, Configurable {
   }
 
+  /**
+   * getPartitionSpecsByFilterAndProjection is currently non-cacheable; the call is
+   * delegated directly to the underlying RawStore.
+   */
  @Override
+  public List<Partition> getPartitionSpecsByFilterAndProjection(String catName, String dbName,
+      String tblName, List<String> fieldList, String includeParamKeyPattern,
+      String excludeParamKeysPattern) throws MetaException, NoSuchObjectException {
+    return rawStore.getPartitionSpecsByFilterAndProjection(catName, dbName, tblName, fieldList,
+        includeParamKeyPattern, excludeParamKeysPattern);
+  }
+
+  @Override
   public boolean getPartitionsByExpr(String catName, String dbName, String tblName, byte[] expr,
       String defaultPartitionName, short maxParts, List<Partition> result) throws TException {
     catName = StringUtils.normalizeIdentifier(catName);

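For context on the call being delegated here, a hedged sketch of how a RawStore consumer might invoke the new method. The dot-notation field names follow the PartitionProjectionEvaluator convention; the catalog, database and table values are illustrative:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;

public class ProjectionCallSketch {
  // fetch only the partition values, SD location and create time; all other fields
  // stay unset on the returned Partition objects
  static List<Partition> fetchProjected(RawStore rawStore)
      throws MetaException, NoSuchObjectException {
    List<String> fieldList = Arrays.asList("values", "sd.location", "createTime");
    return rawStore.getPartitionSpecsByFilterAndProjection("hive", "default", "web_logs",
        fieldList, null /* includeParamKeyPattern */, null /* excludeParamKeysPattern */);
  }
}
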
http://git-wip-us.apache.org/repos/asf/hive/blob/44ef91a6/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MSerDeInfo.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MSerDeInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MSerDeInfo.java
index 68f07e2..b8895df 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MSerDeInfo.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MSerDeInfo.java
@@ -29,6 +29,9 @@ public class MSerDeInfo {
   private String deserializerClass;
   private int serdeType;
 
+  public MSerDeInfo() {
+    // default constructor used for deserialization
+  }
+
   /**
    *
    * @param name

http://git-wip-us.apache.org/repos/asf/hive/blob/44ef91a6/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MStorageDescriptor.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MStorageDescriptor.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MStorageDescriptor.java
index 4c6ce00..304860b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MStorageDescriptor.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MStorageDescriptor.java
@@ -98,7 +98,7 @@ public class MStorageDescriptor {
   /**
    * @param isCompressed the isCompressed to set
    */
-  public void setCompressed(boolean isCompressed) {
+  public void setIsCompressed(boolean isCompressed) {
     this.isCompressed = isCompressed;
   }
 
@@ -274,4 +274,11 @@ public class MStorageDescriptor {
     this.isStoredAsSubDirectories = storedAsSubDirectories;
   }
 
+  public MColumnDescriptor getCd() {
+    return cd;
+  }
+
+  public void setCd(MColumnDescriptor cd) {
+    this.cd = cd;
+  }
 }

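The setCompressed to setIsCompressed rename above plausibly exists so that PropertyUtils-based reflection (see setNestedProperty in MetaStoreServerUtils below) can resolve a writable property whose name matches the model field isCompressed; that rationale is an assumption here, but the JavaBeans naming rule it relies on can be sketched standalone:

import org.apache.commons.beanutils.PropertyUtils;

public class BeanNamingSketch {
  // minimal stand-in for MStorageDescriptor, for illustration only
  public static class Descriptor {
    private boolean isCompressed;

    public boolean isCompressed() {
      return isCompressed;               // Introspector maps this to property "compressed"
    }

    public void setIsCompressed(boolean isCompressed) {
      this.isCompressed = isCompressed;  // maps to a writable property "isCompressed"
    }
  }

  public static void main(String[] args) throws Exception {
    Descriptor d = new Descriptor();
    // resolves because the setter is named setIsCompressed; with a setCompressed setter
    // the writable property would be "compressed" and this call would fail
    PropertyUtils.setProperty(d, "isCompressed", Boolean.TRUE);
    System.out.println(d.isCompressed()); // true
  }
}
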
http://git-wip-us.apache.org/repos/asf/hive/blob/44ef91a6/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
index 10ff9df..f3b3866 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.metastore.utils;
 
+import java.beans.PropertyDescriptor;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.math.BigDecimal;
@@ -32,6 +33,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
@@ -43,10 +45,14 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.regex.Pattern;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimaps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.beanutils.PropertyUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.ListUtils;
 import org.apache.commons.lang.StringUtils;
@@ -56,7 +62,6 @@ import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.metastore.ColumnType;
 import org.apache.hadoop.hive.metastore.HiveMetaStore;
-import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -68,6 +73,10 @@ import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionListComposingSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionSpecWithSharedSD;
+import org.apache.hadoop.hive.metastore.api.PartitionWithoutSD;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -108,8 +117,11 @@ public class MetaStoreServerUtils {
       return org.apache.commons.lang.StringUtils.defaultString(string);
     }
   };
+
   private static final String DELEGATION_TOKEN_STORE_CLS = "hive.cluster.delegation.token.store.class";
 
+  private static final char DOT = '.';
+
   /**
    * We have a need to sanity-check the map before conversion from persisted objects to
    * metadata thrift objects because null values in maps will cause a NPE if we send
@@ -977,6 +989,168 @@ public class MetaStoreServerUtils {
     }
   }
 
+  /**
+   * Coalesces a list of partitions belonging to a table into a more compact PartitionSpec
+   * representation.
+   *
+   * @param table Table thrift object
+   * @param partitions List of partition objects
+   * @return a list of PartitionSpec objects which is a more compact representation of the
+   * original partition list
+   */
+  public static List<PartitionSpec> getPartitionspecsGroupedByStorageDescriptor(Table table,
+                                                                                Collection<Partition> partitions) {
+    final String tablePath = table.getSd().getLocation();
+
+    ImmutableListMultimap<StorageDescriptorKey, Partition> partitionsWithinTableDirectory =
+        Multimaps.index(partitions, input -> {
+          // if sd is not in the list of projected fields, all the partitions
+          // can simply be grouped into one PartitionSpec object
+          if (input.getSd() == null) {
+            return StorageDescriptorKey.UNSET_KEY;
+          }
+          // if the partition is within the table directory, use the table SD key to group
+          // it with the other partitions under that directory
+          if (input.getSd().getLocation() != null && input.getSd().getLocation()
+              .startsWith(tablePath)) {
+            return new StorageDescriptorKey(tablePath, input.getSd());
+          }
+          // if a partition is located outside the table location we treat it as non-standard
+          // and do not perform any grouping; if the location is not set, partitions are
+          // grouped according to the rest of the SD fields
+          return new StorageDescriptorKey(input.getSd());
+        });
+
+    List<PartitionSpec> partSpecs = new ArrayList<>();
+
+    // Classify partitions based on shared SD properties.
+    Map<StorageDescriptorKey, List<PartitionWithoutSD>> sdToPartList = new HashMap<>();
+    // we don't expect partitions to be located outside the table directory in most cases
+    List<Partition> partitionsOutsideTableDir = new ArrayList<>(0);
+    for (StorageDescriptorKey key : partitionsWithinTableDirectory.keySet()) {
+      boolean isUnsetKey = key.equals(StorageDescriptorKey.UNSET_KEY);
+      // group the partitions together when:
+      // case I   : sd is not set because it was not in the requested fields
+      // case II  : sd.location is not set because it was not in the requested fields
+      // case III : sd.location is set and lies within the table directory
+      if (isUnsetKey || key.baseLocation == null || key.baseLocation.equals(tablePath)) {
+        for (Partition partition : partitionsWithinTableDirectory.get(key)) {
+          PartitionWithoutSD partitionWithoutSD = new PartitionWithoutSD();
+          partitionWithoutSD.setValues(partition.getValues());
+          partitionWithoutSD.setCreateTime(partition.getCreateTime());
+          partitionWithoutSD.setLastAccessTime(partition.getLastAccessTime());
+          partitionWithoutSD.setRelativePath(
+              (isUnsetKey || !partition.getSd().isSetLocation()) ? null : partition.getSd()
+                  .getLocation().substring(tablePath.length()));
+          partitionWithoutSD.setParameters(partition.getParameters());
+
+          sdToPartList.computeIfAbsent(key, k -> new ArrayList<>()).add(partitionWithoutSD);
+        }
+      } else {
+        // Lump all partitions outside the tablePath into one PartitionSpec.
+        // Non-standard partitions need not be de-duplicated, so they are collected into a
+        // PartitionListComposingSpec below. This is mostly for backwards compatibility with
+        // HMS APIs which use PartitionListComposingSpec for non-standard partitions located
+        // outside the table directory.
+        partitionsOutsideTableDir.addAll(partitionsWithinTableDirectory.get(key));
+      }
+    }
+    // create sharedSDPartSpec for all the groupings
+    for (Map.Entry<StorageDescriptorKey, List<PartitionWithoutSD>> entry : sdToPartList
+        .entrySet()) {
+      partSpecs.add(getSharedSDPartSpec(table, entry.getKey(), entry.getValue()));
+    }
+    if (!partitionsOutsideTableDir.isEmpty()) {
+      PartitionSpec partListSpec = new PartitionSpec();
+      partListSpec.setDbName(table.getDbName());
+      partListSpec.setTableName(table.getTableName());
+      partListSpec.setPartitionList(new PartitionListComposingSpec(partitionsOutsideTableDir));
+      partSpecs.add(partListSpec);
+    }
+    return partSpecs;
+  }
+
+  /**
+   * Converts a group of partitions that share a storage descriptor into a single
+   * PartitionSpec object with a shared SD.
+   */
+  private static PartitionSpec getSharedSDPartSpec(Table table, StorageDescriptorKey sdKey, List<PartitionWithoutSD> partitions) {
+    StorageDescriptor sd;
+    if (sdKey.getSd() == null) {
+      // sd was not requested; set an empty StorageDescriptor in the PartitionSpec
+      sd = new StorageDescriptor();
+    } else {
+      sd = new StorageDescriptor(sdKey.getSd());
+      sd.setLocation(sdKey.baseLocation); // Use table-dir as root-dir.
+    }
+    PartitionSpecWithSharedSD sharedSDPartSpec = new PartitionSpecWithSharedSD();
+    sharedSDPartSpec.setPartitions(partitions);
+    sharedSDPartSpec.setSd(sd);
+
+    PartitionSpec ret = new PartitionSpec();
+    ret.setRootPath(sd.getLocation());
+    ret.setSharedSDPartitionSpec(sharedSDPartSpec);
+    ret.setDbName(table.getDbName());
+    ret.setTableName(table.getTableName());
+
+    return ret;
+  }
+
+  /**
+   * Utility method to set a nested property of a given object. The nested property is
+   * expressed as a dot-separated string, where each dot delimits one nesting level. The
+   * method uses the PropertyUtils methods from the Apache Commons BeanUtils library and
+   * assumes that the field names in the given propertyName have valid setters. E.g. the
+   * propertyName sd.serdeInfo.inputFormat represents the inputFormat field of the serdeInfo
+   * field of the sd field, so the bean must have these fields (in this case it should be a
+   * Partition object). The value argument is the value to set on the nested field. Note
+   * that if one of the nested levels is null, you must set the instantiateMissingFields
+   * argument to true; otherwise this method could throw a NPE.
+   *
+   * @param bean the object whose nested field needs to be set. This object must have setter
+   *             methods defined for each nested field name in the propertyName
+   * @param propertyName the nested propertyName to be set. Each level of nesting is dot
+   *                     separated
+   * @param value the value to which the nested property is set
+   * @param instantiateMissingFields if some of the nested fields are null, setting this
+   *                                 argument to true will attempt to instantiate them using
+   *                                 their default constructors. If no default constructor is
+   *                                 available, this method throws a MetaException
+   * @throws MetaException if the nested property could not be set
+   */
+  public static void setNestedProperty(Object bean, String propertyName, Object value,
+      boolean instantiateMissingFields) throws MetaException {
+    try {
+      String[] nestedFields = propertyName.split("\\.");
+      // check if there is more than one nesting level
+      if (nestedFields.length > 1 && instantiateMissingFields) {
+        StringBuilder fieldNameBuilder = new StringBuilder();
+        // check that every nesting level up to the given field name is set
+        for (int level = 0; level < nestedFields.length - 1; level++) {
+          fieldNameBuilder.append(nestedFields[level]);
+          String currentFieldName = fieldNameBuilder.toString();
+          Object fieldVal = PropertyUtils.getProperty(bean, currentFieldName);
+          if (fieldVal == null) {
+            //one of the nested levels is null. Instantiate it
+            PropertyDescriptor fieldDescriptor =
+                PropertyUtils.getPropertyDescriptor(bean, currentFieldName);
+            // this assumes the bean (e.g. MPartition) and the nested field objects have a
+            // default constructor
+            Object defaultInstance = fieldDescriptor.getPropertyType().newInstance();
+            PropertyUtils.setNestedProperty(bean, currentFieldName, defaultInstance);
+          }
+          //add dot separator for the next level of nesting
+          fieldNameBuilder.append(DOT);
+        }
+      }
+      PropertyUtils.setNestedProperty(bean, propertyName, value);
+    } catch (Exception e) {
+      throw new MetaException(
+          org.apache.hadoop.hive.metastore.utils.StringUtils.stringifyException(e));
+    }
+  }
+
   // ColumnStatisticsObj with info about its db, table, partition (if table is partitioned)
   public static class ColStatsObjWithSourceInfo {
     private final ColumnStatisticsObj colStatsObj;
@@ -1014,4 +1188,110 @@ public class MetaStoreServerUtils {
       return partName;
     }
   }
+
+  /**
+   * This class is used to group the partitions based on a shared storage descriptor.
+   * The following fields are considered for hashing/equality:
+   * <ul>
+   *   <li>location</li>
+   *   <li>serializationLib</li>
+   *   <li>inputFormat</li>
+   *   <li>outputFormat</li>
+   *   <li>columns</li>
+   * </ul>
+   *
+   * Partitions whose storage descriptors agree on these fields can share a single
+   * storage descriptor, significantly reducing the on-the-wire cost.
+   *
+   * Check {@link #getPartitionspecsGroupedByStorageDescriptor} for more details.
+   */
+  @VisibleForTesting
+  static class StorageDescriptorKey {
+    private final StorageDescriptor sd;
+    private final String baseLocation;
+    private final int hashCode;
+
+    @VisibleForTesting
+    static final StorageDescriptorKey UNSET_KEY = new StorageDescriptorKey();
+
+    StorageDescriptorKey(StorageDescriptor sd) {
+      this(sd.getLocation(), sd);
+    }
+
+    StorageDescriptorKey(String baseLocation, StorageDescriptor sd) {
+      this.sd = sd;
+      this.baseLocation = baseLocation;
+      if (sd == null) {
+        hashCode = Objects.hashCode(baseLocation);
+      } else {
+        // use the baseLocation provided instead of sd.getLocation()
+        hashCode = Objects.hash(sd.getSerdeInfo() == null ? null :
+                sd.getSerdeInfo().getSerializationLib(),
+            sd.getInputFormat(), sd.getOutputFormat(), baseLocation, sd.getCols());
+      }
+    }
+
+    // Set everything to null
+    StorageDescriptorKey() {
+      baseLocation = null;
+      sd = null;
+      hashCode = 0;
+    }
+
+    StorageDescriptor getSd() {
+      return sd;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      StorageDescriptorKey that = (StorageDescriptorKey) o;
+
+      if (!Objects.equals(baseLocation, that.baseLocation)) {
+        return false;
+      }
+
+      if (sd == null && that.sd == null) {
+        return true;
+      }
+
+      if (sd == null || that.sd == null) {
+        return false;
+      }
+
+      if (!Objects.equals(sd.getOutputFormat(), that.sd.getOutputFormat())) {
+        return false;
+      }
+      if (!Objects.equals(sd.getCols(), that.sd.getCols())) {
+        return false;
+      }
+      if (!Objects.equals(sd.getInputFormat(), that.sd.getInputFormat())) {
+        return false;
+      }
+
+      // Objects.equals on the SerDeInfo objects already covers the null combinations and
+      // the serializationLib comparison, so a single check suffices here
+      return Objects.equals(sd.getSerdeInfo(), that.sd.getSerdeInfo());
+    }
+
+    @Override
+    public int hashCode() {
+      return hashCode;
+    }
+  }
 }
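
To make the SD-based grouping concrete, a hedged sketch of how a caller might consume the result of getPartitionspecsGroupedByStorageDescriptor; table and partitions are assumed to be populated thrift objects:

import java.util.Collection;

import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PartitionSpec;
import org.apache.hadoop.hive.metastore.api.PartitionSpecWithSharedSD;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;

public class GroupingSketch {
  // partitions under the table directory come back as a PartitionSpecWithSharedSD;
  // partitions outside it come back as a plain PartitionListComposingSpec
  static void printGroups(Table table, Collection<Partition> partitions) {
    for (PartitionSpec spec
        : MetaStoreServerUtils.getPartitionspecsGroupedByStorageDescriptor(table, partitions)) {
      if (spec.isSetSharedSDPartitionSpec()) {
        PartitionSpecWithSharedSD shared = spec.getSharedSDPartitionSpec();
        // one StorageDescriptor is serialized for the whole group; each partition carries
        // only its path relative to the root location
        System.out.println(shared.getSd().getLocation() + " -> "
            + shared.getPartitions().size() + " partitions");
      } else if (spec.isSetPartitionList()) {
        System.out.println(spec.getPartitionList().getPartitions().size()
            + " non-standard partitions outside the table directory");
      }
    }
  }
}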

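Likewise, a hedged usage sketch for setNestedProperty showing the instantiateMissingFields behavior on a freshly created thrift Partition; the serde class name used as the value is only an example:

import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;

public class NestedPropertySketch {
  public static void main(String[] args) throws MetaException {
    // a new Partition has a null sd (and hence a null sd.serdeInfo), so the
    // intermediate levels must be instantiated on the way to the leaf field
    Partition partition = new Partition();
    MetaStoreServerUtils.setNestedProperty(partition, "sd.serdeInfo.serializationLib",
        "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
        true /* instantiateMissingFields; false would fail on the null sd */);
    System.out.println(partition.getSd().getSerdeInfo().getSerializationLib());
  }
}
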
http://git-wip-us.apache.org/repos/asf/hive/blob/44ef91a6/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 0934aeb..4dd4edc 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -18,13 +18,15 @@
 
 package org.apache.hadoop.hive.metastore;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Catalog;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.CreationMetadata;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -32,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
 import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
 import org.apache.hadoop.hive.metastore.api.ISchema;
+import org.apache.hadoop.hive.metastore.api.ISchemaName;
 import org.apache.hadoop.hive.metastore.api.InvalidInputException;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -49,11 +52,6 @@ import org.apache.hadoop.hive.metastore.api.PartitionValuesResponse;
 import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
-import org.apache.hadoop.hive.metastore.api.WMNullablePool;
-import org.apache.hadoop.hive.metastore.api.WMNullableResourcePlan;
-import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
-import org.apache.hadoop.hive.metastore.api.WMTrigger;
-import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
 import org.apache.hadoop.hive.metastore.api.Role;
 import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
 import org.apache.hadoop.hive.metastore.api.RuntimeStat;
@@ -64,6 +62,7 @@ import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
 import org.apache.hadoop.hive.metastore.api.SchemaVersion;
+import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
@@ -71,11 +70,18 @@ import org.apache.hadoop.hive.metastore.api.Type;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
 import org.apache.hadoop.hive.metastore.api.WMMapping;
+import org.apache.hadoop.hive.metastore.api.WMNullablePool;
+import org.apache.hadoop.hive.metastore.api.WMNullableResourcePlan;
 import org.apache.hadoop.hive.metastore.api.WMPool;
-import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
-import org.apache.hadoop.hive.metastore.api.ISchemaName;
-import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
+import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMTrigger;
+import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
+import org.apache.thrift.TException;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -83,13 +89,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
-import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
-import org.apache.thrift.TException;
-
 /**
  * A wrapper around {@link org.apache.hadoop.hive.metastore.ObjectStore}
  * with the ability to control the result of commitTransaction().
@@ -377,6 +376,14 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
   }
 
   @Override
+  public List<Partition> getPartitionSpecsByFilterAndProjection(String catName, String dbName,
+      String tblName, List<String> fieldList, String includeParamKeyPattern,
+      String excludeParamKeysPattern) throws MetaException, NoSuchObjectException {
+    return objectStore.getPartitionSpecsByFilterAndProjection(catName, dbName, tblName,
+        fieldList, includeParamKeyPattern, excludeParamKeysPattern);
+  }
+
+  @Override
   public int getNumPartitionsByFilter(String catName, String dbName, String tblName,
                                       String filter) throws MetaException, NoSuchObjectException {
     return objectStore.getNumPartitionsByFilter(catName, dbName, tblName, filter);

http://git-wip-us.apache.org/repos/asf/hive/blob/44ef91a6/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 70a17f5..06f4cbc 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -380,6 +380,13 @@ public class DummyRawStoreForJdoConnection implements RawStore {
   }
 
   @Override
+  public List<Partition> getPartitionSpecsByFilterAndProjection(String catName, String dbName,
+      String tblName, List<String> fieldList, String includeParamKeyPattern,
+      String excludeParamKeysPattern) throws MetaException, NoSuchObjectException {
+    return Collections.emptyList();
+  }
+
+  @Override
   public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
       List<String> partNames) throws MetaException, NoSuchObjectException {
 

http://git-wip-us.apache.org/repos/asf/hive/blob/44ef91a6/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
index ce590d0..2861b6b 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
@@ -3537,4 +3537,10 @@ public class HiveMetaStoreClientPreCatalog implements IMetaStoreClient, AutoClos
       throws TException {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public GetPartitionsResponse getPartitionsWithSpecs(GetPartitionsRequest request)
+      throws TException {
+    throw new UnsupportedOperationException();
+  }
 }
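
Finally, a hedged sketch of the client-side call that this pre-catalog client rejects. GetPartitionsRequest and GetPartitionsResponse appear in this patch; the projection-spec type and setter names below are assumed from the thrift changes elsewhere in the series and may differ:

import java.util.Arrays;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.GetPartitionsProjectionSpec;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.GetPartitionsResponse;
import org.apache.hadoop.hive.metastore.api.PartitionSpec;
import org.apache.thrift.TException;

public class GetPartitionsWithSpecsSketch {
  static void fetch(IMetaStoreClient client) throws TException {
    GetPartitionsRequest request = new GetPartitionsRequest();
    request.setDbName("default");
    request.setTblName("web_logs");                    // illustrative table name

    // assumed projection-spec type from this patch series; only the listed fields
    // are populated on the returned partitions
    GetPartitionsProjectionSpec projection = new GetPartitionsProjectionSpec();
    projection.setFieldList(Arrays.asList("values", "sd.location"));
    request.setProjectionSpec(projection);

    GetPartitionsResponse response = client.getPartitionsWithSpecs(request);
    for (PartitionSpec spec : response.getPartitionSpec()) {
      System.out.println(spec.getTableName());
    }
  }
}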