Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:32:00 UTC

[13/51] [partial] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index c8b0d4c..3670de1 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore;
 import static org.apache.commons.lang.StringUtils.join;
 import static org.apache.commons.lang.StringUtils.repeat;
 
+import java.sql.Clob;
 import java.sql.Connection;
 import java.sql.Statement;
 import java.sql.SQLException;
@@ -60,6 +61,8 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.cache.CacheUtils;
+import org.apache.hadoop.hive.metastore.cache.CachedStore;
 import org.apache.hadoop.hive.metastore.model.MConstraint;
 import org.apache.hadoop.hive.metastore.model.MDatabase;
 import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
@@ -78,6 +81,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  * This class contains the optimizations for MetaStore that rely on direct SQL access to
@@ -648,7 +652,7 @@ class MetaStoreDirectSql {
     loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
       @Override
       public void apply(StorageDescriptor t, Object[] fields) {
-        t.putToParameters((String)fields[1], (String)fields[2]);
+        t.putToParameters((String)fields[1], extractSqlClob(fields[2]));
       }});
     // Perform conversion of null map values
     for (StorageDescriptor t : sds.values()) {
@@ -779,7 +783,7 @@ class MetaStoreDirectSql {
       loopJoinOrderedResult(colss, queryText, 0, new ApplyFunc<List<FieldSchema>>() {
         @Override
         public void apply(List<FieldSchema> t, Object[] fields) {
-          t.add(new FieldSchema((String)fields[2], (String)fields[3], (String)fields[1]));
+          t.add(new FieldSchema((String)fields[2], extractSqlClob(fields[3]), (String)fields[1]));
         }});
     }
 
@@ -790,7 +794,7 @@ class MetaStoreDirectSql {
     loopJoinOrderedResult(serdes, queryText, 0, new ApplyFunc<SerDeInfo>() {
       @Override
       public void apply(SerDeInfo t, Object[] fields) {
-        t.putToParameters((String)fields[1], (String)fields[2]);
+        t.putToParameters((String)fields[1], extractSqlClob(fields[2]));
       }});
     // Perform conversion of null map values
     for (SerDeInfo t : serdes.values()) {
@@ -878,6 +882,21 @@ class MetaStoreDirectSql {
     return ((Number) obj).doubleValue();
   }
 
+  private String extractSqlClob(Object value) {
+    if (value == null) return null;
+    try {
+      if (value instanceof Clob) {
+        // trim the Clob value to a length an int can hold, since getSubString takes an int length
+        int maxLength = (((Clob)value).length() < Integer.MAX_VALUE - 2) ? (int)((Clob)value).length() : Integer.MAX_VALUE - 2;
+        return ((Clob)value).getSubString(1L, maxLength);
+      } else {
+        return value.toString();
+      }
+    } catch (SQLException sqle) {
+      return null;
+    }
+  }
+
   private static String trimCommaList(StringBuilder sb) {
     if (sb.length() > 0) {
       sb.setLength(sb.length() - 1);
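Note on extractSqlClob above: some metastore databases may hand back java.sql.Clob rather than String for large character columns when using direct SQL, which is why the raw field values are now routed through this helper. A minimal standalone sketch of the same defensive pattern using only standard JDBC (the class and method names here are illustrative, not part of the patch):

    import java.sql.Clob;
    import java.sql.SQLException;

    class ClobValues {
      // Return the character content of a CLOB-or-String column value,
      // or null if the value is null or the CLOB cannot be read.
      static String toStringValue(Object value) {
        if (value == null) return null;
        try {
          if (value instanceof Clob) {
            Clob clob = (Clob) value;
            // Clob.length() returns a long, but getSubString takes an int
            // length, so clamp before casting.
            int len = (int) Math.min(clob.length(), Integer.MAX_VALUE - 2);
            // JDBC Clob positions are 1-based.
            return clob.getSubString(1L, len);
          }
          return value.toString();
        } catch (SQLException e) {
          return null;
        }
      }
    }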
@@ -1190,7 +1209,7 @@ class MetaStoreDirectSql {
   }
 
   public AggrStats aggrColStatsForPartitions(String dbName, String tableName,
-      List<String> partNames, List<String> colNames, boolean useDensityFunctionForNDVEstimation)
+      List<String> partNames, List<String> colNames, boolean useDensityFunctionForNDVEstimation, double ndvTuner)
       throws MetaException {
     if (colNames.isEmpty() || partNames.isEmpty()) {
       LOG.debug("Columns is empty or partNames is empty : Short-circuiting stats eval");
@@ -1225,7 +1244,7 @@ class MetaStoreDirectSql {
           // Read aggregated stats for one column
           colStatsAggrFromDB =
               columnStatisticsObjForPartitions(dbName, tableName, partNames, colNamesForDB,
-                  partsFound, useDensityFunctionForNDVEstimation);
+                  partsFound, useDensityFunctionForNDVEstimation, ndvTuner);
           if (!colStatsAggrFromDB.isEmpty()) {
             ColumnStatisticsObj colStatsAggr = colStatsAggrFromDB.get(0);
             colStatsList.add(colStatsAggr);
@@ -1238,7 +1257,7 @@ class MetaStoreDirectSql {
       partsFound = partsFoundForPartitions(dbName, tableName, partNames, colNames);
       colStatsList =
           columnStatisticsObjForPartitions(dbName, tableName, partNames, colNames, partsFound,
-              useDensityFunctionForNDVEstimation);
+              useDensityFunctionForNDVEstimation, ndvTuner);
     }
     LOG.info("useDensityFunctionForNDVEstimation = " + useDensityFunctionForNDVEstimation
         + "\npartsFound = " + partsFound + "\nColumnStatisticsObj = "
@@ -1301,24 +1320,81 @@ class MetaStoreDirectSql {
 
   private List<ColumnStatisticsObj> columnStatisticsObjForPartitions(final String dbName,
     final String tableName, final List<String> partNames, List<String> colNames, long partsFound,
-    final boolean useDensityFunctionForNDVEstimation) throws MetaException {
+    final boolean useDensityFunctionForNDVEstimation, final double ndvTuner) throws MetaException {
     final boolean areAllPartsFound = (partsFound == partNames.size());
     return runBatched(colNames, new Batchable<String, ColumnStatisticsObj>() {
       public List<ColumnStatisticsObj> run(final List<String> inputColNames) throws MetaException {
         return runBatched(partNames, new Batchable<String, ColumnStatisticsObj>() {
           public List<ColumnStatisticsObj> run(List<String> inputPartNames) throws MetaException {
             return columnStatisticsObjForPartitionsBatch(dbName, tableName, inputPartNames,
-                inputColNames, areAllPartsFound, useDensityFunctionForNDVEstimation);
+                inputColNames, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner);
           }
         });
       }
     });
   }
 
+  // Get aggregated column stats for a table, per partition, for all columns in each partition.
+  // This is primarily used to populate the stats objects when using CachedStore (see CachedStore#prewarm).
+  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+      String tblName, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
+    String queryText = "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", "
+        + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
+        + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
+        + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
+        + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), "
+        // The following data is used to compute a partitioned table's NDV from its
+        // partitions' NDVs when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be
+        // accurately derived from partition NDVs, because the domains of a column's values in two
+        // partitions can overlap. If there is no overlap, the global NDV is just the sum
+        // of the partition NDVs (UpperBound). But if there is some overlap, the
+        // global NDV can be anywhere between the sum of the partition NDVs (no overlap)
+        // and the largest single partition NDV (when the domain of the column's values in every
+        // other partition is a subset of the domain in one partition) (LowerBound).
+        // Under a uniform distribution, we can roughly estimate the global
+        // NDV by leveraging the min/max values, and we keep the estimate sane by comparing it
+        // to the UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
+        // and the LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
+        + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
+        + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+        + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
+        + "sum(\"NUM_DISTINCTS\") from \"PART_COL_STATS\""
+        + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? group by \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+    long start = 0;
+    long end = 0;
+    Query query = null;
+    boolean doTrace = LOG.isDebugEnabled();
+    Object qResult = null;
+    ForwardQueryResult fqr = null;
+    start = doTrace ? System.nanoTime() : 0;
+    query = pm.newQuery("javax.jdo.query.SQL", queryText);
+    qResult = executeWithArray(query,
+        prepareParams(dbName, tblName, new ArrayList<String>(), new ArrayList<String>()), queryText);
+    if (qResult == null) {
+      query.closeAll();
+      return Maps.newHashMap();
+    }
+    end = doTrace ? System.nanoTime() : 0;
+    timingTrace(doTrace, queryText, start, end);
+    List<Object[]> list = ensureList(qResult);
+    Map<String, ColumnStatisticsObj> partColStatsMap = new HashMap<String, ColumnStatisticsObj>();
+    for (Object[] row : list) {
+      String partName = (String) row[0];
+      String colName = (String) row[1];
+      partColStatsMap.put(
+          CacheUtils.buildKey(dbName, tblName, CachedStore.partNameToVals(partName), colName),
+          prepareCSObjWithAdjustedNDV(row, 1, useDensityFunctionForNDVEstimation, ndvTuner));
+      Deadline.checkTimeout();
+    }
+    query.closeAll();
+    return partColStatsMap;
+  }
+
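The ndvTuner parameter threaded through these methods picks a point between the two bounds described in the comment above. A rough sketch of the interpolation (the real adjustment happens in StatObjectConverter.fillColumnStatisticsData, outside this excerpt, so treat this as illustrative):

    // ndvTuner = 0 -> assume full overlap across partitions (lower bound);
    // ndvTuner = 1 -> assume no overlap (upper bound).
    static long estimateGlobalNdv(long lowerBound, long upperBound, double ndvTuner) {
      return (long) (lowerBound + (upperBound - lowerBound) * ndvTuner);
    }

with lowerBound = max("NUM_DISTINCTS") and upperBound = sum("NUM_DISTINCTS") from the query above.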
   /** Should be called with the list short enough to not trip up Oracle/etc. */
   private List<ColumnStatisticsObj> columnStatisticsObjForPartitionsBatch(String dbName,
       String tableName, List<String> partNames, List<String> colNames, boolean areAllPartsFound,
-      boolean useDensityFunctionForNDVEstimation) throws MetaException {
+      boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
     // TODO: all the extrapolation logic should be moved out of this class,
     // only mechanical data retrieval should remain here.
     String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
@@ -1370,7 +1446,7 @@ class MetaStoreDirectSql {
       List<Object[]> list = ensureList(qResult);
       List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(list.size());
       for (Object[] row : list) {
-        colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation));
+        colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
         Deadline.checkTimeout();
       }
       query.closeAll();
@@ -1429,7 +1505,7 @@ class MetaStoreDirectSql {
         }
         list = ensureList(qResult);
         for (Object[] row : list) {
-          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation));
+          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
           Deadline.checkTimeout();
         }
         end = doTrace ? System.nanoTime() : 0;
@@ -1576,7 +1652,7 @@ class MetaStoreDirectSql {
               query.closeAll();
             }
           }
-          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation));
+          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
           Deadline.checkTimeout();
         }
       }
@@ -1596,13 +1672,13 @@ class MetaStoreDirectSql {
   }
 
   private ColumnStatisticsObj prepareCSObjWithAdjustedNDV(Object[] row, int i,
-      boolean useDensityFunctionForNDVEstimation) throws MetaException {
+      boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
     ColumnStatisticsData data = new ColumnStatisticsData();
     ColumnStatisticsObj cso = new ColumnStatisticsObj((String) row[i++], (String) row[i++], data);
     Object llow = row[i++], lhigh = row[i++], dlow = row[i++], dhigh = row[i++], declow = row[i++], dechigh = row[i++], nulls = row[i++], dist = row[i++], avglen = row[i++], maxlen = row[i++], trues = row[i++], falses = row[i++], avgLong = row[i++], avgDouble = row[i++], avgDecimal = row[i++], sumDist = row[i++];
     StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data, llow, lhigh, dlow, dhigh,
         declow, dechigh, nulls, dist, avglen, maxlen, trues, falses, avgLong, avgDouble,
-        avgDecimal, sumDist, useDensityFunctionForNDVEstimation);
+        avgDecimal, sumDist, useDensityFunctionForNDVEstimation, ndvTuner);
     return cso;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
index b0defb5..868e5a5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
@@ -75,19 +75,17 @@ public abstract class MetaStoreEventListener implements Configurable {
   }
 
   /**
-   * @param add partition event
-   * @throws MetaException
-   */
-
-  /**
    * @param tableEvent alter table event
    * @throws MetaException
    */
   public void onAlterTable (AlterTableEvent tableEvent) throws MetaException {
   }
 
-  public void onAddPartition (AddPartitionEvent partitionEvent)
-      throws MetaException {
+  /**
+   * @param partitionEvent add partition event
+   * @throws MetaException
+   */
+  public void onAddPartition (AddPartitionEvent partitionEvent) throws MetaException {
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
new file mode 100644
index 0000000..20011cc
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience.Private;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.InsertEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
+
+/**
+ * This class is used to notify a list of listeners about specific MetaStore events.
+ */
+@Private
+public class MetaStoreListenerNotifier {
+  private interface EventNotifier {
+    void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException;
+  }
+
+  private static Map<EventType, EventNotifier> notificationEvents = Maps.newHashMap(
+      ImmutableMap.<EventType, EventNotifier>builder()
+          .put(EventType.CREATE_DATABASE, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onCreateDatabase((CreateDatabaseEvent)event);
+            }
+          })
+          .put(EventType.DROP_DATABASE, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onDropDatabase((DropDatabaseEvent)event);
+            }
+          })
+          .put(EventType.CREATE_TABLE, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onCreateTable((CreateTableEvent)event);
+            }
+          })
+          .put(EventType.DROP_TABLE, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onDropTable((DropTableEvent)event);
+            }
+          })
+          .put(EventType.ADD_PARTITION, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onAddPartition((AddPartitionEvent)event);
+            }
+          })
+          .put(EventType.DROP_PARTITION, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onDropPartition((DropPartitionEvent)event);
+            }
+          })
+          .put(EventType.ALTER_TABLE, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onAlterTable((AlterTableEvent)event);
+            }
+          })
+          .put(EventType.ALTER_PARTITION, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onAlterPartition((AlterPartitionEvent)event);
+            }
+          })
+          .put(EventType.INSERT, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onInsert((InsertEvent)event);
+            }
+          })
+          .put(EventType.CREATE_FUNCTION, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onCreateFunction((CreateFunctionEvent)event);
+            }
+          })
+          .put(EventType.DROP_FUNCTION, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onDropFunction((DropFunctionEvent)event);
+            }
+          })
+          .put(EventType.CREATE_INDEX, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onAddIndex((AddIndexEvent)event);
+            }
+          })
+          .put(EventType.DROP_INDEX, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onDropIndex((DropIndexEvent)event);
+            }
+          })
+          .put(EventType.ALTER_INDEX, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onAlterIndex((AlterIndexEvent)event);
+            }
+          })
+          .build()
+  );
+
+  /**
+   * Notify a list of listeners about a specific metastore event. Each notified listener may update
+   * the ListenerEvent by setting parameter key/value pairs; these updated parameters are
+   * returned to the caller.
+   *
+   * @param listeners List of MetaStoreEventListener listeners.
+   * @param eventType Type of the notification event.
+   * @param event The ListenerEvent with information about the event.
+   * @return A map of key/value parameters that the listeners set. The returned map will be empty
+   *         if no parameters were updated or if no listeners were notified.
+   * @throws MetaException If an error occurred while calling the listeners.
+   */
+  public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners,
+                                                EventType eventType,
+                                                ListenerEvent event) throws MetaException {
+
+    Preconditions.checkNotNull(listeners, "Listeners must not be null.");
+    Preconditions.checkNotNull(event, "The event must not be null.");
+
+    for (MetaStoreEventListener listener : listeners) {
+      notificationEvents.get(eventType).notify(listener, event);
+    }
+
+    // Each listener called above might set different parameters on the event.
+    // Letting listeners write parameters on the event avoids breaking compatibility
+    // if the API method calls change later.
+    return event.getParameters();
+  }
+
+  /**
+   * Notify a list of listeners about a specific metastore event. Each notified listener may update
+   * the ListenerEvent by setting parameter key/value pairs; these updated parameters are
+   * returned to the caller.
+   *
+   * @param listeners List of MetaStoreEventListener listeners.
+   * @param eventType Type of the notification event.
+   * @param event The ListenerEvent with information about the event.
+   * @param environmentContext An EnvironmentContext object with parameters sent by the HMS client.
+   * @return A map of key/value parameters that the listeners set. The returned map will be empty
+   *         if no parameters were updated or if no listeners were notified.
+   * @throws MetaException If an error occurred while calling the listeners.
+   */
+  public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners,
+                                                EventType eventType,
+                                                ListenerEvent event,
+                                                EnvironmentContext environmentContext) throws MetaException {
+
+    Preconditions.checkNotNull(event, "The event must not be null.");
+
+    event.setEnvironmentContext(environmentContext);
+    return notifyEvent(listeners, eventType, event);
+  }
+
+  /**
+   * Notify a list of listeners about a specific metastore event. Each notified listener may update
+   * the ListenerEvent by setting parameter key/value pairs; these updated parameters are
+   * returned to the caller.
+   *
+   * @param listeners List of MetaStoreEventListener listeners.
+   * @param eventType Type of the notification event.
+   * @param event The ListenerEvent with information about the event.
+   * @param environmentContext An EnvironmentContext object with parameters sent by the HMS client.
+   * @param parameters A map of key/value pairs with the new parameters to add.
+   * @return A map of key/value parameters that the listeners set. The returned map will be empty
+   *         if no parameters were updated or if no listeners were notified.
+   * @throws MetaException If an error occurred while calling the listeners.
+   */
+  public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners,
+                                                EventType eventType,
+                                                ListenerEvent event,
+                                                EnvironmentContext environmentContext,
+                                                Map<String, String> parameters) throws MetaException {
+
+    Preconditions.checkNotNull(event, "The event must not be null.");
+
+    event.putParameters(parameters);
+    return notifyEvent(listeners, eventType, event, environmentContext);
+  }
+}
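A sketch of how a call site might use the new notifier (the listener list, event construction, and handler variable are illustrative; the real call sites live in HiveMetaStore, outside this excerpt):

    // Fire CREATE_TABLE to all registered listeners and capture any
    // parameters the listeners set on the event.
    CreateTableEvent event = new CreateTableEvent(tbl, true, handler);
    Map<String, String> updatedParams =
        MetaStoreListenerNotifier.notifyEvent(listeners, EventType.CREATE_TABLE, event);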

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java
index 9c30ee7..320902b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.metastore;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
@@ -27,21 +26,19 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.common.util.HiveVersionInfo;
 
 import com.google.common.collect.ImmutableMap;
 
 
 public class MetaStoreSchemaInfo {
-  private static String SQL_FILE_EXTENSION=".sql";
-  private static String UPGRADE_FILE_PREFIX="upgrade-";
-  private static String INIT_FILE_PREFIX="hive-schema-";
-  private static String VERSION_UPGRADE_LIST = "upgrade.order";
-  private static String PRE_UPGRADE_PREFIX = "pre-";
+  private static final String SQL_FILE_EXTENSION = ".sql";
+  private static final String UPGRADE_FILE_PREFIX = "upgrade-";
+  private static final String INIT_FILE_PREFIX = "hive-schema-";
+  private static final String VERSION_UPGRADE_LIST = "upgrade.order";
+  private static final String PRE_UPGRADE_PREFIX = "pre-";
   private final String dbType;
   private final String hiveSchemaVersions[];
-  private final HiveConf hiveConf;
   private final String hiveHome;
 
   // Some version upgrades often don't change schema. So they are equivalent to
@@ -55,10 +52,9 @@ public class MetaStoreSchemaInfo {
           "1.2.1", "1.2.0"
       );
 
-  public MetaStoreSchemaInfo(String hiveHome, HiveConf hiveConf, String dbType) throws HiveMetaException {
+  public MetaStoreSchemaInfo(String hiveHome, String dbType) throws HiveMetaException {
     this.hiveHome = hiveHome;
     this.dbType = dbType;
-    this.hiveConf = hiveConf;
     // load upgrade order for the given dbType
     List<String> upgradeOrderList = new ArrayList<String>();
     String upgradeListFile = getMetaStoreScriptDir() + File.separator +
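With the HiveConf parameter gone, callers construct the class as, for example (the dbType value here is illustrative):

    MetaStoreSchemaInfo schemaInfo = new MetaStoreSchemaInfo(hiveHome, "mysql");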

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index 6259cda..3ee7977 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -47,6 +47,7 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -86,6 +87,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
+import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.ReflectionUtil;
 
@@ -638,14 +640,6 @@ public class MetaStoreUtils {
     }
   }
 
-  static boolean isCascadeNeededInAlterTable(Table oldTable, Table newTable) {
-    //currently cascade only supports add/replace columns and
-    //changing column type/position/name/comments
-    List<FieldSchema> oldCols = oldTable.getSd().getCols();
-    List<FieldSchema> newCols = newTable.getSd().getCols();
-    return !areSameColumns(oldCols, newCols);
-  }
-
   static boolean areSameColumns(List<FieldSchema> oldCols, List<FieldSchema> newCols) {
     if (oldCols.size() != newCols.size()) {
       return false;
@@ -696,8 +690,6 @@ public class MetaStoreUtils {
       TypeInfoUtils.getTypeInfoFromTypeString(newType));
   }
 
-  public static final int MAX_MS_TYPENAME_LENGTH = 2000; // 4000/2, for an unlikely unicode case
-
   public static final String TYPE_FROM_DESERIALIZER = "<derived from deserializer>";
   /**
    * validate column type
@@ -708,9 +700,6 @@ public class MetaStoreUtils {
    */
   static public String validateColumnType(String type) {
     if (type.equals(TYPE_FROM_DESERIALIZER)) return null;
-    if (type.length() > MAX_MS_TYPENAME_LENGTH) {
-      return "type name is too long: " + type;
-    }
     int last = 0;
     boolean lastAlphaDigit = isValidTypeChar(type.charAt(last));
     for (int i = 1; i <= type.length(); i++) {
@@ -1769,8 +1758,19 @@ public class MetaStoreUtils {
    * @param conf
    * @return The SASL configuration
    */
-  public static Map<String, String> getMetaStoreSaslProperties(HiveConf conf) {
+  public static Map<String, String> getMetaStoreSaslProperties(HiveConf conf, boolean useSSL) {
     // As of now Hive Meta Store uses the same configuration as Hadoop SASL configuration
+
+    // If SSL is enabled, override the given value of "hadoop.rpc.protection" and set it to "authentication"
+    // This disables any encryption provided by SASL, since SSL already provides it
+    String hadoopRpcProtectionVal = conf.get(CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION);
+    String hadoopRpcProtectionAuth = SaslRpcServer.QualityOfProtection.AUTHENTICATION.toString();
+
+    if (useSSL && hadoopRpcProtectionVal != null && !hadoopRpcProtectionVal.equals(hadoopRpcProtectionAuth)) {
+      LOG.warn("Overriding value of " + CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION + " setting it from "
+              + hadoopRpcProtectionVal + " to " + hadoopRpcProtectionAuth + " because SSL is enabled");
+      conf.set(CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION, hadoopRpcProtectionAuth);
+    }
     return ShimLoader.getHadoopThriftAuthBridge().getHadoopSaslProperties(conf);
   }
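Illustrative effect of the new useSSL flag (the configured value shown is an example): a stronger SASL quality of protection is downgraded before the SASL properties are derived, since TLS already provides integrity and confidentiality:

    // Client had configured hadoop.rpc.protection = privacy.
    conf.set(CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION, "privacy");
    // With useSSL = true this logs a warning and resets the value to
    // "authentication" before deriving the SASL properties.
    Map<String, String> saslProps = MetaStoreUtils.getMetaStoreSaslProperties(conf, true);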
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 51bc6d0..c351ffd 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -160,9 +160,13 @@ import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.thrift.TException;
 import org.datanucleus.AbstractNucleusContext;
 import org.datanucleus.ClassLoaderResolver;
+import org.datanucleus.ClassLoaderResolverImpl;
 import org.datanucleus.NucleusContext;
+import org.datanucleus.api.jdo.JDOPersistenceManager;
 import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
 import org.datanucleus.store.rdbms.exceptions.MissingTableException;
+import org.datanucleus.store.scostore.Store;
+import org.datanucleus.util.WeakValueMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -195,6 +199,7 @@ public class ObjectStore implements RawStore, Configurable {
   private static final Map<String, Class> PINCLASSMAP;
   private static final String HOSTNAME;
   private static final String USER;
+  private static final String JDO_PARAM = ":param";
   static {
     Map<String, Class> map = new HashMap<String, Class>();
     map.put("table", MTable.class);
@@ -234,26 +239,22 @@ public class ObjectStore implements RawStore, Configurable {
   private Pattern partitionValidationPattern;
 
   /**
-   * A class to pass the Query object to the caller to let the caller release
-   * resources by calling QueryWrapper.query.closeAll() after consuming all the query results.
+   * An AutoCloseable wrapper around the Query class that passes the Query object to the caller and lets
+   * the caller release the resources, e.g. via try-with-resources, once the QueryWrapper is closed
    */
-  public static class QueryWrapper {
+  public static class QueryWrapper implements AutoCloseable {
     public Query query;
 
     /**
      * Explicitly closes the query object to release the resources
      */
+    @Override
     public void close() {
       if (query != null) {
         query.closeAll();
         query = null;
       }
     }
-
-    @Override
-    protected void finalize() {
-      this.close();
-    }
   }
 
   public ObjectStore() {
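Since QueryWrapper now implements AutoCloseable, callers can release the underlying Query with try-with-resources instead of relying on finalization (the helper name below is illustrative of the existing call pattern):

    try (QueryWrapper queryWrapper = new QueryWrapper()) {
      // The helper populates queryWrapper.query; query.closeAll() now runs
      // automatically when this block exits.
      List<MPartition> mparts = listMPartitions(dbName, tableName, max, queryWrapper);
      // ... use mparts ...
    }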
@@ -284,6 +285,9 @@ public class ObjectStore implements RawStore, Configurable {
       boolean propsChanged = !propsFromConf.equals(prop);
 
       if (propsChanged) {
+        if (pmf != null){
+          clearOutPmfClassLoaderCache(pmf);
+        }
         pmf = null;
         prop = null;
       }
@@ -748,12 +752,7 @@ public class ObjectStore implements RawStore, Configurable {
       pm.retrieve(mdb);
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     if (mdb == null) {
       throw new NoSuchObjectException("There is no database named " + name);
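The repeated finally blocks throughout this file collapse into a rollbackAndCleanup helper, added by this patch but not visible in this excerpt; presumably it is shaped roughly like:

    // Presumed shape (illustrative): roll back if the transaction did not
    // commit, then always release the query's resources.
    private void rollbackAndCleanup(boolean success, Query query) {
      try {
        if (!success) {
          rollbackTransaction();
        }
      } finally {
        if (query != null) {
          query.closeAll();
        }
      }
    }

with an analogous overload taking a QueryWrapper.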
@@ -872,10 +871,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       success = commitTransaction();
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      queryWrapper.close();
+      rollbackAndCleanup(success, queryWrapper);
     }
     return success;
   }
@@ -893,33 +889,20 @@ public class ObjectStore implements RawStore, Configurable {
       // Take the pattern and split it on the | to get all the composing
       // patterns
       String[] subpatterns = pattern.trim().split("\\|");
-      String queryStr = "select name from org.apache.hadoop.hive.metastore.model.MDatabase where (";
-      boolean first = true;
-      for (String subpattern : subpatterns) {
-        subpattern = "(?i)" + subpattern.replaceAll("\\*", ".*");
-        if (!first) {
-          queryStr = queryStr + " || ";
-        }
-        queryStr = queryStr + " name.matches(\"" + subpattern + "\")";
-        first = false;
-      }
-      queryStr = queryStr + ")";
-      query = pm.newQuery(queryStr);
+      StringBuilder filterBuilder = new StringBuilder();
+      List<String> parameterVals = new ArrayList<>(subpatterns.length);
+      appendPatternCondition(filterBuilder, "name", subpatterns, parameterVals);
+      query = pm.newQuery(MDatabase.class, filterBuilder.toString());
       query.setResult("name");
       query.setOrdering("name ascending");
-      Collection names = (Collection) query.execute();
+      Collection names = (Collection) query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()]));
       databases = new ArrayList<String>();
       for (Iterator i = names.iterator(); i.hasNext();) {
         databases.add((String) i.next());
       }
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     return databases;
   }
@@ -939,12 +922,7 @@ public class ObjectStore implements RawStore, Configurable {
       databases = new ArrayList<String>((Collection<String>) query.execute());
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     Collections.sort(databases);
     return databases;
@@ -1012,12 +990,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     return type;
   }
@@ -1041,12 +1014,7 @@ public class ObjectStore implements RawStore, Configurable {
       success = commitTransaction();
       LOG.debug("type not found " + typeName, e);
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
     return success;
   }
@@ -1206,6 +1174,9 @@ public class ObjectStore implements RawStore, Configurable {
 
   private List<MConstraint> listAllTableConstraintsWithOptionalConstraintName
     (String dbName, String tableName, String constraintname) {
+    dbName = HiveStringUtils.normalizeIdentifier(dbName);
+    tableName = HiveStringUtils.normalizeIdentifier(tableName);
+    constraintname = constraintname != null ? HiveStringUtils.normalizeIdentifier(constraintname) : null;
     List<MConstraint> mConstraints = null;
     List<String> constraintNames = new ArrayList<String>();
     Query query = null;
@@ -1296,40 +1267,28 @@ public class ObjectStore implements RawStore, Configurable {
       dbName = HiveStringUtils.normalizeIdentifier(dbName);
       // Take the pattern and split it on the | to get all the composing
       // patterns
-      String[] subpatterns = pattern.trim().split("\\|");
-      String queryStr =
-          "select tableName from org.apache.hadoop.hive.metastore.model.MTable "
-              + "where database.name == dbName && (";
-      boolean first = true;
-      for (String subpattern : subpatterns) {
-        subpattern = "(?i)" + subpattern.replaceAll("\\*", ".*");
-        if (!first) {
-          queryStr = queryStr + " || ";
-        }
-        queryStr = queryStr + " tableName.matches(\"" + subpattern + "\")";
-        first = false;
+      List<String> parameterVals = new ArrayList<>();
+      StringBuilder filterBuilder = new StringBuilder();
+      // adds database.name == dbName to the filter
+      appendSimpleCondition(filterBuilder, "database.name", new String[] {dbName}, parameterVals);
+      if (pattern != null) {
+        appendPatternCondition(filterBuilder, "tableName", pattern, parameterVals);
       }
-      queryStr = queryStr + ")";
-      if (tableType != null) {
-        queryStr = queryStr + " && tableType.matches(\"" + tableType.toString() + "\")";
+      if (tableType != null) {
+        appendPatternCondition(filterBuilder, "tableType", new String[] {tableType.toString()}, parameterVals);
       }
-      query = pm.newQuery(queryStr);
-      query.declareParameters("java.lang.String dbName");
+
+      query = pm.newQuery(MTable.class, filterBuilder.toString());
       query.setResult("tableName");
       query.setOrdering("tableName ascending");
-      Collection names = (Collection) query.execute(dbName);
+      Collection names = (Collection) query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()]));
       tbls = new ArrayList<String>();
       for (Iterator i = names.iterator(); i.hasNext();) {
         tbls.add((String) i.next());
       }
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     return tbls;
   }
@@ -1361,12 +1320,7 @@ public class ObjectStore implements RawStore, Configurable {
       result = (Long) query.execute();
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     return result.intValue();
   }
@@ -1382,19 +1336,20 @@ public class ObjectStore implements RawStore, Configurable {
       openTransaction();
       // Take the pattern and split it on the | to get all the composing
       // patterns
-      StringBuilder builder = new StringBuilder();
+      StringBuilder filterBuilder = new StringBuilder();
+      List<String> parameterVals = new ArrayList<>();
       if (dbNames != null && !dbNames.equals("*")) {
-        appendPatternCondition(builder, "database.name", dbNames);
+        appendPatternCondition(filterBuilder, "database.name", dbNames, parameterVals);
       }
       if (tableNames != null && !tableNames.equals("*")) {
-        appendPatternCondition(builder, "tableName", tableNames);
+        appendPatternCondition(filterBuilder, "tableName", tableNames, parameterVals);
       }
       if (tableTypes != null && !tableTypes.isEmpty()) {
-        appendSimpleCondition(builder, "tableType", tableTypes.toArray(new String[0]));
+        appendSimpleCondition(filterBuilder, "tableType", tableTypes.toArray(new String[0]), parameterVals);
       }
 
-      query = pm.newQuery(MTable.class, builder.toString());
-      Collection<MTable> tables = (Collection<MTable>) query.execute();
+      query = pm.newQuery(MTable.class, filterBuilder.toString());
+      Collection<MTable> tables = (Collection<MTable>) query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()]));
       for (MTable table : tables) {
         TableMeta metaData = new TableMeta(
             table.getDatabase().getName(), table.getTableName(), table.getTableType());
@@ -1403,29 +1358,29 @@ public class ObjectStore implements RawStore, Configurable {
       }
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     return metas;
   }
 
+  private StringBuilder appendPatternCondition(StringBuilder filterBuilder, String fieldName,
+      String[] elements, List<String> parameterVals) {
+    return appendCondition(filterBuilder, fieldName, elements, true, parameterVals);
+  }
+
   private StringBuilder appendPatternCondition(StringBuilder builder,
-      String fieldName, String elements) {
+      String fieldName, String elements, List<String> parameters) {
       elements = HiveStringUtils.normalizeIdentifier(elements);
-    return appendCondition(builder, fieldName, elements.split("\\|"), true);
+    return appendCondition(builder, fieldName, elements.split("\\|"), true, parameters);
   }
 
   private StringBuilder appendSimpleCondition(StringBuilder builder,
-      String fieldName, String[] elements) {
-    return appendCondition(builder, fieldName, elements, false);
+      String fieldName, String[] elements, List<String> parameters) {
+    return appendCondition(builder, fieldName, elements, false, parameters);
   }
 
   private StringBuilder appendCondition(StringBuilder builder,
-      String fieldName, String[] elements, boolean pattern) {
+      String fieldName, String[] elements, boolean pattern, List<String> parameters) {
     if (builder.length() > 0) {
       builder.append(" && ");
     }
@@ -1435,14 +1390,15 @@ public class ObjectStore implements RawStore, Configurable {
       if (pattern) {
         element = "(?i)" + element.replaceAll("\\*", ".*");
       }
+      parameters.add(element);
       if (builder.length() > length) {
         builder.append(" || ");
       }
       builder.append(fieldName);
       if (pattern) {
-        builder.append(".matches(\"").append(element).append("\")");
+        builder.append(".matches(").append(JDO_PARAM).append(parameters.size()).append(")");
       } else {
-        builder.append(" == \"").append(element).append("\"");
+        builder.append(" == ").append(JDO_PARAM).append(parameters.size());
       }
     }
     builder.append(" )");
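The filter strings produced here now reference positional JDO parameters, e.g. (name.matches(:param1) || name.matches(:param2)), and the values travel separately, so user-supplied patterns are bound at execution time rather than quoted into the JDOQL text. The resulting pattern at the call sites in this patch:

    Query query = pm.newQuery(MDatabase.class, filterBuilder.toString());
    query.setResult("name");
    // Values are bound positionally, never concatenated into the filter.
    Collection names = (Collection) query.executeWithArray(
        parameterVals.toArray(new String[parameterVals.size()]));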
@@ -1488,12 +1444,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     nmtbl.mtbl = mtbl;
     return nmtbl;
@@ -1536,15 +1487,10 @@ public class ObjectStore implements RawStore, Configurable {
       }
       committed = commitTransaction();
     } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
+      rollbackAndCleanup(committed, query);
       if (dbExistsQuery != null) {
         dbExistsQuery.closeAll();
       }
-      if (query != null) {
-        query.closeAll();
-      }
     }
     return tables;
   }
@@ -2065,12 +2011,7 @@ public class ObjectStore implements RawStore, Configurable {
         }
       }
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     return ret;
   }
@@ -2302,10 +2243,7 @@ public class ObjectStore implements RawStore, Configurable {
       success =  commitTransaction();
       return parts;
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      queryWrapper.close();
+      rollbackAndCleanup(success, queryWrapper);
     }
   }
 
@@ -2407,6 +2345,7 @@ public class ObjectStore implements RawStore, Configurable {
     for (Iterator i = names.iterator(); i.hasNext();) {
       pns.add((String) i.next());
     }
+
     if (query != null) {
       query.closeAll();
     }
@@ -2501,10 +2440,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       success = commitTransaction();
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      queryWrapper.close();
+      rollbackAndCleanup(success, queryWrapper);
     }
     return partitions;
   }
@@ -2526,10 +2462,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       success = commitTransaction();
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      queryWrapper.close();
+      rollbackAndCleanup(success, queryWrapper);
     }
     return partitionNames;
   }
@@ -3294,12 +3227,7 @@ public class ObjectStore implements RawStore, Configurable {
       success = commitTransaction();
       LOG.debug("Done retrieving all objects for listTableNamesByFilter");
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
     return tableNames;
   }
@@ -3345,12 +3273,7 @@ public class ObjectStore implements RawStore, Configurable {
       success = commitTransaction();
       LOG.debug("Done retrieving all objects for listMPartitionNamesByFilter");
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
     return partNames;
   }
@@ -3571,10 +3494,7 @@ public class ObjectStore implements RawStore, Configurable {
       success = commitTransaction();
       LOG.debug("successfully deleted a CD in removeUnusedColumnDescriptor");
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      queryWrapper.close();
+      rollbackAndCleanup(success, queryWrapper);
     }
   }
 
@@ -3658,12 +3578,7 @@ public class ObjectStore implements RawStore, Configurable {
       constraintNameIfExists = (String) constraintExistsQuery.execute(name);
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (constraintExistsQuery != null) {
-          constraintExistsQuery.closeAll();
-      }
+      rollbackAndCleanup(commited, constraintExistsQuery);
     }
     return constraintNameIfExists != null && !constraintNameIfExists.isEmpty();
   }
@@ -3911,12 +3826,7 @@ public class ObjectStore implements RawStore, Configurable {
       pm.retrieve(midx);
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     return midx;
   }
@@ -3979,12 +3889,7 @@ public class ObjectStore implements RawStore, Configurable {
 
       return indexes;
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
   }
 
@@ -4011,12 +3916,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       success = commitTransaction();
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
     return pns;
   }
@@ -4139,12 +4039,7 @@ public class ObjectStore implements RawStore, Configurable {
       pm.retrieve(mRoleMember);
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     return mRoleMember;
   }
@@ -4213,11 +4108,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       success = commitTransaction();
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-
-      queryWrapper.close();
+      rollbackAndCleanup(success, queryWrapper);
     }
     return success;
   }
@@ -4287,12 +4178,7 @@ public class ObjectStore implements RawStore, Configurable {
 
       LOG.debug("Done retrieving all objects for listRoles");
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
 
     if (principalType == PrincipalType.USER) {
@@ -4358,7 +4244,6 @@ public class ObjectStore implements RawStore, Configurable {
       mRoleMemebership = (List<MRoleMap>) query.execute(roleName, principalType.toString());
       pm.retrieveAll(mRoleMemebership);
       success = commitTransaction();
-
       LOG.debug("Done retrieving all objects for listMSecurityPrincipalMembershipRole");
     } finally {
       if (!success) {
@@ -4392,12 +4277,7 @@ public class ObjectStore implements RawStore, Configurable {
       pm.retrieve(mrole);
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     return mrole;
   }
@@ -4419,12 +4299,7 @@ public class ObjectStore implements RawStore, Configurable {
       success = commitTransaction();
       return roleNames;
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
   }
 
@@ -5250,12 +5125,7 @@ public class ObjectStore implements RawStore, Configurable {
 
       LOG.debug("Done retrieving all objects for listRoleMembers");
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
     return mRoleMemeberList;
   }
@@ -5306,12 +5176,7 @@ public class ObjectStore implements RawStore, Configurable {
         userNameDbPriv.addAll(mPrivs);
       }
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     return userNameDbPriv;
   }
@@ -5351,12 +5216,7 @@ public class ObjectStore implements RawStore, Configurable {
       commited = commitTransaction();
       return convertGlobal(userNameDbPriv);
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
   }
 
@@ -5399,12 +5259,7 @@ public class ObjectStore implements RawStore, Configurable {
       mSecurityDBList.addAll(mPrivs);
       LOG.debug("Done retrieving all objects for listPrincipalDBGrants");
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
     return mSecurityDBList;
   }
@@ -5527,12 +5382,7 @@ public class ObjectStore implements RawStore, Configurable {
 
       LOG.debug("Done retrieving all objects for listAllTableGrants");
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
     return mSecurityTabList;
   }
@@ -5559,12 +5409,7 @@ public class ObjectStore implements RawStore, Configurable {
 
       LOG.debug("Done retrieving all objects for listTableAllPartitionGrants");
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
     return mSecurityTabPartList;
   }
@@ -5592,12 +5437,7 @@ public class ObjectStore implements RawStore, Configurable {
 
       LOG.debug("Done retrieving all objects for listTableAllColumnGrants");
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
     return mTblColPrivilegeList;
   }
@@ -5626,12 +5466,7 @@ public class ObjectStore implements RawStore, Configurable {
 
       LOG.debug("Done retrieving all objects for listTableAllPartitionColumnGrants");
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
     return mSecurityColList;
   }
@@ -5674,7 +5509,6 @@ public class ObjectStore implements RawStore, Configurable {
   private List<MDBPrivilege> listDatabaseGrants(String dbName, QueryWrapper queryWrapper) {
     dbName = HiveStringUtils.normalizeIdentifier(dbName);
     boolean success = false;
-
     try {
       LOG.debug("Executing listDatabaseGrants");
 
@@ -5782,12 +5616,7 @@ public class ObjectStore implements RawStore, Configurable {
 
       LOG.debug("Done retrieving all objects for listAllTableGrants");
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
     return mSecurityTabPartList;
   }
@@ -5847,12 +5676,7 @@ public class ObjectStore implements RawStore, Configurable {
 
       LOG.debug("Done retrieving all objects for listPrincipalPartitionGrants");
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
     return mSecurityTabPartList;
   }
@@ -5916,12 +5740,7 @@ public class ObjectStore implements RawStore, Configurable {
 
       LOG.debug("Done retrieving all objects for listPrincipalTableColumnGrants");
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
     return mSecurityColList;
   }
@@ -5983,12 +5802,7 @@ public class ObjectStore implements RawStore, Configurable {
 
       LOG.debug("Done retrieving all objects for listPrincipalPartitionColumnGrants");
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
     return mSecurityColList;
   }
@@ -6050,12 +5864,7 @@ public class ObjectStore implements RawStore, Configurable {
       LOG.debug("Done retrieving all objects for listPrincipalPartitionColumnGrantsAll");
       return result;
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
   }
 
@@ -6083,12 +5892,7 @@ public class ObjectStore implements RawStore, Configurable {
       LOG.debug("Done retrieving all objects for listPartitionColumnGrantsAll");
       return result;
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
   }
 
@@ -6163,12 +5967,7 @@ public class ObjectStore implements RawStore, Configurable {
       LOG.debug("Done retrieving all objects for listPrincipalAllTableGrants");
       return result;
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
   }
 
@@ -6191,12 +5990,7 @@ public class ObjectStore implements RawStore, Configurable {
       LOG.debug("Done retrieving all objects for listPrincipalAllTableGrants");
       return result;
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
   }
 
@@ -6236,7 +6030,7 @@ public class ObjectStore implements RawStore, Configurable {
       LOG.debug("Done retrieving all objects for listPrincipalAllPartitionGrants");
     } finally {
       if (!success) {
-        rollbackTransaction();
+       rollbackTransaction();
       }
     }
     return mSecurityTabPartList;
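
Note that the hunk above is whitespace-only (and the new indentation is actually off
by one space) and keeps the inline rollback rather than adopting the helper,
presumably because this call site has no Query object to close. Call sites that hold
a QueryWrapper instead can use the second overload added at the end of this patch,
along these lines:

    } finally {
      rollbackAndCleanup(success, queryWrapper);  // rolls back iff !success, then closes the wrapper
    }
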
@@ -6268,12 +6062,7 @@ public class ObjectStore implements RawStore, Configurable {
       LOG.debug("Done retrieving all objects for listPrincipalPartitionGrantsAll");
       return result;
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
   }
 
@@ -6299,12 +6088,7 @@ public class ObjectStore implements RawStore, Configurable {
       LOG.debug("Done retrieving all objects for listPrincipalPartitionGrantsAll");
       return result;
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
   }
 
@@ -6382,12 +6166,7 @@ public class ObjectStore implements RawStore, Configurable {
       LOG.debug("Done retrieving all objects for listPrincipalTableColumnGrantsAll");
       return result;
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
   }
 
@@ -6412,12 +6191,7 @@ public class ObjectStore implements RawStore, Configurable {
       LOG.debug("Done retrieving all objects for listPrincipalTableColumnGrantsAll");
       return result;
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
   }
 
@@ -6494,12 +6268,7 @@ public class ObjectStore implements RawStore, Configurable {
       LOG.debug("Done executing isPartitionMarkedForEvent");
       return (partEvents != null && !partEvents.isEmpty()) ? true : false;
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
   }
 
@@ -6553,7 +6322,6 @@ public class ObjectStore implements RawStore, Configurable {
   public Collection<?> executeJDOQLSelect(String queryStr, QueryWrapper queryWrapper) {
     boolean committed = false;
     Collection<?> result = null;
-
     try {
       openTransaction();
       Query query = queryWrapper.query = pm.newQuery(queryStr);
@@ -6594,12 +6362,7 @@ public class ObjectStore implements RawStore, Configurable {
         return -1;
       }
     } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(committed, query);
     }
   }
 
@@ -6629,12 +6392,7 @@ public class ObjectStore implements RawStore, Configurable {
         return null;
       }
     } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(committed, query);
     }
   }
 
@@ -6745,12 +6503,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       return retVal;
     } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(committed, query);
     }
   }
 
@@ -6838,12 +6591,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       return retVal;
     } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(committed, query);
     }
   }
 
@@ -6877,12 +6625,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       return retVal;
     } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(committed, query);
     }
   }
 
@@ -6973,12 +6716,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       return retVal;
     } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(committed, query);
     }
   }
 
@@ -7055,12 +6793,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       return retVal;
     } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(committed, query);
     }
   }
 
@@ -7271,7 +7004,6 @@ public class ObjectStore implements RawStore, Configurable {
     }
 
     boolean committed = false;
-
     try {
       openTransaction();
 
@@ -7318,7 +7050,7 @@ public class ObjectStore implements RawStore, Configurable {
     for (String colName : colNames) {
       boolean foundCol = false;
       for (FieldSchema mCol : colList) {
-        if (mCol.getName().equals(colName.trim())) {
+        if (mCol.getName().equals(colName)) {
           foundCol = true;
           break;
         }
@@ -7430,13 +7162,16 @@ public class ObjectStore implements RawStore, Configurable {
   @Override
   public AggrStats get_aggr_stats_for(String dbName, String tblName,
       final List<String> partNames, final List<String> colNames) throws MetaException, NoSuchObjectException {
-    final boolean  useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(), HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
+    final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(),
+        HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
+    final double ndvTuner = HiveConf.getFloatVar(getConf(),
+        HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER);
     return new GetHelper<AggrStats>(dbName, tblName, true, false) {
       @Override
       protected AggrStats getSqlResult(GetHelper<AggrStats> ctx)
           throws MetaException {
         return directSql.aggrColStatsForPartitions(dbName, tblName, partNames,
-            colNames, useDensityFunctionForNDVEstimation);
+            colNames, useDensityFunctionForNDVEstimation, ndvTuner);
       }
       @Override
       protected AggrStats getJdoResult(GetHelper<AggrStats> ctx)
@@ -7454,6 +7189,38 @@ public class ObjectStore implements RawStore, Configurable {
   }
 
   @Override
+  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+      String tableName) throws MetaException, NoSuchObjectException {
+    final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(),
+        HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
+    final double ndvTuner = HiveConf.getFloatVar(getConf(),
+        HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER);
+    return new GetHelper<Map<String, ColumnStatisticsObj>>(dbName, tableName, true, false) {
+      @Override
+      protected Map<String, ColumnStatisticsObj> getSqlResult(
+          GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException {
+        return directSql.getAggrColStatsForTablePartitions(dbName, tblName,
+            useDensityFunctionForNDVEstimation, ndvTuner);
+      }
+
+      @Override
+      protected Map<String, ColumnStatisticsObj> getJdoResult(
+          GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException,
+          NoSuchObjectException {
+        // This is a fast path for query optimizations; if we can find this info
+        // quickly using directSql, do it. No point in falling back to the slow
+        // path here.
+        throw new MetaException("Jdo path is not implemented for stats aggr.");
+      }
+
+      @Override
+      protected String describeResult() {
+        return null;
+      }
+    }.run(true);
+  }
+
+  @Override
   public void flushCache() {
     // NOP as there's no caching
   }
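
For context on the ndvTuner value threaded through both aggregation paths above:
going by the HiveConf description of hive.metastore.stats.ndv.tuner, it interpolates
the aggregate NDV between its lower bound (the highest NDV of any single partition)
and its upper bound (the sum of NDVs across all partitions). A minimal sketch of
that interpolation, with a hypothetical perPartitionNdvs input (the real logic lives
in MetaStoreDirectSql.aggrColStatsForPartitions):

    // Sketch only; perPartitionNdvs is a hypothetical list of per-partition NDV counts.
    long lowerBound = 0;
    long higherBound = 0;
    for (long ndv : perPartitionNdvs) {
      lowerBound = Math.max(lowerBound, ndv);
      higherBound += ndv;
    }
    // ndvTuner = 0 yields the lower bound, ndvTuner = 1 the upper bound.
    long aggrNdv = lowerBound + (long) ((higherBound - lowerBound) * ndvTuner);
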
@@ -7466,7 +7233,12 @@ public class ObjectStore implements RawStore, Configurable {
     try {
       openTransaction();
       // We are not going to verify SD for each partition. Just verify for the table.
-      validateTableCols(table, colNames);
+      // TODO: we need to verify the partition columns instead
+      try {
+        validateTableCols(table, colNames);
+      } catch (MetaException me) {
+        LOG.warn("The table does not have the same column definition as its partitions.");
+      }
       Query query = queryWrapper.query = pm.newQuery(MPartitionColumnStatistics.class);
       String paramStr = "java.lang.String t1, java.lang.String t2";
       String filter = "tableName == t1 && dbName == t2 && (";
@@ -7593,12 +7365,7 @@ public class ObjectStore implements RawStore, Configurable {
       rollbackTransaction();
       throw e;
     } finally {
-      if (!ret) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(ret, query);
     }
     return ret;
   }
@@ -7668,12 +7435,7 @@ public class ObjectStore implements RawStore, Configurable {
       rollbackTransaction();
       throw e;
     } finally {
-      if (!ret) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(ret, query);
     }
     return ret;
   }
@@ -7695,12 +7457,7 @@ public class ObjectStore implements RawStore, Configurable {
       delCnt = query.deletePersistentAll(curTime, expiryTime);
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
       LOG.debug("Done executing cleanupEvents");
     }
     return delCnt;
@@ -7804,12 +7561,7 @@ public class ObjectStore implements RawStore, Configurable {
       return tokenIdents;
     } finally {
       LOG.debug("Done executing getAllTokenIdentifers with status : " + committed);
-      if (!committed) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(committed, query);
     }
   }
 
@@ -7852,12 +7604,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       committed = commitTransaction();
     } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(committed, query);
     }
     LOG.debug("Done executing updateMasterKey with status : " + committed);
     if (null == masterKey) {
@@ -7885,12 +7632,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       success = commitTransaction();
     } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(success, query);
     }
     LOG.debug("Done executing removeMasterKey with status : " + success);
     return (null != masterKey) && success;
@@ -7916,12 +7658,7 @@ public class ObjectStore implements RawStore, Configurable {
       return masterKeys;
     } finally {
       LOG.debug("Done executing getMasterKeys with status : " + committed);
-      if (!committed) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(committed, query);
     }
   }
 
@@ -8033,12 +7770,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       return mVerTables.get(0);
     } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(committed, query);
     }
   }
 
@@ -8264,12 +7996,7 @@ public class ObjectStore implements RawStore, Configurable {
       pm.retrieve(mfunc);
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     return mfunc;
   }
@@ -8317,37 +8044,23 @@ public class ObjectStore implements RawStore, Configurable {
       dbName = HiveStringUtils.normalizeIdentifier(dbName);
       // Take the pattern and split it on the | to get all the composing
       // patterns
-      String[] subpatterns = pattern.trim().split("\\|");
-      String queryStr =
-          "select functionName from org.apache.hadoop.hive.metastore.model.MFunction "
-              + "where database.name == dbName && (";
-      boolean first = true;
-      for (String subpattern : subpatterns) {
-        subpattern = "(?i)" + subpattern.replaceAll("\\*", ".*");
-        if (!first) {
-          queryStr = queryStr + " || ";
-        }
-        queryStr = queryStr + " functionName.matches(\"" + subpattern + "\")";
-        first = false;
+      List<String> parameterVals = new ArrayList<>();
+      StringBuilder filterBuilder = new StringBuilder();
+      appendSimpleCondition(filterBuilder, "database.name", new String[] { dbName }, parameterVals);
+      if (pattern != null) {
+        appendPatternCondition(filterBuilder, "functionName", pattern, parameterVals);
       }
-      queryStr = queryStr + ")";
-      query = pm.newQuery(queryStr);
-      query.declareParameters("java.lang.String dbName");
+      query = pm.newQuery(MFunction.class, filterBuilder.toString());
       query.setResult("functionName");
       query.setOrdering("functionName ascending");
-      Collection names = (Collection) query.execute(dbName);
+      Collection names = (Collection) query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()]));
       funcs = new ArrayList<String>();
       for (Iterator i = names.iterator(); i.hasNext();) {
         funcs.add((String) i.next());
       }
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     return funcs;
   }
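
The getFunctions() rewrite above is worth highlighting: the old code spliced the
user-supplied pattern directly into the JDOQL string, so a pattern containing a
quote could break or alter the query. The new code builds the filter through the
existing appendSimpleCondition/appendPatternCondition helpers and binds all values
as query parameters. The resulting call shape, sketched with a hypothetical pattern:

    // "foo*|bar*" expands into case-insensitive functionName matches,
    // with dbName and the patterns bound as parameters rather than inlined.
    List<String> parameterVals = new ArrayList<>();
    StringBuilder filterBuilder = new StringBuilder();
    appendSimpleCondition(filterBuilder, "database.name", new String[] { dbName }, parameterVals);
    appendPatternCondition(filterBuilder, "functionName", "foo*|bar*", parameterVals);
    Query query = pm.newQuery(MFunction.class, filterBuilder.toString());
    query.setResult("functionName");
    query.setOrdering("functionName ascending");
    Collection names = (Collection) query.executeWithArray(
        parameterVals.toArray(new String[parameterVals.size()]));
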
@@ -8356,6 +8069,9 @@ public class ObjectStore implements RawStore, Configurable {
   public NotificationEventResponse getNextNotification(NotificationEventRequest rqst) {
     boolean commited = false;
     Query query = null;
+
+    NotificationEventResponse result = new NotificationEventResponse();
+    result.setEvents(new ArrayList<NotificationEvent>());
     try {
       openTransaction();
       long lastEvent = rqst.getLastEvent();
@@ -8365,11 +8081,9 @@ public class ObjectStore implements RawStore, Configurable {
       Collection<MNotificationLog> events = (Collection) query.execute(lastEvent);
       commited = commitTransaction();
       if (events == null) {
-        return null;
+        return result;
       }
       Iterator<MNotificationLog> i = events.iterator();
-      NotificationEventResponse result = new NotificationEventResponse();
-      result.setEvents(new ArrayList<NotificationEvent>());
       int maxEvents = rqst.getMaxEvents() > 0 ? rqst.getMaxEvents() : Integer.MAX_VALUE;
       int numEvents = 0;
       while (i.hasNext() && numEvents++ < maxEvents) {
@@ -8377,11 +8091,8 @@ public class ObjectStore implements RawStore, Configurable {
       }
       return result;
     } finally {
-      if (query != null) {
-        query.closeAll();
-      }
       if (!commited) {
-        rollbackTransaction();
+        rollbackAndCleanup(commited, query);
         return null;
       }
     }
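
One nit on the getNextNotification() hunk above: rollbackAndCleanup(), and with it
query.closeAll(), now runs only on the !commited branch, so the successful path
appears to leave the query open. A safer shape for the finally block, assuming
nothing else closes that query, would be:

    } finally {
      if (!commited) {
        rollbackAndCleanup(commited, query);
        return null;
      } else if (query != null) {
        query.closeAll();
      }
    }
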
@@ -8411,12 +8122,7 @@ public class ObjectStore implements RawStore, Configurable {
       pm.makePersistent(translateThriftToDb(entry));
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
   }
 
@@ -8436,12 +8142,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
   }
 
@@ -8460,12 +8161,7 @@ public class ObjectStore implements RawStore, Configurable {
       commited = commitTransaction();
       return new CurrentNotificationEventId(id);
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
   }
 
@@ -8533,20 +8229,99 @@ public class ObjectStore implements RawStore, Configurable {
    */
   public static void unCacheDataNucleusClassLoaders() {
     PersistenceManagerFactory pmf = ObjectStore.getPMF();
-    if ((pmf != null) && (pmf instanceof JDOPersistenceManagerFactory)) {
-      JDOPersistenceManagerFactory jdoPmf = (JDOPersistenceManagerFactory) pmf;
-      NucleusContext nc = jdoPmf.getNucleusContext();
-      try {
-        Field classLoaderResolverMap = AbstractNucleusContext.class.getDeclaredField(
-            "classLoaderResolverMap");
-        classLoaderResolverMap.setAccessible(true);
-        classLoaderResolverMap.set(nc, new HashMap<String, ClassLoaderResolver>());
-        LOG.debug("Removed cached classloaders from DataNucleus NucleusContext");
-      } catch (Exception e) {
-        LOG.warn("Failed to remove cached classloaders from DataNucleus NucleusContext ", e);
+    clearOutPmfClassLoaderCache(pmf);
+  }
+
+  private static void clearOutPmfClassLoaderCache(PersistenceManagerFactory pmf) {
+    if ((pmf == null) || (!(pmf instanceof JDOPersistenceManagerFactory))) {
+      return;
+    }
+    // NOTE : This is hacky, and this section of code is fragile depending on DN code varnames
+    // so it's likely to stop working at some time in the future, especially if we upgrade DN
+    // versions, so we actively need to find a better way to make sure the leak doesn't happen
+    // instead of just clearing out the cache after every call.
+    JDOPersistenceManagerFactory jdoPmf = (JDOPersistenceManagerFactory) pmf;
+    NucleusContext nc = jdoPmf.getNucleusContext();
+    try {
+      Field pmCache = pmf.getClass().getDeclaredField("pmCache");
+      pmCache.setAccessible(true);
+      Set<JDOPersistenceManager> pmSet = (Set<JDOPersistenceManager>)pmCache.get(pmf);
+      for (JDOPersistenceManager pm : pmSet) {
+        org.datanucleus.ExecutionContext ec = (org.datanucleus.ExecutionContext)pm.getExecutionContext();
+        if (ec instanceof org.datanucleus.ExecutionContextThreadedImpl) {
+          ClassLoaderResolver clr = ((org.datanucleus.ExecutionContextThreadedImpl)ec).getClassLoaderResolver();
+          clearClr(clr);
+        }
+      }
+      org.datanucleus.plugin.PluginManager pluginManager = jdoPmf.getNucleusContext().getPluginManager();
+      Field registryField = pluginManager.getClass().getDeclaredField("registry");
+      registryField.setAccessible(true);
+      org.datanucleus.plugin.PluginRegistry registry = (org.datanucleus.plugin.PluginRegistry)registryField.get(pluginManager);
+      if (registry instanceof org.datanucleus.plugin.NonManagedPluginRegistry) {
+        org.datanucleus.plugin.NonManagedPluginRegistry nRegistry = (org.datanucleus.plugin.NonManagedPluginRegistry)registry;
+        Field clrField = nRegistry.getClass().getDeclaredField("clr");
+        clrField.setAccessible(true);
+        ClassLoaderResolver clr = (ClassLoaderResolver)clrField.get(nRegistry);
+        clearClr(clr);
+      }
+      if (nc instanceof org.datanucleus.PersistenceNucleusContextImpl) {
+        org.datanucleus.PersistenceNucleusContextImpl pnc = (org.datanucleus.PersistenceNucleusContextImpl)nc;
+        org.datanucleus.store.types.TypeManagerImpl tm = (org.datanucleus.store.types.TypeManagerImpl)pnc.getTypeManager();
+        Field clrField = tm.getClass().getDeclaredField("clr");
+        clrField.setAccessible(true);
+        ClassLoaderResolver clr = (ClassLoaderResolver)clrField.get(tm);
+        clearClr(clr);
+        Field storeMgrField = pnc.getClass().getDeclaredField("storeMgr");
+        storeMgrField.setAccessible(true);
+        org.datanucleus.store.rdbms.RDBMSStoreManager storeMgr = (org.datanucleus.store.rdbms.RDBMSStoreManager)storeMgrField.get(pnc);
+        Field backingStoreField = storeMgr.getClass().getDeclaredField("backingStoreByMemberName");
+        backingStoreField.setAccessible(true);
+        Map<String, Store> backingStoreByMemberName = (Map<String, Store>)backingStoreField.get(storeMgr);
+        for (Store store : backingStoreByMemberName.values()) {
+          org.datanucleus.store.rdbms.scostore.BaseContainerStore baseStore = (org.datanucleus.store.rdbms.scostore.BaseContainerStore)store;
+          clrField = org.datanucleus.store.rdbms.scostore.BaseContainerStore.class.getDeclaredField("clr");
+          clrField.setAccessible(true);
+          clr = (ClassLoaderResolver)clrField.get(baseStore);
+          clearClr(clr);
+        }
+      }
+      Field classLoaderResolverMap = AbstractNucleusContext.class.getDeclaredField(
+          "classLoaderResolverMap");
+      classLoaderResolverMap.setAccessible(true);
+      Map<String,ClassLoaderResolver> loaderMap =
+          (Map<String, ClassLoaderResolver>) classLoaderResolverMap.get(nc);
+      for (ClassLoaderResolver clr : loaderMap.values()){
+        clearClr(clr);
+      }
+      classLoaderResolverMap.set(nc, new HashMap<String, ClassLoaderResolver>());
+      LOG.debug("Removed cached classloaders from DataNucleus NucleusContext");
+    } catch (Exception e) {
+      LOG.warn("Failed to remove cached classloaders from DataNucleus NucleusContext ", e);
+    }
+  }
+
+  private static void clearClr(ClassLoaderResolver clr) throws Exception {
+    if (clr != null){
+      if (clr instanceof ClassLoaderResolverImpl){
+        ClassLoaderResolverImpl clri = (ClassLoaderResolverImpl) clr;
+        long resourcesCleared = clearFieldMap(clri, "resources");
+        long loadedClassesCleared = clearFieldMap(clri, "loadedClasses");
+        long unloadedClassesCleared = clearFieldMap(clri, "unloadedClasses");
+        LOG.debug("Cleared ClassLoaderResolverImpl: " +
+            resourcesCleared + "," + loadedClassesCleared + "," + unloadedClassesCleared);
       }
     }
   }
+
+  private static long clearFieldMap(ClassLoaderResolverImpl clri, String mapFieldName) throws Exception {
+    Field mapField = ClassLoaderResolverImpl.class.getDeclaredField(mapFieldName);
+    mapField.setAccessible(true);
+    Map<String, Class> map = (Map<String, Class>) mapField.get(clri);
+    long sz = map.size();
+    mapField.set(clri, Collections.synchronizedMap(new WeakValueMap()));
+    return sz;
+  }
+
 
   @Override
   public List<SQLPrimaryKey> getPrimaryKeys(String db_name, String tbl_name) throws MetaException {
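
A note on the WeakValueMap swap in clearFieldMap() above: DataNucleus's WeakValueMap
holds its values behind weak references (an assumption based on the class name and
its use here), so replacing the resolver's caches with it lets otherwise-unreferenced
classes, and with them their ClassLoaders, be garbage-collected. The underlying idea,
sketched with plain java.lang.ref.WeakReference and a hypothetical fooClass:

    // A weak-valued cache: entries stop pinning their values once no one else holds them.
    Map<String, WeakReference<Class<?>>> cache = new HashMap<>();
    cache.put("org.example.Foo", new WeakReference<>(fooClass));  // fooClass is hypothetical
    WeakReference<Class<?>> ref = cache.get("org.example.Foo");
    Class<?> cached = (ref == null) ? null : ref.get();  // null once the class was collected

A production version would also drain a ReferenceQueue to evict cleared entries,
which is presumably what WeakValueMap does internally.
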
@@ -8557,10 +8332,12 @@ public class ObjectStore implements RawStore, Configurable {
     }
   }
 
-  protected List<SQLPrimaryKey> getPrimaryKeysInternal(final String db_name,
-    final String tbl_name,
+  protected List<SQLPrimaryKey> getPrimaryKeysInternal(final String db_name_input,
+    final String tbl_name_input,
     boolean allowSql, boolean allowJdo)
   throws MetaException, NoSuchObjectException {
+    final String db_name = HiveStringUtils.normalizeIdentifier(db_name_input);
+    final String tbl_name = HiveStringUtils.normalizeIdentifier(tbl_name_input);
     return new GetListHelper<SQLPrimaryKey>(db_name, tbl_name, allowSql, allowJdo) {
 
       @Override
@@ -8603,12 +8380,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     return primaryKeys;
   }
@@ -8633,12 +8405,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       commited = commitTransaction();
      } finally {
-       if (!commited) {
-        rollbackTransaction();
-       }
-       if (query != null) {
-        query.closeAll();
-       }
+        rollbackAndCleanup(commited, query);
      }
      return ret;
    }
@@ -8654,9 +8421,13 @@ public class ObjectStore implements RawStore, Configurable {
     }
   }
 
-  protected List<SQLForeignKey> getForeignKeysInternal(final String parent_db_name,
-    final String parent_tbl_name, final String foreign_db_name, final String foreign_tbl_name,
-    boolean allowSql, boolean allowJdo) throws MetaException, NoSuchObjectException {
+  protected List<SQLForeignKey> getForeignKeysInternal(final String parent_db_name_input,
+    final String parent_tbl_name_input, final String foreign_db_name_input,
+    final String foreign_tbl_name_input, boolean allowSql, boolean allowJdo) throws MetaException, NoSuchObjectException {
+    final String parent_db_name = parent_db_name_input;
+    final String parent_tbl_name = parent_tbl_name_input;
+    final String foreign_db_name = foreign_db_name_input;
+    final String foreign_tbl_name = foreign_tbl_name_input;
     return new GetListHelper<SQLForeignKey>(foreign_db_name, foreign_tbl_name, allowSql, allowJdo) {
 
       @Override
@@ -8757,12 +8528,7 @@ public class ObjectStore implements RawStore, Configurable {
       }
       commited = commitTransaction();
     } finally {
-      if (!commited) {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      rollbackAndCleanup(commited, query);
     }
     return foreignKeys;
   }
@@ -8790,6 +8556,46 @@ public class ObjectStore implements RawStore, Configurable {
     }
   }
 
+  /**
+   * Cleanup helper that rolls back the active transaction when the success flag
+   * is false, then closes the associated Query object. This method is used
+   * internally and is visible for testing purposes only.
+   * @param success if false, roll back the current active transaction
+   * @param query Query object which needs to be closed
+   */
+  @VisibleForTesting
+  void rollbackAndCleanup(boolean success, Query query) {
+    try {
+      if (!success) {
+        rollbackTransaction();
+      }
+    } finally {
+      if (query != null) {
+        query.closeAll();
+      }
+    }
+  }
+
+  /**
+   * Cleanup helper that rolls back the active transaction when the success flag
+   * is false, then closes the associated QueryWrapper object. This method is used
+   * internally and is visible for testing purposes only.
+   * @param success if false, roll back the current active transaction
+   * @param queryWrapper QueryWrapper object which needs to be closed
+   */
+  @VisibleForTesting
+  void rollbackAndCleanup(boolean success, QueryWrapper queryWrapper) {
+    try {
+      if (!success) {
+        rollbackTransaction();
+      }
+    } finally {
+      if (queryWrapper != null) {
+        queryWrapper.close();
+      }
+    }
+  }
+
   @Override
   public void createTableWrite(Table tbl, long writeId, char state, long heartbeat) {
     boolean success = false;

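For reference, the call-site contract that the converted finally blocks above rely
on, sketched with a stand-in model class and filter:

    boolean commited = false;
    Query query = null;
    try {
      openTransaction();
      query = pm.newQuery(MTable.class, "database.name == dbName");  // stand-in query
      // ... execute the query and consume its results ...
      commited = commitTransaction();
    } finally {
      rollbackAndCleanup(commited, query);  // rolls back iff !commited, then always closes
    }
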
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
index 63b696d..ded978c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -589,6 +590,17 @@ public interface RawStore extends Configurable {
     List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException;
 
   /**
+   * Get aggregated column statistics for all partitions of a table
+   * @param dbName database name
+   * @param tableName table name
+   * @return Map of partition column statistics
+   * @throws MetaException
+   * @throws NoSuchObjectException
+   */
+  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+      String tableName) throws MetaException, NoSuchObjectException;
+
+  /**
    * Get the next notification event.
    * @param rqst Request containing information on the last processed notification.
    * @return list of notifications, sorted by eventId