Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:31:58 UTC

[11/51] [partial] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
new file mode 100644
index 0000000..7beee42
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.cache;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper;
+import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapper;
+import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
+import org.apache.hadoop.hive.metastore.hbase.HBaseUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class SharedCache {
+  private static Map<String, Database> databaseCache = new TreeMap<String, Database>();
+  private static Map<String, TableWrapper> tableCache = new TreeMap<String, TableWrapper>();
+  private static Map<String, PartitionWrapper> partitionCache = new TreeMap<String, PartitionWrapper>();
+  private static Map<String, ColumnStatisticsObj> partitionColStatsCache = new TreeMap<String, ColumnStatisticsObj>();
+  private static Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<ByteArrayWrapper, StorageDescriptorWrapper>();
+  private static MessageDigest md;
+
+  static {
+    try {
+      md = MessageDigest.getInstance("MD5");
+    } catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException("should not happen", e);
+    }
+  }
+
+  public static synchronized Database getDatabaseFromCache(String name) {
+    Database db = databaseCache.get(name);
+    return (db != null) ? db.deepCopy() : null;
+  }
+
+  public static synchronized void addDatabaseToCache(String dbName, Database db) {
+    Database dbCopy = db.deepCopy();
+    dbCopy.setName(HiveStringUtils.normalizeIdentifier(dbName));
+    databaseCache.put(dbName, dbCopy);
+  }
+
+  public static synchronized void removeDatabaseFromCache(String dbName) {
+    databaseCache.remove(dbName);
+  }
+
+  public static synchronized List<String> listCachedDatabases() {
+    return new ArrayList<String>(databaseCache.keySet());
+  }
+
+  public static synchronized void alterDatabaseInCache(String dbName, Database newDb) {
+    removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName));
+    addDatabaseToCache(HiveStringUtils.normalizeIdentifier(newDb.getName()), newDb.deepCopy());
+  }
+
+  public static synchronized int getCachedDatabaseCount() {
+    return databaseCache.size();
+  }
+
+  public static synchronized Table getTableFromCache(String dbName, String tableName) {
+    TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName));
+    if (tblWrapper == null) {
+      return null;
+    }
+    Table t = CacheUtils.assemble(tblWrapper);
+    return t;
+  }
+
+  public static synchronized void addTableToCache(String dbName, String tblName, Table tbl) {
+    Table tblCopy = tbl.deepCopy();
+    tblCopy.setDbName(HiveStringUtils.normalizeIdentifier(dbName));
+    tblCopy.setTableName(HiveStringUtils.normalizeIdentifier(tblName));
+    for (FieldSchema fs : tblCopy.getPartitionKeys()) {
+      fs.setName(HiveStringUtils.normalizeIdentifier(fs.getName()));
+    }
+    TableWrapper wrapper;
+    if (tbl.getSd() != null) {
+      byte[] sdHash = HBaseUtils.hashStorageDescriptor(tbl.getSd(), md);
+      StorageDescriptor sd = tbl.getSd();
+      increSd(sd, sdHash);
+      tblCopy.setSd(null);
+      wrapper = new TableWrapper(tblCopy, sdHash, sd.getLocation(), sd.getParameters());
+    } else {
+      wrapper = new TableWrapper(tblCopy, null, null, null);
+    }
+    tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper);
+  }
+
+  public static synchronized void removeTableFromCache(String dbName, String tblName) {
+    TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName));
+    byte[] sdHash = tblWrapper.getSdHash();
+    if (sdHash != null) {
+      decrSd(sdHash);
+    }
+  }
+
+  public static synchronized void alterTableInCache(String dbName, String tblName, Table newTable) {
+    removeTableFromCache(dbName, tblName);
+    addTableToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()),
+        HiveStringUtils.normalizeIdentifier(newTable.getTableName()), newTable);
+    if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
+      List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
+      for (Partition part : partitions) {
+        removePartitionFromCache(part.getDbName(), part.getTableName(), part.getValues());
+        part.setDbName(HiveStringUtils.normalizeIdentifier(newTable.getDbName()));
+        part.setTableName(HiveStringUtils.normalizeIdentifier(newTable.getTableName()));
+        addPartitionToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()),
+            HiveStringUtils.normalizeIdentifier(newTable.getTableName()), part);
+      }
+    }
+  }
+
+  public static synchronized int getCachedTableCount() {
+    return tableCache.size();
+  }
+
+  public static synchronized List<Table> listCachedTables(String dbName) {
+    List<Table> tables = new ArrayList<Table>();
+    for (TableWrapper wrapper : tableCache.values()) {
+      if (wrapper.getTable().getDbName().equals(dbName)) {
+        tables.add(CacheUtils.assemble(wrapper));
+      }
+    }
+    return tables;
+  }
+
+  public static synchronized void updateTableColumnStatistics(String dbName, String tableName,
+      List<ColumnStatisticsObj> statsObjs) {
+    Table tbl = getTableFromCache(dbName, tableName);
+    List<String> colNames = new ArrayList<>();
+    for (ColumnStatisticsObj statsObj : statsObjs) {
+      colNames.add(statsObj.getColName());
+    }
+    StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames);
+    alterTableInCache(dbName, tableName, tbl);
+  }
+
+  public static synchronized List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes) {
+    List<TableMeta> tableMetas = new ArrayList<TableMeta>();
+    for (String dbName : listCachedDatabases()) {
+      if (CacheUtils.matches(dbName, dbNames)) {
+        for (Table table : listCachedTables(dbName)) {
+          if (CacheUtils.matches(table.getTableName(), tableNames)) {
+            if (tableTypes == null || tableTypes.contains(table.getTableType())) {
+              TableMeta metaData = new TableMeta(
+                  dbName, table.getTableName(), table.getTableType());
+              metaData.setComments(table.getParameters().get("comment"));
+              tableMetas.add(metaData);
+            }
+          }
+        }
+      }
+    }
+    return tableMetas;
+  }
+
+  public static synchronized void addPartitionToCache(String dbName, String tblName, Partition part) {
+    Partition partCopy = part.deepCopy();
+    PartitionWrapper wrapper;
+    if (part.getSd() != null) {
+      byte[] sdHash = HBaseUtils.hashStorageDescriptor(part.getSd(), md);
+      StorageDescriptor sd = part.getSd();
+      increSd(sd, sdHash);
+      partCopy.setSd(null);
+      wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters());
+    } else {
+      wrapper = new PartitionWrapper(partCopy, null, null, null);
+    }
+    partitionCache.put(CacheUtils.buildKey(dbName, tblName, part.getValues()), wrapper);
+  }
+
+  public static synchronized Partition getPartitionFromCache(String key) {
+    PartitionWrapper wrapper = partitionCache.get(key);
+    if (wrapper == null) {
+      return null;
+    }
+    Partition p = CacheUtils.assemble(wrapper);
+    return p;
+  }
+
+  public static synchronized Partition getPartitionFromCache(String dbName, String tblName, List<String> part_vals) {
+    return getPartitionFromCache(CacheUtils.buildKey(dbName, tblName, part_vals));
+  }
+
+  public static synchronized boolean existPartitionFromCache(String dbName, String tblName, List<String> part_vals) {
+    return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals));
+  }
+
+  public static synchronized Partition removePartitionFromCache(String dbName, String tblName, List<String> part_vals) {
+    PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
+    if (wrapper.getSdHash() != null) {
+      decrSd(wrapper.getSdHash());
+    }
+    return wrapper.getPartition();
+  }
+
+  public static synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
+    List<Partition> partitions = new ArrayList<Partition>();
+    int count = 0;
+    for (PartitionWrapper wrapper : partitionCache.values()) {
+      if (wrapper.getPartition().getDbName().equals(dbName)
+          && wrapper.getPartition().getTableName().equals(tblName)
+          && (max == -1 || count < max)) {
+        partitions.add(CacheUtils.assemble(wrapper));
+        count++;
+      }
+    }
+    return partitions;
+  }
+
+  public static synchronized void alterPartitionInCache(String dbName, String tblName, List<String> partVals, Partition newPart) {
+    removePartitionFromCache(dbName, tblName, partVals);
+    addPartitionToCache(HiveStringUtils.normalizeIdentifier(newPart.getDbName()),
+        HiveStringUtils.normalizeIdentifier(newPart.getTableName()), newPart);
+  }
+
+  public static synchronized void updatePartitionColumnStatistics(String dbName, String tableName,
+      List<String> partVals, List<ColumnStatisticsObj> statsObjs) {
+    Partition part = getPartitionFromCache(dbName, tableName, partVals);
+    List<String> colNames = new ArrayList<>();
+    for (ColumnStatisticsObj statsObj : statsObjs) {
+      colNames.add(statsObj.getColName());
+    }
+    StatsSetupConst.setColumnStatsState(part.getParameters(), colNames);
+    alterPartitionInCache(dbName, tableName, partVals, part);
+  }
+
+  public static synchronized int getCachedPartitionCount() {
+    return partitionCache.size();
+  }
+
+  public static synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) {
+    return partitionColStatsCache.get(key);
+  }
+
+  public static synchronized void addPartitionColStatsToCache(Map<String, ColumnStatisticsObj> aggrStatsPerPartition) {
+    partitionColStatsCache.putAll(aggrStatsPerPartition);
+  }
+
+
+  public static void increSd(StorageDescriptor sd, byte[] sdHash) {
+    ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
+    if (sdCache.containsKey(byteArray)) {
+      sdCache.get(byteArray).refCount++;
+    } else {
+      StorageDescriptor sdToCache = sd.deepCopy();
+      sdToCache.setLocation(null);
+      sdToCache.setParameters(null);
+      sdCache.put(byteArray, new StorageDescriptorWrapper(sdToCache, 1));
+    }
+  }
+
+  public static void decrSd(byte[] sdHash) {
+    ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
+    StorageDescriptorWrapper sdWrapper = sdCache.get(byteArray);
+    sdWrapper.refCount--;
+    if (sdWrapper.getRefCount() == 0) {
+      sdCache.remove(byteArray);
+    }
+  }
+
+  public static StorageDescriptor getSdFromCache(byte[] sdHash) {
+    StorageDescriptorWrapper sdWrapper = sdCache.get(new ByteArrayWrapper(sdHash));
+    return sdWrapper.getSd();
+  }
+
+  // Replace databases in databaseCache with the new list
+  public static synchronized void refreshDatabases(List<Database> databases) {
+    for (String dbName : listCachedDatabases()) {
+      removeDatabaseFromCache(dbName);
+    }
+    for (Database db : databases) {
+      addDatabaseToCache(db.getName(), db);
+    }
+  }
+
+  // Replace tables in tableCache with the new list
+  public static synchronized void refreshTables(String dbName, List<Table> tables) {
+    for (Table tbl : listCachedTables(dbName)) {
+      removeTableFromCache(dbName, tbl.getTableName());
+    }
+    for (Table tbl : tables) {
+      addTableToCache(dbName, tbl.getTableName(), tbl);
+    }
+  }
+
+  public static void refreshPartitions(String dbName, String tblName, List<Partition> partitions) {
+    List<String> keysToRemove = new ArrayList<String>();
+    for (Map.Entry<String, PartitionWrapper> entry : partitionCache.entrySet()) {
+      if (entry.getValue().getPartition().getDbName().equals(dbName)
+          && entry.getValue().getPartition().getTableName().equals(tblName)) {
+        keysToRemove.add(entry.getKey());
+      }
+    }
+    for (String key : keysToRemove) {
+      partitionCache.remove(key);
+    }
+    for (Partition part : partitions) {
+      addPartitionToCache(dbName, tblName, part);
+    }
+  }
+
+  @VisibleForTesting
+  static Map<String, Database> getDatabaseCache() {
+    return databaseCache;
+  }
+
+  @VisibleForTesting
+  static Map<String, TableWrapper> getTableCache() {
+    return tableCache;
+  }
+
+  @VisibleForTesting
+  static Map<String, PartitionWrapper> getPartitionCache() {
+    return partitionCache;
+  }
+
+  @VisibleForTesting
+  static Map<ByteArrayWrapper, StorageDescriptorWrapper> getSdCache() {
+    return sdCache;
+  }
+
+  @VisibleForTesting
+  static Map<String, ColumnStatisticsObj> getPartitionColStatsCache() {
+    return partitionColStatsCache;
+  }
+}

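The SharedCache above deduplicates StorageDescriptors: addTableToCache and addPartitionToCache store only an MD5 hash of each SD, while increSd/decrSd maintain a reference count in sdCache so identical descriptors are shared across tables and partitions. The following is a minimal standalone sketch of that hash-and-refcount scheme; the class and names are illustrative, not part of the patch.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Standalone illustration of the hash-and-refcount sharing used by SharedCache.
class RefCountedCache {
  private static final class Entry {
    final String value;   // stands in for the shared StorageDescriptor
    int refCount = 1;     // one owner at creation time
    Entry(String value) { this.value = value; }
  }

  private final Map<String, Entry> cache = new HashMap<>();
  private final MessageDigest md;

  RefCountedCache() throws NoSuchAlgorithmException {
    md = MessageDigest.getInstance("MD5");
  }

  // Register one more owner of this value; returns the key owners hold on to.
  synchronized String incr(String value) {
    md.reset();
    String key = Arrays.toString(md.digest(value.getBytes(StandardCharsets.UTF_8)));
    Entry e = cache.get(key);
    if (e == null) {
      cache.put(key, new Entry(value));   // first owner caches the shared copy
    } else {
      e.refCount++;                       // later owners just bump the count
    }
    return key;
  }

  // Drop one owner; the shared copy disappears with its last owner.
  synchronized void decr(String key) {
    Entry e = cache.get(key);
    if (e != null && --e.refCount == 0) {
      cache.remove(key);
    }
  }
}

Hashing the descriptor rather than keying on the full object keeps the map keys small and makes equality checks cheap, at the cost of keeping the hash stable across JVMs (hence the sorting note in HBaseUtils.hashStorageDescriptor below).
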
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java
index 8edb50b..e5b8495 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java
@@ -27,13 +27,15 @@ public class AlterPartitionEvent extends ListenerEvent {
   private final Partition oldPart;
   private final Partition newPart;
   private final Table table;
+  private final boolean isTruncateOp;
 
-  public AlterPartitionEvent(Partition oldPart, Partition newPart, Table table,
-      boolean status, HMSHandler handler) {
+  public AlterPartitionEvent(Partition oldPart, Partition newPart, Table table, boolean isTruncateOp,
+                             boolean status, HMSHandler handler) {
     super(status, handler);
     this.oldPart = oldPart;
     this.newPart = newPart;
     this.table = table;
+    this.isTruncateOp = isTruncateOp;
   }
 
   /**
@@ -58,4 +60,12 @@ public class AlterPartitionEvent extends ListenerEvent {
   public Table getTable() {
     return table;
   }
+
+  /**
+   * Get the truncate table flag.
+   * @return true if this alter was triggered by a truncate operation
+   */
+  public boolean getIsTruncateOp() {
+    return isTruncateOp;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java
index 4d6dce2..22ea513 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java
@@ -26,10 +26,13 @@ public class AlterTableEvent extends ListenerEvent {
 
   private final Table newTable;
   private final Table oldTable;
-  public AlterTableEvent (Table oldTable, Table newTable, boolean status, HMSHandler handler) {
+  private final boolean isTruncateOp;
+
+  public AlterTableEvent (Table oldTable, Table newTable, boolean isTruncateOp, boolean status, HMSHandler handler) {
     super (status, handler);
     this.oldTable = oldTable;
     this.newTable = newTable;
+    this.isTruncateOp = isTruncateOp;
   }
 
   /**
@@ -45,4 +48,11 @@ public class AlterTableEvent extends ListenerEvent {
   public Table getNewTable() {
     return newTable;
   }
+
+  /**
+   * @return true if this alter was triggered by a truncate operation
+   */
+  public boolean getIsTruncateOp() {
+    return isTruncateOp;
+  }
 }
\ No newline at end of file

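Both alter events now carry an isTruncateOp flag so metastore listeners can tell a TRUNCATE apart from an ordinary ALTER. Below is a hedged sketch of a consumer, assuming the MetaStoreEventListener base class and its onAlterTable(AlterTableEvent) hook as they exist in this tree; the listener name and output are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;

// Illustrative listener; only the truncate-flag check is the point here.
public class TruncateAwareListener extends MetaStoreEventListener {
  public TruncateAwareListener(Configuration config) {
    super(config);
  }

  @Override
  public void onAlterTable(AlterTableEvent event) throws MetaException {
    if (event.getIsTruncateOp()) {
      // TRUNCATE TABLE is reported as an alter with the truncate flag set
      System.out.println("Truncated: " + event.getNewTable().getTableName());
    } else {
      System.out.println("Altered: " + event.getNewTable().getTableName());
    }
  }
}
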
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
index 7bc0e04..dff1195 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
@@ -38,6 +38,7 @@ public class InsertEvent extends ListenerEvent {
   private final String db;
   private final String table;
   private final Map<String, String> keyValues;
+  private final boolean replace;
   private final List<String> files;
   private List<String> fileChecksums = new ArrayList<String>();
 
@@ -56,6 +57,9 @@ public class InsertEvent extends ListenerEvent {
     super(status, handler);
     this.db = db;
     this.table = table;
+
+    // If replace flag is not set by caller, then by default set it to true to maintain backward compatibility
+    this.replace = (insertData.isSetReplace() ? insertData.isReplace() : true);
     this.files = insertData.getFilesAdded();
     GetTableRequest req = new GetTableRequest(db, table);
     req.setCapabilities(HiveMetaStoreClient.TEST_VERSION);
@@ -90,6 +94,13 @@ public class InsertEvent extends ListenerEvent {
   }
 
   /**
+   * @return The replace flag.
+   */
+  public boolean isReplace() {
+    return replace;
+  }
+
+  /**
    * Get list of files created as a result of this DML operation
    *
    * @return list of new files

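The new replace field defaults to true when the client never set it, so clients that predate the flag keep INSERT OVERWRITE semantics. This relies on the standard Thrift optional-field idiom: isSetX() reports whether the caller populated the field at all, which distinguishes an absent value from an explicit false. In generic form (req and flag are hypothetical names):

// Hypothetical Thrift struct "req" with an optional boolean field "flag":
// isSetFlag() reports whether the field was populated, so absence can be
// mapped to a backward-compatible default instead of Thrift's default false.
boolean flag = req.isSetFlag() ? req.isFlag() : true;
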
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
index 62aeb8c..b741549 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
@@ -21,10 +21,18 @@ package org.apache.hadoop.hive.metastore.events;
 import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Base class for all the events which are defined for metastore.
+ *
+ * This class is not thread-safe and not expected to be called in parallel.
  */
 
+@NotThreadSafe
 public abstract class ListenerEvent {
 
   /**
@@ -33,6 +41,26 @@ public abstract class ListenerEvent {
   private final boolean status;
   private final HMSHandler handler;
 
+  /**
+   * Key/value parameters used by listeners to store notifications results
+   * i.e. DbNotificationListener sets a DB_NOTIFICATION_EVENT_ID.
+   *
+   * NotThreadSafe: The parameters map is not expected to be accessed in parallel by Hive, so it is kept thread-unsafe
+   * to avoid locking overhead.
+   */
+  private Map<String, String> parameters;
+
+  /** For performance, the unmodifiable view of the parameters map that {@link #getParameters()} returns is
+   * cached. {@link #putParameter(String, String)} is expected to be called fewer times
+   * than {@link #getParameters()}, so caching the returned view is a net win.
+   */
+  private Map<String, String> unmodifiableParameters;
+
+  // Listener parameters aren't expected to have many values. So far only
+  // DbNotificationListener will add a parameter; let's set a low initial capacity for now.
+  // If we find out many parameters are added, then we can adjust or remove this initial capacity.
+  private static final int PARAMETERS_INITIAL_CAPACITY = 1;
+
   // Properties passed by the client, to be used in execution hooks.
   private EnvironmentContext environmentContext = null;
 
@@ -40,6 +68,8 @@ public abstract class ListenerEvent {
     super();
     this.status = status;
     this.handler = handler;
+    this.parameters = new HashMap<>(PARAMETERS_INITIAL_CAPACITY);
+    updateUnmodifiableParameters();
   }
 
   /**
@@ -49,6 +79,12 @@ public abstract class ListenerEvent {
     return status;
   }
 
+  /**
+   * Set the environment context of the event.
+   *
+   * @param environmentContext An EnvironmentContext object that contains environment parameters sent from
+   *                           the HMS client.
+   */
   public void setEnvironmentContext(EnvironmentContext environmentContext) {
     this.environmentContext = environmentContext;
   }
@@ -66,4 +102,74 @@ public abstract class ListenerEvent {
   public HMSHandler getHandler() {
     return handler;
   }
+
+  /**
+   * Return all parameters of the listener event. Parameters are read-only (unmodifiable map). If a new parameter
+   * must be added, please use the putParameter() method.
+   *
+   * @return A map object with all parameters.
+   */
+  public final Map<String, String> getParameters() {
+    return unmodifiableParameters;
+  }
+
+  /**
+   * Put a new parameter to the listener event.
+   *
+   * Overriding parameters is not allowed; an exception is thrown to avoid a misconfiguration
+   * between listeners setting the same parameters.
+   *
+   * @param name Name of the parameter.
+   * @param value Value of the parameter.
+   * @throws IllegalStateException if a parameter already exists.
+   */
+  public void putParameter(String name, String value) {
+    putParameterIfAbsent(name, value);
+    updateUnmodifiableParameters();
+  }
+
+  /**
+   * Put a new set of parameters into the listener event.
+   *
+   * Overriding parameters is not allowed; an exception is thrown to avoid a misconfiguration
+   * between listeners setting the same parameters.
+   *
+   * @param parameters A Map object with the set of parameters.
+   * @throws IllegalStateException if a parameter already exists.
+   */
+  public void putParameters(final Map<String, String> parameters) {
+    if (parameters != null) {
+      for (Map.Entry<String, String> entry : parameters.entrySet()) {
+        putParameterIfAbsent(entry.getKey(), entry.getValue());
+      }
+
+      updateUnmodifiableParameters();
+    }
+  }
+
+  /**
+   * Put a parameter to the listener event only if the parameter is absent.
+   *
+   * Overriding parameters is not allowed; an exception is thrown to avoid a misconfiguration
+   * between listeners setting the same parameters.
+   *
+   * @param name Name of the parameter.
+   * @param value Value of the parameter.
+   * @throws IllegalStateException if a parameter already exists.
+   */
+  private void putParameterIfAbsent(String name, String value) {
+    if (parameters.containsKey(name)) {
+      throw new IllegalStateException("Invalid attempt to overwrite a read-only parameter: " + name);
+    }
+
+    parameters.put(name, value);
+  }
+
+  /**
+   * Refreshes the cached unmodifiable view of the parameters map returned by {@link #getParameters()}.
+   */
+  private void updateUnmodifiableParameters() {
+    unmodifiableParameters = Collections.unmodifiableMap(parameters);
+  }
 }

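ListenerEvent now exposes a small parameters map that listeners use to publish the results of processing an event. putParameter refuses to overwrite an existing key, and getParameters returns a cached unmodifiable view. A usage sketch follows; the key and values are illustrative rather than the actual constants DbNotificationListener uses.

// Sketch against the new parameter API; key and values are made up.
event.putParameter("DB_NOTIFICATION_EVENT_ID", "42");

Map<String, String> params = event.getParameters();      // cached unmodifiable view
// params.put("other", "x");                             // UnsupportedOperationException

// Writing the same key twice is rejected:
// event.putParameter("DB_NOTIFICATION_EVENT_ID", "43"); // IllegalStateException
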
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index 1340645..945e99e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -2708,6 +2708,8 @@ public class HBaseStore implements RawStore {
 
   @Override
   public List<SQLPrimaryKey> getPrimaryKeys(String db_name, String tbl_name) throws MetaException {
+    db_name = HiveStringUtils.normalizeIdentifier(db_name);
+    tbl_name = HiveStringUtils.normalizeIdentifier(tbl_name);
     boolean commit = false;
     openTransaction();
     try {
@@ -2726,6 +2728,10 @@ public class HBaseStore implements RawStore {
   public List<SQLForeignKey> getForeignKeys(String parent_db_name, String parent_tbl_name,
                                             String foreign_db_name, String foreign_tbl_name)
       throws MetaException {
+    parent_db_name = parent_db_name != null ? HiveStringUtils.normalizeIdentifier(parent_db_name) : null;
+    parent_tbl_name = parent_tbl_name != null ? HiveStringUtils.normalizeIdentifier(parent_tbl_name) : null;
+    foreign_db_name = HiveStringUtils.normalizeIdentifier(foreign_db_name);
+    foreign_tbl_name = HiveStringUtils.normalizeIdentifier(foreign_tbl_name);
     boolean commit = false;
     openTransaction();
     try {
@@ -2770,6 +2776,9 @@ public class HBaseStore implements RawStore {
     // This is something of pain, since we have to search both primary key and foreign key to see
     // which they want to drop.
     boolean commit = false;
+    dbName = HiveStringUtils.normalizeIdentifier(dbName);
+    tableName = HiveStringUtils.normalizeIdentifier(tableName);
+    constraintName = HiveStringUtils.normalizeIdentifier(constraintName);
     openTransaction();
     try {
       List<SQLPrimaryKey> pk = getHBase().getPrimaryKey(dbName, tableName);
@@ -2809,6 +2818,12 @@ public class HBaseStore implements RawStore {
   @Override
   public void addPrimaryKeys(List<SQLPrimaryKey> pks) throws InvalidObjectException, MetaException {
     boolean commit = false;
+    for (SQLPrimaryKey pk : pks) {
+      pk.setTable_db(HiveStringUtils.normalizeIdentifier(pk.getTable_db()));
+      pk.setTable_name(HiveStringUtils.normalizeIdentifier(pk.getTable_name()));
+      pk.setColumn_name(HiveStringUtils.normalizeIdentifier(pk.getColumn_name()));
+      pk.setPk_name(HiveStringUtils.normalizeIdentifier(pk.getPk_name()));
+    }
     openTransaction();
     try {
       List<SQLPrimaryKey> currentPk =
@@ -2830,6 +2845,13 @@ public class HBaseStore implements RawStore {
   @Override
   public void addForeignKeys(List<SQLForeignKey> fks) throws InvalidObjectException, MetaException {
     boolean commit = false;
+    for (SQLForeignKey fk : fks) {
+      fk.setPktable_db(HiveStringUtils.normalizeIdentifier(fk.getPktable_db()));
+      fk.setPktable_name(HiveStringUtils.normalizeIdentifier(fk.getPktable_name()));
+      fk.setFktable_db(HiveStringUtils.normalizeIdentifier(fk.getFktable_db()));
+      fk.setFktable_name(HiveStringUtils.normalizeIdentifier(fk.getFktable_name()));
+      fk.setFk_name(HiveStringUtils.normalizeIdentifier(fk.getFk_name()));
+    }
     openTransaction();
     try {
       // Fetch the existing keys (if any) and add in these new ones
@@ -2848,6 +2870,13 @@ public class HBaseStore implements RawStore {
   }
 
   @Override
+  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+      String tableName) throws MetaException, NoSuchObjectException {
+    // TODO: see if it makes sense to implement this here
+    return null;
+  }
+
+  @Override
   public void createTableWrite(Table tbl, long writeId, char state, long heartbeat) {
     // TODO: Auto-generated method stub
     throw new UnsupportedOperationException();

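These normalizeIdentifier calls make the HBase-backed constraint lookups case-insensitive, matching how the metastore stores identifiers. Assuming the usual trim-and-lowercase implementation of HiveStringUtils.normalizeIdentifier in this tree, the effect looks like this:

import org.apache.hive.common.util.HiveStringUtils;

public class NormalizeDemo {
  public static void main(String[] args) {
    // Identifiers are stored lowercased, so lookups normalize first
    // (assuming trim-and-lowercase normalization).
    System.out.println(HiveStringUtils.normalizeIdentifier("  MyDB "));  // prints "mydb"
    System.out.println(HiveStringUtils.normalizeIdentifier("Orders"));   // prints "orders"
  }
}
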
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
index 94087b1..3172f92 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
@@ -619,7 +619,7 @@ public class HBaseUtils {
   * @param md message digest to use to generate the hash
    * @return the hash as a byte array
    */
-  static byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md)  {
+  public static byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md)  {
     // Note all maps and lists have to be absolutely sorted.  Otherwise we'll produce different
     // results for hashes based on the OS or JVM being used.
     md.reset();

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java
index ed6080b..e9ed7e5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java
@@ -31,6 +31,8 @@ public abstract class AlterPartitionMessage extends EventMessage {
 
   public abstract String getTable();
 
+  public abstract boolean getIsTruncateOp();
+
   public abstract Map<String,String> getKeyValues();
 
   public abstract Table getTableObj() throws Exception;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java
index 5487123..39a87bc 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java
@@ -28,6 +28,8 @@ public abstract class AlterTableMessage extends EventMessage {
 
   public abstract String getTable();
 
+  public abstract boolean getIsTruncateOp();
+
   public abstract Table getTableObjBefore() throws Exception;
 
   public abstract Table getTableObjAfter() throws Exception;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
index a5414d1..8205c25 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.messaging;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
@@ -30,88 +31,10 @@ import java.util.List;
 
 public class EventUtils {
 
-  /**
-   * Utility function that constructs a notification filter to match a given db name and/or table name.
-   * If dbName == null, fetches all warehouse events.
-   * If dnName != null, but tableName == null, fetches all events for the db
-   * If dbName != null && tableName != null, fetches all events for the specified table
-   * @param dbName
-   * @param tableName
-   * @return
-   */
-  public static IMetaStoreClient.NotificationFilter getDbTblNotificationFilter(final String dbName, final String tableName){
-    return new IMetaStoreClient.NotificationFilter() {
-      @Override
-      public boolean accept(NotificationEvent event) {
-        if (event == null){
-          return false; // get rid of trivial case first, so that we can safely assume non-null
-        }
-        if (dbName == null){
-          return true; // if our dbName is null, we're interested in all wh events
-        }
-        if (dbName.equalsIgnoreCase(event.getDbName())){
-          if ( (tableName == null)
-              // if our dbName is equal, but tableName is blank, we're interested in this db-level event
-              || (tableName.equalsIgnoreCase(event.getTableName()))
-            // table level event that matches us
-              ){
-            return true;
-          }
-        }
-        return false;
-      }
-    };
-  }
-
-  public static IMetaStoreClient.NotificationFilter restrictByMessageFormat(final String messageFormat){
-    return new IMetaStoreClient.NotificationFilter() {
-      @Override
-      public boolean accept(NotificationEvent event) {
-        if (event == null){
-          return false; // get rid of trivial case first, so that we can safely assume non-null
-        }
-        if (messageFormat == null){
-          return true; // let's say that passing null in will not do any filtering.
-        }
-        if (messageFormat.equalsIgnoreCase(event.getMessageFormat())){
-          return true;
-        }
-        return false;
-      }
-    };
-  }
-
-  public static IMetaStoreClient.NotificationFilter getEventBoundaryFilter(final Long eventFrom, final Long eventTo){
-    return new IMetaStoreClient.NotificationFilter() {
-      @Override
-      public boolean accept(NotificationEvent event) {
-        if ( (event == null) || (event.getEventId() < eventFrom) || (event.getEventId() > eventTo)) {
-          return false;
-        }
-        return true;
-      }
-    };
-  }
-
-  public static IMetaStoreClient.NotificationFilter andFilter(
-      final IMetaStoreClient.NotificationFilter... filters ) {
-    return new IMetaStoreClient.NotificationFilter() {
-      @Override
-      public boolean accept(NotificationEvent event) {
-        for (IMetaStoreClient.NotificationFilter filter : filters){
-          if (!filter.accept(event)){
-            return false;
-          }
-        }
-        return true;
-      }
-    };
-  }
-
   public interface NotificationFetcher {
-    public int getBatchSize() throws IOException;
-    public long getCurrentNotificationEventId() throws IOException;
-    public List<NotificationEvent> getNextNotificationEvents(
+    int getBatchSize() throws IOException;
+    long getCurrentNotificationEventId() throws IOException;
+    List<NotificationEvent> getNextNotificationEvents(
         long pos, IMetaStoreClient.NotificationFilter filter) throws IOException;
   }
 
@@ -177,7 +100,7 @@ public class EventUtils {
     public NotificationEventIterator(
         NotificationFetcher nfetcher, long eventFrom, int maxEvents,
         String dbName, String tableName) throws IOException {
-      init(nfetcher, eventFrom, maxEvents, EventUtils.getDbTblNotificationFilter(dbName, tableName));
+      init(nfetcher, eventFrom, maxEvents, new DatabaseAndTableFilter(dbName, tableName));
       // using init(..) instead of this(..) because the EventUtils.getDbTblNotificationFilter
       // is an operation that needs to run before delegating to the other ctor, and this messes up chaining
       // ctors

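The anonymous NotificationFilter factories removed above are replaced by the dedicated filter classes added later in this patch under messaging/event/filters. Composition now reads as plain object construction; an illustrative call site follows (the db/table names, event-id window, and format string are made up):

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;

public class FilterComposition {
  public static IMetaStoreClient.NotificationFilter salesOrdersFilter() {
    // Equivalent of the old EventUtils.andFilter(getDbTblNotificationFilter(...),
    // getEventBoundaryFilter(...), restrictByMessageFormat(...)) chain.
    return new AndFilter(
        new DatabaseAndTableFilter("sales", "orders"), // db/table illustrative
        new EventBoundaryFilter(100L, 200L),           // accept event ids 100..200
        new MessageFormatFilter("json-0.2"));          // format string illustrative
  }
}
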
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
index 3d16721..6d146e0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
@@ -37,6 +37,12 @@ public abstract class InsertMessage extends EventMessage {
   public abstract String getTable();
 
   /**
+   * Getter for the replace flag that distinguishes INSERT INTO from INSERT OVERWRITE.
+   * @return true for INSERT OVERWRITE, false for INSERT INTO.
+   */
+  public abstract boolean isReplace();
+
+  /**
    * Get the map of partition keyvalues.  Will be null if this insert is to a table and not a
    * partition.
    * @return Map of partition keyvalues, or null.

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index aa770f2..1bd52a8 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -149,9 +149,10 @@ public abstract class MessageFactory {
    * and some are not yet supported.
    * @param before The table before the alter
    * @param after The table after the alter
+   * @param isTruncateOp Flag to denote truncate table
    * @return
    */
-  public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after);
+  public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp);
 
   /**
    * Factory method for DropTableMessage.
@@ -175,10 +176,11 @@ public abstract class MessageFactory {
    * @param table The table in which the partition is being altered
    * @param before The partition before it was altered
    * @param after The partition after it was altered
+   * @param isTruncateOp Flag to denote truncate partition
    * @return a new AlterPartitionMessage
    */
   public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before,
-                                                                   Partition after);
+                                                                   Partition after, boolean isTruncateOp);
 
   /**
    * Factory method for DropPartitionMessage.
@@ -231,9 +233,10 @@ public abstract class MessageFactory {
    * @param table Name of the table the insert occurred in
    * @param partVals Partition values for the partition that the insert occurred in, may be null if
    *          the insert was done into a non-partitioned table
+   * @param replace Flag that is true for INSERT OVERWRITE and false for INSERT INTO
   * @param files Iterator of files created
    * @return instance of InsertMessage
    */
   public abstract InsertMessage buildInsertMessage(String db, String table,
-      Map<String, String> partVals, Iterator<String> files);
+      Map<String, String> partVals, boolean replace, Iterator<String> files);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/PartitionFiles.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/PartitionFiles.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/PartitionFiles.java
index b10b8a8..4fd7f8c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/PartitionFiles.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/PartitionFiles.java
@@ -22,10 +22,13 @@ import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.codehaus.jackson.annotate.JsonProperty;
 
 public class PartitionFiles {
 
+  @JsonProperty
   private String partitionName;
+  @JsonProperty
   private List<String> files;
 
   public PartitionFiles(String partitionName, Iterator<String> files) {

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/AndFilter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/AndFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/AndFilter.java
new file mode 100644
index 0000000..d6429f6
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/AndFilter.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging.event.filters;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+public class AndFilter implements IMetaStoreClient.NotificationFilter {
+  final IMetaStoreClient.NotificationFilter[] filters;
+
+  public AndFilter(final IMetaStoreClient.NotificationFilter... filters) {
+    this.filters = filters;
+  }
+
+  @Override
+  public boolean accept(final NotificationEvent event) {
+    for (IMetaStoreClient.NotificationFilter filter : filters) {
+      if (!filter.accept(event)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java
new file mode 100644
index 0000000..5294063
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging.event.filters;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+public abstract class BasicFilter implements NotificationFilter {
+  @Override
+  public boolean accept(final NotificationEvent event) {
+    if (event == null) {
+      return false; // get rid of trivial case first, so that we can safely assume non-null
+    }
+    return shouldAccept(event);
+  }
+
+  abstract boolean shouldAccept(final NotificationEvent event);
+}

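BasicFilter is a small template method: accept() handles the null-event case once, so subclasses implement shouldAccept against a guaranteed non-null event. A sketch of a custom filter in the same mold (the class name and event-type string are illustrative; it sits in the same package because shouldAccept is package-private):

package org.apache.hadoop.hive.metastore.messaging.event.filters;

import org.apache.hadoop.hive.metastore.api.NotificationEvent;

// Hypothetical filter in the BasicFilter mold; "DROP_TABLE" is illustrative.
public class DropTableOnlyFilter extends BasicFilter {
  @Override
  boolean shouldAccept(final NotificationEvent event) {
    // event is never null here; BasicFilter.accept already screened that out
    return "DROP_TABLE".equals(event.getEventType());
  }
}
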
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
new file mode 100644
index 0000000..4a7ca6d
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging.event.filters;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+/**
+ * Notification filter that matches a given db name and/or table name.
+ * If dbName == null, accepts all warehouse events.
+ * If dbName != null but tableName == null, accepts all events for the db.
+ * If dbName != null && tableName != null, accepts only events for the specified table.
+ */
+public class DatabaseAndTableFilter extends BasicFilter {
+  private final String databaseName, tableName;
+
+  public DatabaseAndTableFilter(final String databaseName, final String tableName) {
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+  }
+
+  @Override
+  boolean shouldAccept(final NotificationEvent event) {
+    if (databaseName == null) {
+      return true; // if our dbName is null, we're interested in all wh events
+    }
+    if (databaseName.equalsIgnoreCase(event.getDbName())) {
+      // dbName matches; a null tableName means we want every event in this db,
+      // otherwise only events for the matching table
+      if (tableName == null || tableName.equalsIgnoreCase(event.getTableName())) {
+        return true;
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/EventBoundaryFilter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/EventBoundaryFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/EventBoundaryFilter.java
new file mode 100644
index 0000000..137b4ce
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/EventBoundaryFilter.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging.event.filters;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+public class EventBoundaryFilter extends BasicFilter {
+  private final long eventFrom, eventTo;
+
+  public EventBoundaryFilter(final long eventFrom, final long eventTo) {
+    this.eventFrom = eventFrom;
+    this.eventTo = eventTo;
+  }
+
+  @Override
+  boolean shouldAccept(final NotificationEvent event) {
+    return eventFrom <= event.getEventId() && event.getEventId() <= eventTo;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/MessageFormatFilter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/MessageFormatFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/MessageFormatFilter.java
new file mode 100644
index 0000000..4e91ee6
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/MessageFormatFilter.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging.event.filters;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+public class MessageFormatFilter extends BasicFilter {
+  private final String format;
+
+  public MessageFormatFilter(String format) {
+    this.format = format;
+  }
+
+  @Override
+  boolean shouldAccept(final NotificationEvent event) {
+    if (format == null) {
+      return true; // let's say that passing null in will not do any filtering.
+    }
+    return format.equalsIgnoreCase(event.getMessageFormat());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
index dd1bf3c..bd7776c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
@@ -37,6 +37,9 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage {
   String server, servicePrincipal, db, table, tableObjJson;
 
   @JsonProperty
+  String isTruncateOp;
+
+  @JsonProperty
   Long timestamp;
 
   @JsonProperty
@@ -52,11 +55,12 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage {
   }
 
   public JSONAlterPartitionMessage(String server, String servicePrincipal, Table tableObj,
-      Partition partitionObjBefore, Partition partitionObjAfter, Long timestamp) {
+      Partition partitionObjBefore, Partition partitionObjAfter, boolean isTruncateOp, Long timestamp) {
     this.server = server;
     this.servicePrincipal = servicePrincipal;
     this.db = tableObj.getDbName();
     this.table = tableObj.getTableName();
+    this.isTruncateOp = Boolean.toString(isTruncateOp);
     this.timestamp = timestamp;
     this.keyValues = JSONMessageFactory.getPartitionKeyValues(tableObj, partitionObjBefore);
     try {
@@ -95,6 +99,9 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage {
   }
 
   @Override
+  public boolean getIsTruncateOp() {
+    return Boolean.parseBoolean(isTruncateOp);
+  }
+
+  @Override
   public Map<String, String> getKeyValues() {
     return keyValues;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
index 792015e..58eb1a7 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
@@ -32,6 +32,9 @@ public class JSONAlterTableMessage extends AlterTableMessage {
   String server, servicePrincipal, db, table, tableObjBeforeJson, tableObjAfterJson;
 
   @JsonProperty
+  String isTruncateOp;
+
+  @JsonProperty
   Long timestamp;
 
   /**
@@ -41,11 +44,12 @@ public class JSONAlterTableMessage extends AlterTableMessage {
   }
 
   public JSONAlterTableMessage(String server, String servicePrincipal, Table tableObjBefore, Table tableObjAfter,
-      Long timestamp) {
+      boolean isTruncateOp, Long timestamp) {
     this.server = server;
     this.servicePrincipal = servicePrincipal;
     this.db = tableObjBefore.getDbName();
     this.table = tableObjBefore.getTableName();
+    this.isTruncateOp = Boolean.toString(isTruncateOp);
     this.timestamp = timestamp;
     try {
       this.tableObjBeforeJson = JSONMessageFactory.createTableObjJson(tableObjBefore);
@@ -82,6 +86,9 @@ public class JSONAlterTableMessage extends AlterTableMessage {
   }
 
   @Override
+  public boolean getIsTruncateOp() {
+    return Boolean.parseBoolean(isTruncateOp);
+  }
+
+  @Override
   public Table getTableObjBefore() throws Exception {
     return (Table) JSONMessageFactory.getTObj(tableObjBeforeJson,Table.class);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
index e1316a4..c059d47 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
@@ -40,6 +40,9 @@ public class JSONInsertMessage extends InsertMessage {
   Long timestamp;
 
   @JsonProperty
+  String replace;
+
+  @JsonProperty
   List<String> files;
 
   @JsonProperty
@@ -52,12 +55,13 @@ public class JSONInsertMessage extends InsertMessage {
   }
 
   public JSONInsertMessage(String server, String servicePrincipal, String db, String table,
-      Map<String, String> partKeyVals, Iterator<String> fileIter, Long timestamp) {
+      Map<String, String> partKeyVals, boolean replace, Iterator<String> fileIter, Long timestamp) {
     this.server = server;
     this.servicePrincipal = servicePrincipal;
     this.db = db;
     this.table = table;
     this.timestamp = timestamp;
+    this.replace = Boolean.toString(replace);
     this.partKeyVals = partKeyVals;
     this.files = Lists.newArrayList(fileIter);
     checkValid();
@@ -99,6 +103,9 @@ public class JSONInsertMessage extends InsertMessage {
   }
 
   @Override
+  public boolean isReplace() { return Boolean.parseBoolean(replace); }
+
+  @Override
   public String toString() {
     try {
       return JSONMessageDeserializer.mapper.writeValueAsString(this);

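The three message classes above (alter partition, alter table, insert) all store the new boolean as a String field rather than a primitive. One practical consequence: Boolean.parseBoolean(null) returns false, so messages written before this change simply deserialize with the flag off. A minimal round-trip sketch using the same Jackson 1.x ObjectMapper these classes rely on (MiniMessage and its members are hypothetical stand-ins, not Hive code):

import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;

public class FlagRoundTrip {
  // hypothetical mini-message mirroring the String-backed flag pattern above
  static class MiniMessage {
    @JsonProperty
    String replace;
    MiniMessage() {}                       // Jackson needs a no-arg constructor
    MiniMessage(boolean replace) { this.replace = Boolean.toString(replace); }
    boolean isReplace() { return Boolean.parseBoolean(replace); }
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(new MiniMessage(true));
    System.out.println(mapper.readValue(json, MiniMessage.class).isReplace()); // true
    // a pre-upgrade message without the field: parseBoolean(null) -> false
    System.out.println(mapper.readValue("{}", MiniMessage.class).isReplace()); // false
  }
}
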
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
index 41732c7..40ef5fb 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
 
 /**
  * MessageDeserializer implementation, for deserializing from JSON strings.
@@ -46,6 +47,9 @@ public class JSONMessageDeserializer extends MessageDeserializer {
 
   static {
     mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, false);
+    mapper.configure(SerializationConfig.Feature.AUTO_DETECT_IS_GETTERS, false);
+    mapper.configure(SerializationConfig.Feature.AUTO_DETECT_FIELDS, false);
   }
 
   @Override

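Turning off the three AUTO_DETECT features means the shared mapper serializes only members explicitly marked @JsonProperty, so public accessors such as the new getIsTruncateOp()/isReplace() do not leak into the wire format as extra JSON properties. A small sketch of the difference (Msg and getTruncateOp() are illustrative, not Hive classes):

import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;

public class AutoDetectDemo {
  static class Msg {
    @JsonProperty
    String db = "default";
    // with getter auto-detection on, this would become a "truncateOp" property
    public boolean getTruncateOp() { return true; }
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper strict = new ObjectMapper();
    strict.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, false);
    strict.configure(SerializationConfig.Feature.AUTO_DETECT_IS_GETTERS, false);
    strict.configure(SerializationConfig.Feature.AUTO_DETECT_FIELDS, false);
    System.out.println(strict.writeValueAsString(new Msg()));             // {"db":"default"}
    System.out.println(new ObjectMapper().writeValueAsString(new Msg())); // also emits "truncateOp":true
  }
}
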
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index 3406afb..04a4041 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -28,6 +28,10 @@ import javax.annotation.Nullable;
 
 import com.google.common.collect.Iterables;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.Index;
@@ -104,8 +108,8 @@ public class JSONMessageFactory extends MessageFactory {
   }
 
   @Override
-  public AlterTableMessage buildAlterTableMessage(Table before, Table after) {
-    return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after, now());
+  public AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp) {
+    return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after, isTruncateOp, now());
   }
 
   @Override
@@ -123,8 +127,8 @@ public class JSONMessageFactory extends MessageFactory {
 
   @Override
   public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before,
-      Partition after) {
-    return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, before, after,
+      Partition after, boolean isTruncateOp) {
+    return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, before, after, isTruncateOp,
         now());
   }
 
@@ -161,10 +165,9 @@ public class JSONMessageFactory extends MessageFactory {
   }
 
   @Override
-  public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals,
+  public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals, boolean replace,
       Iterator<String> fileIter) {
-    return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals,
-        fileIter, now());
+    return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals, replace, fileIter, now());
   }
 
   private long now() {
@@ -298,5 +301,4 @@ public class JSONMessageFactory extends MessageFactory {
     };
     return getTObjs(Iterables.transform(jsonArrayIterator, textExtractor), objClass);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java b/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
index 63be7b7..10fcbea 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
@@ -284,7 +284,7 @@ public class ExpressionTree {
     //can only support "=" and "!=" for now, because our JDO lib is buggy when
     // using objects from map.get()
     private static final Set<Operator> TABLE_FILTER_OPS = Sets.newHashSet(
-        Operator.EQUALS, Operator.NOTEQUALS, Operator.NOTEQUALS2);
+        Operator.EQUALS, Operator.NOTEQUALS, Operator.NOTEQUALS2, Operator.LIKE);
 
     private void generateJDOFilterOverTables(Map<String, Object> params,
         FilterBuilder filterBuilder) throws MetaException {

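With Operator.LIKE added to TABLE_FILTER_OPS, a metastore filter string that pattern-matches on a table parameter can now be pushed down into the JDO query instead of being rejected. A hedged usage sketch via HiveMetaStoreClient.listTableNamesByFilter (the "retention" parameter name and the pattern are made up for illustration):

import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;

public class TableFilterLike {
  public static void main(String[] args) throws Exception {
    HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
    // match tables whose "retention" table parameter contains "30"
    String filter = hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS
        + "retention like \"%30%\"";
    List<String> tables = client.listTableNamesByFilter("default", filter, (short) -1);
    System.out.println(tables);
    client.close();
  }
}
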
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index d378d06..970038d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -52,6 +52,7 @@ import javax.sql.DataSource;
 
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.nio.ByteBuffer;
 import java.sql.*;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -147,6 +148,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
 
   static private DataSource connPool;
+  private static DataSource connPoolMutex;
   static private boolean doRetryOnConnPool = false;
   
   private enum OpertaionType {
@@ -203,8 +205,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   private int deadlockCnt;
   private long deadlockRetryInterval;
   protected HiveConf conf;
-  protected DatabaseProduct dbProduct;
-  private SQLGenerator sqlGenerator;
+  private static DatabaseProduct dbProduct;
+  private static SQLGenerator sqlGenerator;
 
   // (End user) Transaction timeout, in milliseconds.
   private long timeout;
@@ -223,7 +225,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
    */
   private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>();
   private static final String hostname = ServerUtils.hostname();
-  private static volatile boolean dumpConfig = true;
 
   // Private methods should never catch SQLException and then throw MetaException.  The public
   // methods depend on SQLException coming back so they can detect and handle deadlocks.  Private
@@ -247,20 +248,36 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
     checkQFileTestHack();
 
-    Connection dbConn = null;
-    // Set up the JDBC connection pool
-    try {
-      setupJdbcConnectionPool(conf);
-      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-      determineDatabaseProduct(dbConn);
-      sqlGenerator = new SQLGenerator(dbProduct, conf);
-    } catch (SQLException e) {
-      String msg = "Unable to instantiate JDBC connection pooling, " + e.getMessage();
-      LOG.error(msg);
-      throw new RuntimeException(e);
-    }
-    finally {
-      closeDbConn(dbConn);
+    synchronized (TxnHandler.class) {
+      if (connPool == null) {
+        //only do this once per JVM; useful for support
+        LOG.info(HiveConfUtil.dumpConfig(conf).toString());
+
+        Connection dbConn = null;
+        // Set up the JDBC connection pool
+        try {
+          int maxPoolSize = conf.getIntVar(HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_MAX_CONNECTIONS);
+          long getConnectionTimeoutMs = 30000;
+          connPool = setupJdbcConnectionPool(conf, maxPoolSize, getConnectionTimeoutMs);
+          /*the mutex pool should ideally be somewhat larger since some operations require 1
+           connection from each pool and we want to avoid taking a connection from the primary
+           pool and then blocking because the mutex pool is empty.  There is only 1 thread in any
+           HMS trying to mutex on each MUTEX_KEY except MUTEX_KEY.CheckLock.  The CheckLock
+           operation gets a connection from connPool first, then connPoolMutex.  All others go in
+           the opposite order (not very elegant...).  So the number of connection requests for
+           connPoolMutex cannot exceed (size of connPool + MUTEX_KEY.values().length - 1).*/
+          connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize + MUTEX_KEY.values().length, getConnectionTimeoutMs);
+          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+          determineDatabaseProduct(dbConn);
+          sqlGenerator = new SQLGenerator(dbProduct, conf);
+        } catch (SQLException e) {
+          String msg = "Unable to instantiate JDBC connection pooling, " + e.getMessage();
+          LOG.error(msg);
+          throw new RuntimeException(e);
+        } finally {
+          closeDbConn(dbConn);
+        }
+      }
     }
 
     timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
@@ -270,11 +287,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
     deadlockRetryInterval = retryInterval / 10;
     maxOpenTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS);
-    if(dumpConfig) {
-      LOG.info(HiveConfUtil.dumpConfig(conf).toString());
-      //only do this once per JVM; useful for support
-      dumpConfig = false;
-    }
   }
   @Override
   @RetrySemantics.ReadOnly
@@ -367,7 +379,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       try {
         /**
          * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
-\         */
+         */
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         String s = "select ntxn_next - 1 from NEXT_TXN_ID";
@@ -383,23 +395,27 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             "initialized, null record found in next_txn_id");
         }
         close(rs);
-        Set<Long> openList = new HashSet<Long>();
+        List<Long> openList = new ArrayList<Long>();
         //need the WHERE clause below to ensure consistent results with READ_COMMITTED
-        s = "select txn_id, txn_state from TXNS where txn_id <= " + hwm;
+        s = "select txn_id, txn_state from TXNS where txn_id <= " + hwm + " order by txn_id";
         LOG.debug("Going to execute query<" + s + ">");
         rs = stmt.executeQuery(s);
         long minOpenTxn = Long.MAX_VALUE;
+        BitSet abortedBits = new BitSet();
         while (rs.next()) {
           long txnId = rs.getLong(1);
           openList.add(txnId);
           char c = rs.getString(2).charAt(0);
           if(c == TXN_OPEN) {
             minOpenTxn = Math.min(minOpenTxn, txnId);
+          } else if (c == TXN_ABORTED) {
+            abortedBits.set(openList.size() - 1);
           }
         }
         LOG.debug("Going to rollback");
         dbConn.rollback();
-        GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList);
+        ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray());
+        GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer);
         if(minOpenTxn < Long.MAX_VALUE) {
           otr.setMin_open_txn(minOpenTxn);
         }
@@ -844,7 +860,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   /**
    * As much as possible (i.e. in absence of retries) we want both operations to be done on the same
   * connection (but separate transactions).  This avoids some flakiness in BONECP where if you
-   * perform an operation on 1 connection and immediately get another fron the pool, the 2nd one
+   * perform an operation on 1 connection and immediately get another from the pool, the 2nd one
    * doesn't see results of the first.
    * 
    * Retry-by-caller note: If the call to lock is from a transaction, then in the worst case
@@ -983,6 +999,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
                 case SELECT:
                   updateTxnComponents = false;
                   break;
+                case NO_TXN:
+                  /*this constant is a bit of a misnomer since we now always have a txn context.  It
+                   just means the operation is such that we don't care what tables/partitions it
+                   affects, as it triggers neither compaction nor conflict detection.  A better name
+                   would be NON_TRANSACTIONAL.*/
+                  updateTxnComponents = false;
+                  break;
                 default:
                   //since we have an open transaction, only 4 values above are expected 
                   throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType()
@@ -1934,7 +1957,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   }
 
-  protected Connection getDbConn(int isolationLevel) throws SQLException {
+  Connection getDbConn(int isolationLevel) throws SQLException {
+    return getDbConn(isolationLevel, connPool);
+  }
+  private Connection getDbConn(int isolationLevel, DataSource connPool) throws SQLException {
     int rc = doRetryOnConnPool ? 10 : 1;
     Connection dbConn = null;
     while (true) {
@@ -2457,14 +2483,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       response.setLockid(extLockId);
 
       LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId));
-      Savepoint save = dbConn.setSavepoint();//todo: get rid of this
+      Savepoint save = dbConn.setSavepoint();
       StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
         "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
         "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
 
       Set<String> strings = new HashSet<String>(locksBeingChecked.size());
 
-      //This the set of entities that the statement represnted by extLockId wants to update
+      //This is the set of entities that the statement represented by extLockId wants to update
       List<LockInfo> writeSet = new ArrayList<>();
 
       for (LockInfo info : locksBeingChecked) {
@@ -3131,9 +3157,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
-  private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException {
-    if (connPool != null) return;
-
+  private static synchronized DataSource setupJdbcConnectionPool(HiveConf conf, int maxPoolSize, long getConnectionTimeoutMs) throws SQLException {
     String driverUrl = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORECONNECTURLKEY);
     String user = getMetastoreJdbcUser(conf);
     String passwd = getMetastoreJdbcPasswd(conf);
@@ -3143,33 +3167,40 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     if ("bonecp".equals(connectionPooler)) {
       BoneCPConfig config = new BoneCPConfig();
       config.setJdbcUrl(driverUrl);
-      //if we are waiting for connection for 60s, something is really wrong
+      //if we are waiting for connection for a long time, something is really wrong
       //better raise an error than hang forever
-      config.setConnectionTimeoutInMs(60000);
-      config.setMaxConnectionsPerPartition(10);
+      //see DefaultConnectionStrategy.getConnectionInternal()
+      config.setConnectionTimeoutInMs(getConnectionTimeoutMs);
+      config.setMaxConnectionsPerPartition(maxPoolSize);
       config.setPartitionCount(1);
       config.setUser(user);
       config.setPassword(passwd);
-      connPool = new BoneCPDataSource(config);
       doRetryOnConnPool = true;  // Enable retries to work around BONECP bug.
+      return new BoneCPDataSource(config);
     } else if ("dbcp".equals(connectionPooler)) {
-      ObjectPool objectPool = new GenericObjectPool();
+      GenericObjectPool objectPool = new GenericObjectPool();
+      //https://commons.apache.org/proper/commons-pool/api-1.6/org/apache/commons/pool/impl/GenericObjectPool.html#setMaxActive(int)
+      objectPool.setMaxActive(maxPoolSize);
+      objectPool.setMaxWait(getConnectionTimeoutMs);
       ConnectionFactory connFactory = new DriverManagerConnectionFactory(driverUrl, user, passwd);
       // This doesn't get used, but it's still necessary, see
       // http://svn.apache.org/viewvc/commons/proper/dbcp/branches/DBCP_1_4_x_BRANCH/doc/ManualPoolingDataSourceExample.java?view=markup
       PoolableConnectionFactory poolConnFactory =
           new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true);
-      connPool = new PoolingDataSource(objectPool);
+      return new PoolingDataSource(objectPool);
     } else if ("hikaricp".equals(connectionPooler)) {
       HikariConfig config = new HikariConfig();
+      config.setMaximumPoolSize(maxPoolSize);
       config.setJdbcUrl(driverUrl);
       config.setUsername(user);
       config.setPassword(passwd);
+      //https://github.com/brettwooldridge/HikariCP
+      config.setConnectionTimeout(getConnectionTimeoutMs);
 
-      connPool = new HikariDataSource(config);
+      return new HikariDataSource(config);
     } else if ("none".equals(connectionPooler)) {
       LOG.info("Choosing not to pool JDBC connections");
-      connPool = new NoPoolConnectionPool(conf);
+      return new NoPoolConnectionPool(conf);
     } else {
       throw new RuntimeException("Unknown JDBC connection pooling " + connectionPooler);
     }
@@ -3427,7 +3458,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       try {
         String sqlStmt = sqlGenerator.addForUpdateClause("select MT_COMMENT from AUX_TABLE where MT_KEY1=" + quoteString(key) + " and MT_KEY2=0");
         lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
         stmt = dbConn.createStatement();
         if(LOG.isDebugEnabled()) {
           LOG.debug("About to execute SQL: " + sqlStmt);

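The constructor comment above encodes a sizing invariant: in the worst case every thread holding a connPool connection then asks for a mutex connection (the CheckLock ordering), plus one holder for each remaining MUTEX_KEY, so connPoolMutex is sized at maxPoolSize + MUTEX_KEY.values().length to stay ahead of that bound. A worked sketch with made-up numbers (the enum below is a hypothetical stand-in; the real TxnStore.MUTEX_KEY has its own values):

public class PoolSizing {
  // hypothetical subset of mutex keys, for the arithmetic only
  enum MUTEX_KEY { Initiator, Cleaner, HouseKeeper, CheckLock, WriteSetCleaner }

  public static void main(String[] args) {
    int maxPoolSize = 10; // METASTORE_CONNECTION_POOLING_MAX_CONNECTIONS
    // worst case: all connPool holders request a mutex connection,
    // plus one holder per MUTEX_KEY other than CheckLock
    int worstCaseMutexRequests = maxPoolSize + MUTEX_KEY.values().length - 1;
    int mutexPoolSize = maxPoolSize + MUTEX_KEY.values().length;
    System.out.println(worstCaseMutexRequests + " <= " + mutexPoolSize); // 14 <= 15
  }
}
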
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 517eec3..6e0070b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -32,9 +32,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 public class TxnUtils {
   private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class);
@@ -50,8 +50,13 @@ public class TxnUtils {
    * @return a valid txn list.
    */
   public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
+    /*todo: should highWater be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0
+     * otherwise if currentTxn=7 and 8 commits before 7, then 7 will see result of 8 which
+     * doesn't make sense for Snapshot Isolation.  Of course for Read Committed, the list should
+     * include the latest committed set.*/
     long highWater = txns.getTxn_high_water_mark();
-    Set<Long> open = txns.getOpen_txns();
+    List<Long> open = txns.getOpen_txns();
+    BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits());
     long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];
     int i = 0;
     for(long txn: open) {
@@ -59,10 +64,10 @@ public class TxnUtils {
       exceptions[i++] = txn;
     }
     if(txns.isSetMin_open_txn()) {
-      return new ValidReadTxnList(exceptions, highWater, txns.getMin_open_txn());
+      return new ValidReadTxnList(exceptions, abortedBits, highWater, txns.getMin_open_txn());
     }
     else {
-      return new ValidReadTxnList(exceptions, highWater);
+      return new ValidReadTxnList(exceptions, abortedBits, highWater);
     }
   }
 
@@ -93,7 +98,9 @@ public class TxnUtils {
       exceptions = Arrays.copyOf(exceptions, i);
     }
     highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
-    return new ValidCompactorTxnList(exceptions, highWater);
+    BitSet bitSet = new BitSet(exceptions.length);
+    bitSet.set(0, exceptions.length); // for ValidCompactorTxnList, everything in exceptions is aborted
+    return new ValidCompactorTxnList(exceptions, bitSet, highWater);
   }
 
   /**

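The aborted-transaction information now travels as one bit per entry of the ordered open-txn list: getOpenTxns() sets a bit at the list index of each aborted txn and wraps the BitSet's bytes in a ByteBuffer, and createValidReadTxnList() rebuilds the BitSet with BitSet.valueOf. A self-contained round-trip sketch (the txn ids are arbitrary):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

public class AbortedBitsRoundTrip {
  public static void main(String[] args) {
    // writer side, as in getOpenTxns(): open/aborted txns in txn_id order
    List<Long> openList = new ArrayList<Long>();
    BitSet abortedBits = new BitSet();
    openList.add(7L);                                        // open
    openList.add(9L); abortedBits.set(openList.size() - 1);  // aborted
    openList.add(12L);                                       // open
    ByteBuffer wire = ByteBuffer.wrap(abortedBits.toByteArray());

    // reader side, as in createValidReadTxnList(): membership is tested by
    // list position, not by txn id
    BitSet back = BitSet.valueOf(wire);
    for (int i = 0; i < openList.size(); i++) {
      System.out.println(openList.get(i) + (back.get(i) ? " aborted" : " open"));
    }
  }
}
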
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/model/package.jdo
----------------------------------------------------------------------
diff --git a/metastore/src/model/package.jdo b/metastore/src/model/package.jdo
index 844bc46..67e2c20 100644
--- a/metastore/src/model/package.jdo
+++ b/metastore/src/model/package.jdo
@@ -63,10 +63,10 @@
 
     <class name="MFieldSchema" embedded-only="true" table="TYPE_FIELDS" detachable="true">
       <field name="name">
-        <column name="FNAME" length="128" jdbc-type="VARCHAR"/>
+        <column name="FNAME" length="767" jdbc-type="VARCHAR"/>
       </field>
       <field name="type" >
-        <column name="FTYPE" length="4000" jdbc-type="VARCHAR" allows-null="false"/>
+        <column name="FTYPE" jdbc-type="CLOB" allows-null="false"/>
       </field>
       <field name="comment" >
         <column name="FCOMMENT" length="4000" jdbc-type="VARCHAR" allows-null="true"/>
@@ -118,7 +118,7 @@
         <column name="DB_ID"/>
       </index>
       <field name="tableName">
-        <column name="TBL_NAME" length="128" jdbc-type="VARCHAR"/>
+        <column name="TBL_NAME" length="256" jdbc-type="VARCHAR"/>
       </field>
       <field name="database">
         <column name="DB_ID"/>
@@ -170,7 +170,7 @@
            <column name="PARAM_KEY" length="256" jdbc-type="VARCHAR"/>
         </key>
         <value>
-           <column name="PARAM_VALUE" length="4000" jdbc-type="VARCHAR"/>
+           <column name="PARAM_VALUE" jdbc-type="CLOB"/>
         </value>
       </field>
       <field name="viewOriginalText" default-fetch-group="false">
@@ -251,14 +251,14 @@
            <column name="PARAM_KEY" length="256" jdbc-type="VARCHAR"/>
         </key>
         <value>
-           <column name="PARAM_VALUE" length="4000" jdbc-type="VARCHAR"/>
+           <column name="PARAM_VALUE" jdbc-type="CLOB"/>
         </value>
       </field>
     </class>
 
     <class name="MOrder" embedded-only="true" table="SORT_ORDER" detachable="true">
       <field name="col">
-        <column name="COL_NAME" length="128" jdbc-type="VARCHAR"/>
+        <column name="COL_NAME" length="767" jdbc-type="VARCHAR"/>
       </field>
       <field name="order">
         <column name="ORDER" jdbc-type="INTEGER"  allows-null="false"/>
@@ -280,10 +280,10 @@
         <element>
           <embedded>
             <field name="name">
-              <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR"/>
+              <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR"/>
               </field>
             <field name="type">
-              <column name="TYPE_NAME" length="4000" jdbc-type="VARCHAR"  allows-null="false"/>
+              <column name="TYPE_NAME" jdbc-type="CLOB"  allows-null="false"/>
             </field>
             <field name="comment">
               <column name="COMMENT" length="256" jdbc-type="VARCHAR" allows-null="true"/>
@@ -349,7 +349,7 @@
         <element>
           <embedded>
             <field name="col">
-              <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR"/>
+              <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR"/>
               </field>
             <field name="order">
               <column name="ORDER" jdbc-type="INTEGER"  allows-null="false"/>
@@ -366,7 +366,7 @@
            <column name="PARAM_KEY" length="256" jdbc-type="VARCHAR"/>
         </key>
         <value>
-           <column name="PARAM_VALUE" length="4000" jdbc-type="VARCHAR"/>
+           <column name="PARAM_VALUE" jdbc-type="CLOB"/>
         </value>
       </field>
       <field name="skewedColNames" table="SKEWED_COL_NAMES">
@@ -725,7 +725,7 @@
         <column name="TBL_ID" />
       </field>
       <field name="columnName">
-        <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR"/>
+        <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR"/>
       </field>
       <field name="privilege">
         <column name="TBL_COL_PRIV" length="128" jdbc-type="VARCHAR"/>
@@ -770,7 +770,7 @@
         <column name="PART_ID" />
       </field>
       <field name="columnName">
-        <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR"/>
+        <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR"/>
       </field>
       <field name="privilege">
         <column name="PART_COL_PRIV" length="128" jdbc-type="VARCHAR"/>
@@ -803,7 +803,7 @@
         <column name="DB_NAME" length="128" jdbc-type="VARCHAR"/>
       </field>
       <field name="tblName">
-        <column name="TBL_NAME" length="128" jdbc-type="VARCHAR"/>
+        <column name="TBL_NAME" length="256" jdbc-type="VARCHAR"/>
       </field>
        <field name="partName">
         <column name="PARTITION_NAME" length="767" jdbc-type="VARCHAR"/>
@@ -850,13 +850,13 @@
         <column name="DB_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/>
       </field>
       <field name="tableName">
-        <column name="TABLE_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/>
+        <column name="TABLE_NAME" length="256" jdbc-type="VARCHAR" allows-null="false"/>
       </field>
       <field name="table">
         <column name="TBL_ID"/>
       </field>
       <field name="colName">
-        <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/>
+        <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR" allows-null="false"/>
       </field>
       <field name="colType">
         <column name="COLUMN_TYPE" length="128" jdbc-type="VARCHAR" allows-null="false"/>
@@ -911,7 +911,7 @@
         <column name="DB_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/>
       </field>
       <field name="tableName">
-        <column name="TABLE_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/>
+        <column name="TABLE_NAME" length="256" jdbc-type="VARCHAR" allows-null="false"/>
       </field>
       <field name="partitionName">
         <column name="PARTITION_NAME" length="767" jdbc-type="VARCHAR" allows-null="false"/>
@@ -920,7 +920,7 @@
         <column name="PART_ID"/>
       </field>
       <field name="colName">
-        <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/>
+        <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR" allows-null="false"/>
       </field>
       <field name="colType">
         <column name="COLUMN_TYPE" length="128" jdbc-type="VARCHAR" allows-null="false"/>
@@ -1050,7 +1050,7 @@
         <column name="DB_NAME" length="128" jdbc-type="VARCHAR" allows-null="true"/>
       </field>
       <field name="tableName">
-        <column name="TBL_NAME" length="128" jdbc-type="VARCHAR" allows-null="true"/>
+        <column name="TBL_NAME" length="256" jdbc-type="VARCHAR" allows-null="true"/>
       </field>
       <field name="message">
         <column name="MESSAGE" jdbc-type="LONGVARCHAR"/>

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 64da9b4..7760bc7 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -873,6 +874,13 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
   }
 
   @Override
+  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+      String tableName) throws MetaException, NoSuchObjectException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
   public void createTableWrite(Table tbl, long writeId, char state, long heartbeat) {
   }