Posted to commits@impala.apache.org by mj...@apache.org on 2016/08/15 23:56:43 UTC

[1/3] incubator-impala git commit: IMPALA-2347: Reuse metastore client connections in Catalog

Repository: incubator-impala
Updated Branches:
  refs/heads/master fe97579fe -> 6fc399ebc


IMPALA-2347: Reuse metastore client connections in Catalog

Currently we create a new connection to the metastore every time the
Catalog talks to HMS. This was done intentionally to circumvent
HIVE-5181. Since that bug has been fixed in Hive, this patch refactors
the HMS client usage in the catalog to reuse connections. Additionally,
this patch makes MetaStoreClient implement the AutoCloseable interface,
so all callers can use try-with-resources to obtain a metastore client
and no longer need to call release() explicitly. This patch also raises
the default initial metastore client pool size from 5 to 10, since 5 is
too low even for a moderate DDL load.
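
As a minimal sketch, the resulting callsite pattern looks like the
following (the table lookup here is hypothetical; MetaStoreClient and
getMetaStoreClient() are the names used by this patch):

    // close() returns the connection to the pool, replacing the old
    // try/finally + release() idiom.
    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
      org.apache.hadoop.hive.metastore.api.Table msTbl =
          msClient.getHiveClient().getTable("some_db", "some_tbl");
      // ... use msTbl ...
    } catch (Exception e) {
      // Surface HMS errors to the caller as appropriate.
    }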

In terms of design, this patch switches the metastore client
implementation from HiveMetaStoreClient to RetryingMetaStoreClient.
The reason for this switch is to handle HMS failures on the Catalog
side: when the metastore restarts, the entire cached metastore client
pool becomes stale, and previously there was no clean way to recover
from that. RetryingMetaStoreClient has a built-in retry mechanism that
reconnects stale connections when calls fail. For more details on
retries and the corresponding configuration options, see
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.
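
For reference, a self-contained sketch of how such a client can be
obtained (this mirrors the MetaStoreClientPool change below; the
wrapper class name is hypothetical):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaHook;
    import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.MetaException;

    public class RetryingClientSketch {
      public static IMetaStoreClient create(HiveConf conf) throws MetaException {
        // The hook loader is required by the API but not needed here.
        HiveMetaHookLoader noopHooks = new HiveMetaHookLoader() {
          @Override
          public HiveMetaHook getHook(org.apache.hadoop.hive.metastore.api.Table tbl)
              throws MetaException {
            return null;
          }
        };
        // getProxy() returns a dynamic proxy around HiveMetaStoreClient
        // that reconnects and retries failed calls.
        return RetryingMetaStoreClient.getProxy(
            conf, noopHooks, HiveMetaStoreClient.class.getName());
      }
    }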

Change-Id: I517c0e1efef2584cd8d34017b33574f2ad69bd52
Reviewed-on: http://gerrit.cloudera.org:8080/3984
Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/15d20d8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/15d20d8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/15d20d8f

Branch: refs/heads/master
Commit: 15d20d8f12ffd01a1da807ea003939f1ff24a7bb
Parents: fe97579
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Tue Jun 14 04:54:58 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Mon Aug 15 20:32:35 2016 +0000

----------------------------------------------------------------------
 .../analysis/CreateTableAsSelectStmt.java       |   5 +-
 .../com/cloudera/impala/catalog/Catalog.java    |   2 +-
 .../impala/catalog/CatalogServiceCatalog.java   |  20 +--
 .../impala/catalog/DataSourceTable.java         |   6 +-
 .../com/cloudera/impala/catalog/HBaseTable.java |   4 +-
 .../com/cloudera/impala/catalog/HdfsTable.java  |  16 +--
 .../cloudera/impala/catalog/ImpaladCatalog.java |   7 +-
 .../impala/catalog/IncompleteTable.java         |   4 +-
 .../com/cloudera/impala/catalog/KuduTable.java  |   6 +-
 .../impala/catalog/MetaStoreClientPool.java     |  75 ++++++----
 .../java/com/cloudera/impala/catalog/Table.java |   6 +-
 .../cloudera/impala/catalog/TableLoader.java    |   6 +-
 .../java/com/cloudera/impala/catalog/View.java  |   4 +-
 .../impala/service/CatalogOpExecutor.java       | 140 ++++++-------------
 .../com/cloudera/impala/util/MetaStoreUtil.java |   6 +-
 .../cloudera/impala/catalog/CatalogTest.java    |   7 +-
 16 files changed, 124 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/analysis/CreateTableAsSelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/CreateTableAsSelectStmt.java
index 72e30f9..7b59625 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/CreateTableAsSelectStmt.java
@@ -177,8 +177,7 @@ public class CreateTableAsSelectStmt extends StatementBase {
     org.apache.hadoop.hive.metastore.api.Table msTbl =
         CatalogOpExecutor.createMetaStoreTable(createStmt_.toThrift());
 
-    MetaStoreClient client = analyzer.getCatalog().getMetaStoreClient();
-    try {
+    try (MetaStoreClient client = analyzer.getCatalog().getMetaStoreClient()) {
       // Set a valid location of this table using the same rules as the metastore. If the
       // user specified a location for the table this will be a no-op.
       msTbl.getSd().setLocation(analyzer.getCatalog().getTablePath(msTbl).toString());
@@ -199,8 +198,6 @@ public class CreateTableAsSelectStmt extends StatementBase {
       throw new AnalysisException(e.getMessage(), e);
     } catch (Exception e) {
       throw new AnalysisException(e.getMessage(), e);
-    } finally {
-      client.release();
     }
 
     // Finally, run analysis on the insert statement.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java b/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java
index 3858028..4cd1c42 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java
@@ -60,7 +60,7 @@ public abstract class Catalog {
   // Initial catalog version.
   public final static long INITIAL_CATALOG_VERSION = 0L;
   public static final String DEFAULT_DB = "default";
-  private static final int META_STORE_CLIENT_POOL_SIZE = 5;
+  private static final int META_STORE_CLIENT_POOL_SIZE = 10;
 
   public static final String BUILTINS_DB = "_impala_builtins";
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
index 7c7a7de..ae92996 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
@@ -557,8 +557,7 @@ public class CatalogServiceCatalog extends Catalog {
       // step.
       ConcurrentHashMap<String, Db> newDbCache = new ConcurrentHashMap<String, Db>();
       List<TTableName> tblsToBackgroundLoad = Lists.newArrayList();
-      MetaStoreClient msClient = metaStoreClientPool_.getClient();
-      try {
+      try (MetaStoreClient msClient = getMetaStoreClient()) {
         for (String dbName: msClient.getHiveClient().getAllDatabases()) {
           List<org.apache.hadoop.hive.metastore.api.Function> javaFns =
               Lists.newArrayList();
@@ -592,8 +591,6 @@ public class CatalogServiceCatalog extends Catalog {
             }
           }
         }
-      } finally {
-        msClient.release();
       }
       dbCache_.set(newDbCache);
       // Submit tables for background loading.
@@ -859,8 +856,7 @@ public class CatalogServiceCatalog extends Catalog {
     synchronized(tbl) {
       long newCatalogVersion = incrementAndGetCatalogVersion();
       catalogLock_.writeLock().unlock();
-      MetaStoreClient msClient = getMetaStoreClient();
-      try {
+      try (MetaStoreClient msClient = getMetaStoreClient()) {
         org.apache.hadoop.hive.metastore.api.Table msTbl = null;
         try {
           msTbl = msClient.getHiveClient().getTable(db.getName(),
@@ -870,8 +866,6 @@ public class CatalogServiceCatalog extends Catalog {
               db.getName() + "." + tblName.getTable_name(), e);
         }
         tbl.load(true, msClient.getHiveClient(), msTbl);
-      } finally {
-        msClient.release();
       }
       tbl.setCatalogVersion(newCatalogVersion);
       return tbl;
@@ -954,8 +948,7 @@ public class CatalogServiceCatalog extends Catalog {
     // 3) unknown (null) - There was exception thrown by the metastore client.
     Boolean tableExistsInMetaStore;
     Db db = null;
-    MetaStoreClient msClient = getMetaStoreClient();
-    try {
+    try (MetaStoreClient msClient = getMetaStoreClient()) {
       org.apache.hadoop.hive.metastore.api.Database msDb = null;
       try {
         tableExistsInMetaStore = msClient.getHiveClient().tableExists(dbName, tblName);
@@ -995,8 +988,6 @@ public class CatalogServiceCatalog extends Catalog {
           return false;
         }
       }
-    } finally {
-      msClient.release();
     }
 
     // Add a new uninitialized table to the table cache, effectively invalidating
@@ -1193,8 +1184,7 @@ public class CatalogServiceCatalog extends Catalog {
           : hdfsPartition.getPartitionName();
       LOG.debug(String.format("Refreshing Partition metadata: %s %s",
           hdfsTable.getFullName(), partitionName));
-      MetaStoreClient msClient = getMetaStoreClient();
-      try {
+      try (MetaStoreClient msClient = getMetaStoreClient()) {
         org.apache.hadoop.hive.metastore.api.Partition hmsPartition = null;
         try {
           hmsPartition = msClient.getHiveClient().getPartition(
@@ -1212,8 +1202,6 @@ public class CatalogServiceCatalog extends Catalog {
               + hdfsTable.getFullName() + " " + partitionName, e);
         }
         hdfsTable.reloadPartition(hdfsPartition, hmsPartition);
-      } finally {
-        msClient.release();
       }
       hdfsTable.setCatalogVersion(newCatalogVersion);
       return hdfsTable;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java b/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java
index 44bd666..c42c804 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java
@@ -20,7 +20,7 @@ package com.cloudera.impala.catalog;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -139,7 +139,7 @@ public class DataSourceTable extends Table {
    * Throws a TableLoadingException if the metadata is incompatible with what we
    * support.
    */
-  private void loadColumns(List<FieldSchema> fieldSchemas, HiveMetaStoreClient client)
+  private void loadColumns(List<FieldSchema> fieldSchemas, IMetaStoreClient client)
       throws TableLoadingException {
     int pos = 0;
     for (FieldSchema s: fieldSchemas) {
@@ -159,7 +159,7 @@ public class DataSourceTable extends Table {
   }
 
   @Override
-  public void load(boolean reuseMetadata, HiveMetaStoreClient client,
+  public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
     Preconditions.checkNotNull(msTbl);
     msTable_ = msTbl;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/HBaseTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HBaseTable.java b/fe/src/main/java/com/cloudera/impala/catalog/HBaseTable.java
index a649c93..d96314e 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/HBaseTable.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/HBaseTable.java
@@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hive.hbase.HBaseSerDe;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -325,7 +325,7 @@ public class HBaseTable extends Table {
    * to hdfs tables since we typically need to understand all columns to make sense
    * of the file at all.
    */
-  public void load(boolean reuseMetadata, HiveMetaStoreClient client,
+  public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
     Preconditions.checkNotNull(getMetaStoreTable());
     try {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
index 4b35e66..2464376 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -977,7 +977,7 @@ public class HdfsTable extends Table {
   }
 
   @Override
-  public void load(boolean reuseMetadata, HiveMetaStoreClient client,
+  public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
     load(reuseMetadata, client, msTbl, true, true, null);
   }
@@ -1009,7 +1009,7 @@ public class HdfsTable extends Table {
    * If any of these occur, user has to execute "invalidate metadata" to invalidate the
    * metadata cache of the table and trigger a fresh load.
    */
-  public void load(boolean reuseMetadata, HiveMetaStoreClient client,
+  public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl, boolean loadFileMetadata,
       boolean loadTableSchema, Set<String> partitionsToUpdate)
       throws TableLoadingException {
@@ -1098,7 +1098,7 @@ public class HdfsTable extends Table {
    * file/block metadata reload for the partitions specified in 'partitionsToUpdate', if
    * any, or for all the table partitions if 'partitionsToUpdate' is null.
    */
-  private void updatePartitionsFromHms(HiveMetaStoreClient client,
+  private void updatePartitionsFromHms(IMetaStoreClient client,
       Set<String> partitionsToUpdate, boolean loadFileMetadata) throws Exception {
     LOG.debug("sync table partitions: " + name_);
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
@@ -1262,7 +1262,7 @@ public class HdfsTable extends Table {
    * as Avro. Additionally, this method also reconciles the schema if the column
    * definitions from the metastore differ from the Avro schema.
    */
-  private void setAvroSchema(HiveMetaStoreClient client,
+  private void setAvroSchema(IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception {
     Preconditions.checkState(isSchemaLoaded_);
     String inputFormat = msTbl.getSd().getInputFormat();
@@ -1324,7 +1324,7 @@ public class HdfsTable extends Table {
   /**
    * Loads table schema and column stats from Hive Metastore.
    */
-  private void loadSchema(HiveMetaStoreClient client,
+  private void loadSchema(IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception {
     nonPartFieldSchemas_.clear();
     // set nullPartitionKeyValue from the hive conf.
@@ -1356,7 +1356,7 @@ public class HdfsTable extends Table {
    * table partitions.
    */
   private void loadPartitionsFromMetastore(List<HdfsPartition> partitions,
-      HiveMetaStoreClient client) throws Exception {
+      IMetaStoreClient client) throws Exception {
     Preconditions.checkNotNull(partitions);
     if (partitions.isEmpty()) return;
     LOG.info(String.format("Incrementally updating %d/%d partitions.",
@@ -1373,7 +1373,7 @@ public class HdfsTable extends Table {
    * 'partitionNames' and adds them to the internal list of table partitions.
    */
   private void loadPartitionsFromMetastore(Set<String> partitionNames,
-      HiveMetaStoreClient client) throws Exception {
+      IMetaStoreClient client) throws Exception {
     Preconditions.checkNotNull(partitionNames);
     if (partitionNames.isEmpty()) return;
     // Load partition metadata from Hive Metastore.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java
index b8c3334..02c9747 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java
@@ -207,19 +207,16 @@ public class ImpaladCatalog extends Catalog {
    */
   public Path getTablePath(org.apache.hadoop.hive.metastore.api.Table msTbl)
       throws NoSuchObjectException, MetaException, TException {
-    MetaStoreClient client = getMetaStoreClient();
-    try {
+    try (MetaStoreClient msClient = getMetaStoreClient()) {
       // If the table did not have its path set, build the path based on the the
       // location property of the parent database.
       if (msTbl.getSd().getLocation() == null || msTbl.getSd().getLocation().isEmpty()) {
         String dbLocation =
-            client.getHiveClient().getDatabase(msTbl.getDbName()).getLocationUri();
+            msClient.getHiveClient().getDatabase(msTbl.getDbName()).getLocationUri();
         return new Path(dbLocation, msTbl.getTableName().toLowerCase());
       } else {
         return new Path(msTbl.getSd().getLocation());
       }
-    } finally {
-      client.release();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/IncompleteTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/IncompleteTable.java b/fe/src/main/java/com/cloudera/impala/catalog/IncompleteTable.java
index e811046..88bab5e 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/IncompleteTable.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/IncompleteTable.java
@@ -20,7 +20,7 @@ package com.cloudera.impala.catalog;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 
 import com.cloudera.impala.common.ImpalaException;
 import com.cloudera.impala.common.JniUtil;
@@ -70,7 +70,7 @@ public class IncompleteTable extends Table {
   }
 
   @Override
-  public void load(boolean reuseMetadata, HiveMetaStoreClient client,
+  public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
     if (cause_ instanceof TableLoadingException) {
       throw (TableLoadingException) cause_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java b/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java
index e60c639..28f4133 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java
@@ -24,7 +24,7 @@ import java.util.Set;
 
 import javax.xml.bind.DatatypeConverter;
 
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.log4j.Logger;
@@ -133,7 +133,7 @@ public class KuduTable extends Table {
   /**
    * Load the columns from the schema list
    */
-  private void loadColumns(List<FieldSchema> schema, HiveMetaStoreClient client,
+  private void loadColumns(List<FieldSchema> schema, IMetaStoreClient client,
       Set<String> keyColumns) throws TableLoadingException {
 
     if (keyColumns.size() == 0 || keyColumns.size() > schema.size()) {
@@ -168,7 +168,7 @@ public class KuduTable extends Table {
   }
 
   @Override
-  public void load(boolean reuseMetadata, HiveMetaStoreClient client,
+  public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
     // TODO handle 'reuseMetadata'
     if (getMetaStoreTable() == null || !tableParamsAreValid(msTbl.getParameters())) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/MetaStoreClientPool.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/MetaStoreClientPool.java b/fe/src/main/java/com/cloudera/impala/catalog/MetaStoreClientPool.java
index 8ea96fe..40eb4cf 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/MetaStoreClientPool.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/MetaStoreClientPool.java
@@ -20,20 +20,28 @@ package com.cloudera.impala.catalog;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Preconditions;
 
 /**
- * Manages a pool of HiveMetaStoreClient connections. If the connection pool is empty
- * a new client is created and added to the pool. There is no size limit.
+ * Manages a pool of RetryingMetaStoreClient connections. If the connection pool is empty
+ * a new client is created and added to the pool. The idle pool can expand till a maximum
+ * size of MAX_HMS_CONNECTION_POOL_SIZE, beyond which the connections are closed.
  */
 public class MetaStoreClientPool {
   // Key for config option read from hive-site.xml
   private static final String HIVE_METASTORE_CNXN_DELAY_MS_CONF =
       "impala.catalog.metastore.cnxn.creation.delay.ms";
   private static final int DEFAULT_HIVE_METASTORE_CNXN_DELAY_MS_CONF = 0;
+  // Maximum number of idle metastore connections in the connection pool at any point.
+  private static final int MAX_HMS_CONNECTION_POOL_SIZE = 32;
   // Number of milliseconds to sleep between creation of HMS connections. Used to debug
   // IMPALA-825.
   private final int clientCreationDelayMs_;
@@ -46,29 +54,40 @@ public class MetaStoreClientPool {
   private final Object poolCloseLock_ = new Object();
   private final HiveConf hiveConf_;
 
+  // Required for creating an instance of RetryingMetaStoreClient.
+  private static final HiveMetaHookLoader dummyHookLoader = new HiveMetaHookLoader() {
+    @Override
+    public HiveMetaHook getHook(org.apache.hadoop.hive.metastore.api.Table tbl)
+        throws MetaException {
+      return null;
+    }
+  };
+
   /**
-   * A wrapper around the HiveMetaStoreClient that manages interactions with the
-   * connection pool.
+   * A wrapper around the RetryingMetaStoreClient that manages interactions with the
+   * connection pool. This implements the AutoCloseable interface and hence the callers
+   * should use the try-with-resources statement while creating an instance.
    */
-  public class MetaStoreClient {
-    private final HiveMetaStoreClient hiveClient_;
+  public class MetaStoreClient implements AutoCloseable {
+    private final IMetaStoreClient hiveClient_;
     private boolean isInUse_;
 
     private MetaStoreClient(HiveConf hiveConf) {
       try {
         LOG.debug("Creating MetaStoreClient. Pool Size = " + clientPool_.size());
-        this.hiveClient_ = new HiveMetaStoreClient(hiveConf);
+        hiveClient_ = RetryingMetaStoreClient.getProxy(hiveConf, dummyHookLoader,
+            HiveMetaStoreClient.class.getName());
       } catch (Exception e) {
         // Turn in to an unchecked exception
         throw new IllegalStateException(e);
       }
-      this.isInUse_ = false;
+      isInUse_ = false;
     }
 
     /**
-     * Returns the internal HiveMetaStoreClient object.
+     * Returns the internal RetryingMetaStoreClient object.
      */
-    public HiveMetaStoreClient getHiveClient() {
+    public IMetaStoreClient getHiveClient() {
       return hiveClient_;
     }
 
@@ -76,27 +95,26 @@ public class MetaStoreClientPool {
      * Returns this client back to the connection pool. If the connection pool has been
      * closed, just close the Hive client connection.
      */
-    public void release() {
+    @Override
+    public void close() {
       Preconditions.checkState(isInUse_);
       isInUse_ = false;
-      // Ensure the connection isn't returned to the pool if the pool has been closed.
+      // Ensure the connection isn't returned to the pool if the pool has been closed
+      // or if the number of connections in the pool exceeds MAX_HMS_CONNECTION_POOL_SIZE.
       // This lock is needed to ensure proper behavior when a thread reads poolClosed
       // is false, but a call to pool.close() comes in immediately afterward.
       synchronized (poolCloseLock_) {
-        if (poolClosed_) {
+        if (poolClosed_ || clientPool_.size() >= MAX_HMS_CONNECTION_POOL_SIZE) {
           hiveClient_.close();
         } else {
-          // TODO: Currently the pool does not work properly because we cannot
-          // reuse MetastoreClient connections. No reason to add this client back
-          // to the pool. See HIVE-5181.
-          // clientPool.add(this);
-          hiveClient_.close();
+          clientPool_.offer(this);
         }
       }
     }
 
     // Marks this client as in use
     private void markInUse() {
+      Preconditions.checkState(!isInUse_);
       isInUse_ = true;
     }
   }
@@ -106,7 +124,7 @@ public class MetaStoreClientPool {
   }
 
   public MetaStoreClientPool(int initialSize, HiveConf hiveConf) {
-    this.hiveConf_ = hiveConf;
+    hiveConf_ = hiveConf;
     clientCreationDelayMs_ = hiveConf_.getInt(HIVE_METASTORE_CNXN_DELAY_MS_CONF,
         DEFAULT_HIVE_METASTORE_CNXN_DELAY_MS_CONF);
     addClients(initialSize);
@@ -138,18 +156,13 @@ public class MetaStoreClientPool {
     // The pool was empty so create a new client and return that.
     // Serialize client creation to defend against possible race conditions accessing
     // local Kerberos state (see IMPALA-825).
-    synchronized (this) {
-      try {
-        Thread.sleep(clientCreationDelayMs_);
-      } catch (InterruptedException e) {
-        /* ignore */
-      }
-      if (client == null) {
-        client = new MetaStoreClient(hiveConf_);
-      } else {
-        // TODO: Due to Hive Metastore bugs, there is leftover state from previous client
-        // connections so we are unable to reuse the same connection. For now simply
-        // reconnect each time. One possible culprit is HIVE-5181.
+    if (client == null) {
+      synchronized (this) {
+        try {
+          Thread.sleep(clientCreationDelayMs_);
+        } catch (InterruptedException e) {
+          /* ignore */
+        }
         client = new MetaStoreClient(hiveConf_);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/Table.java b/fe/src/main/java/com/cloudera/impala/catalog/Table.java
index 87f8970..8ac2499 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/Table.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/Table.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -113,7 +113,7 @@ public abstract class Table implements CatalogObject {
    * Populate members of 'this' from metastore info. If 'reuseMetadata' is true, reuse
    * valid existing metadata.
    */
-  public abstract void load(boolean reuseMetadata, HiveMetaStoreClient client,
+  public abstract void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException;
 
   public void addColumn(Column col) {
@@ -157,7 +157,7 @@ public abstract class Table implements CatalogObject {
    * errors are logged and ignored, since the absence of column stats is not critical to
    * the correctness of the system.
    */
-  protected void loadAllColumnStats(HiveMetaStoreClient client) {
+  protected void loadAllColumnStats(IMetaStoreClient client) {
     LOG.debug("Loading column stats for table: " + name_);
     List<ColumnStatisticsObj> colStats;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java b/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java
index f935c0b..78b58f6 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java
@@ -57,11 +57,9 @@ public class TableLoader {
   public Table load(Db db, String tblName) {
     String fullTblName = db.getName() + "." + tblName;
     LOG.info("Loading metadata for: " + fullTblName);
-    MetaStoreClient msClient = null;
     Table table;
     // turn all exceptions into TableLoadingException
-    try {
-      msClient = catalog_.getMetaStoreClient();
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       org.apache.hadoop.hive.metastore.api.Table msTbl = null;
       // All calls to getTable() need to be serialized due to HIVE-5457.
       synchronized (metastoreAccessLock_) {
@@ -97,8 +95,6 @@ public class TableLoader {
           catalog_.getNextTableId(), db, tblName, new TableLoadingException(
           "Failed to load metadata for table: " + fullTblName + ". Running " +
           "'invalidate metadata " + fullTblName + "' may resolve this problem.", e));
-    } finally {
-      if (msClient != null) msClient.release();
     }
     return table;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/View.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/View.java b/fe/src/main/java/com/cloudera/impala/catalog/View.java
index f062172..cc82f95 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/View.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/View.java
@@ -21,7 +21,7 @@ import java.io.StringReader;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 
 import com.cloudera.impala.analysis.ParseNode;
@@ -99,7 +99,7 @@ public class View extends Table {
   }
 
   @Override
-  public void load(boolean reuseMetadata, HiveMetaStoreClient client,
+  public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
     try {
       clearColumns();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java b/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java
index ec7b6ca..4814503 100644
--- a/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java
@@ -470,8 +470,7 @@ public class CatalogOpExecutor {
       boolean reloadFileMetadata, boolean reloadTableSchema,
       Set<String> partitionsToUpdate) throws CatalogException {
     Preconditions.checkState(Thread.holdsLock(tbl));
-    MetaStoreClient msClient = catalog_.getMetaStoreClient();
-    try {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       org.apache.hadoop.hive.metastore.api.Table msTbl =
           getMetaStoreTable(msClient, tbl);
       if (tbl instanceof HdfsTable) {
@@ -480,8 +479,6 @@ public class CatalogOpExecutor {
       } else {
         tbl.load(true, msClient.getHiveClient(), msTbl);
       }
-    } finally {
-      msClient.release();
     }
     tbl.setCatalogVersion(newCatalogVersion);
   }
@@ -544,7 +541,9 @@ public class CatalogOpExecutor {
       setViewAttributes(params, msTbl);
       LOG.debug(String.format("Altering view %s", tableName));
       applyAlterTable(msTbl);
-      tbl.load(true, catalog_.getMetaStoreClient().getHiveClient(), msTbl);
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        tbl.load(true, msClient.getHiveClient(), msTbl);
+      }
       tbl.setCatalogVersion(newCatalogVersion);
       addTableToCatalogUpdate(tbl, resp.result);
     }
@@ -583,10 +582,9 @@ public class CatalogOpExecutor {
       }
     }
 
-    MetaStoreClient msClient = catalog_.getMetaStoreClient();
     int numTargetedPartitions = 0;
     int numUpdatedColumns = 0;
-    try {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       // Update the table and partition row counts based on the query results.
       List<HdfsPartition> modifiedParts = Lists.newArrayList();
       if (params.isSetTable_stats()) {
@@ -616,8 +614,6 @@ public class CatalogOpExecutor {
       // Update the table stats. Apply the table alteration last to ensure the
       // lastDdlTime is as accurate as possible.
       applyAlterTable(msTbl);
-    } finally {
-      msClient.release();
     }
 
     // Set the results to be reported to the client.
@@ -812,33 +808,32 @@ public class CatalogOpExecutor {
     LOG.debug("Creating database " + dbName);
     Db newDb = null;
     synchronized (metastoreDdlLock_) {
-      MetaStoreClient msClient = catalog_.getMetaStoreClient();
-      try {
-        msClient.getHiveClient().createDatabase(db);
-        newDb = catalog_.addDb(dbName, db);
-      } catch (AlreadyExistsException e) {
-        if (!params.if_not_exists) {
-          throw new ImpalaRuntimeException(
-              String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e);
-        }
-        LOG.debug(String.format("Ignoring '%s' when creating database %s because " +
-            "IF NOT EXISTS was specified.", e, dbName));
-        newDb = catalog_.getDb(dbName);
-        if (newDb == null) {
-          try {
-            org.apache.hadoop.hive.metastore.api.Database msDb =
-                msClient.getHiveClient().getDatabase(dbName);
-            newDb = catalog_.addDb(dbName, msDb);
-          } catch (TException e1) {
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        try {
+          msClient.getHiveClient().createDatabase(db);
+          newDb = catalog_.addDb(dbName, db);
+        } catch (AlreadyExistsException e) {
+          if (!params.if_not_exists) {
             throw new ImpalaRuntimeException(
-                String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e1);
+                String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e);
           }
+          LOG.debug(String.format("Ignoring '%s' when creating database %s because " +
+              "IF NOT EXISTS was specified.", e, dbName));
+          newDb = catalog_.getDb(dbName);
+          if (newDb == null) {
+            try {
+              org.apache.hadoop.hive.metastore.api.Database msDb =
+                  msClient.getHiveClient().getDatabase(dbName);
+              newDb = catalog_.addDb(dbName, msDb);
+            } catch (TException e1) {
+              throw new ImpalaRuntimeException(
+                  String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e1);
+            }
+          }
+        } catch (TException e) {
+          throw new ImpalaRuntimeException(
+              String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e);
         }
-      } catch (TException e) {
-        throw new ImpalaRuntimeException(
-            String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e);
-      } finally {
-        msClient.release();
       }
 
       Preconditions.checkNotNull(newDb);
@@ -1031,8 +1026,7 @@ public class CatalogOpExecutor {
   private int dropColumnStats(Table table) throws ImpalaRuntimeException {
     Preconditions.checkState(Thread.holdsLock(table));
     int numColsUpdated = 0;
-    MetaStoreClient msClient = catalog_.getMetaStoreClient();
-    try {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       for (Column col: table.getColumns()) {
         // Skip columns that don't have stats.
         if (!col.getStats().hasStats()) continue;
@@ -1051,8 +1045,6 @@ public class CatalogOpExecutor {
                   "delete_table_column_statistics"), e);
         }
       }
-    } finally {
-      msClient.release();
     }
     return numColsUpdated;
   }
@@ -1127,16 +1119,13 @@ public class CatalogOpExecutor {
     }
 
     TCatalogObject removedObject = new TCatalogObject();
-    MetaStoreClient msClient = catalog_.getMetaStoreClient();
     synchronized (metastoreDdlLock_) {
-      try {
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
         msClient.getHiveClient().dropDatabase(
             params.getDb(), true, params.if_exists, params.cascade);
       } catch (TException e) {
         throw new ImpalaRuntimeException(
             String.format(HMS_RPC_ERROR_FORMAT_STR, "dropDatabase"), e);
-      } finally {
-        msClient.release();
       }
       Db removedDb = catalog_.removeDb(params.getDb());
       // If no db was removed as part of this operation just return the current catalog
@@ -1179,7 +1168,6 @@ public class CatalogOpExecutor {
         // Do nothing
       }
 
-      MetaStoreClient msClient = catalog_.getMetaStoreClient();
       Db db = catalog_.getDb(params.getTable_name().db_name);
       if (db == null) {
         if (params.if_exists) return;
@@ -1203,14 +1191,12 @@ public class CatalogOpExecutor {
             "not allowed on a " + (params.is_table ? "view: " : "table: ") + tableName;
         throw new CatalogException(errorMsg);
       }
-      try {
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
         msClient.getHiveClient().dropTable(
             tableName.getDb(), tableName.getTbl(), true, params.if_exists, params.purge);
       } catch (TException e) {
         throw new ImpalaRuntimeException(
             String.format(HMS_RPC_ERROR_FORMAT_STR, "dropTable"), e);
-      } finally {
-        msClient.release();
       }
 
       Table table = catalog_.removeTable(params.getTable_name().db_name,
@@ -1499,10 +1485,9 @@ public class CatalogOpExecutor {
       boolean ifNotExists, THdfsCachingOp cacheOp, List<TDistributeParam> distribute_by,
       TDdlExecResponse response)
       throws ImpalaException {
-    MetaStoreClient msClient = catalog_.getMetaStoreClient();
     synchronized (metastoreDdlLock_) {
 
-      try {
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
         msClient.getHiveClient().createTable(newTable);
         // If this table should be cached, and the table location was not specified by
         // the user, an extra step is needed to read the table to find the location.
@@ -1523,8 +1508,6 @@ public class CatalogOpExecutor {
       } catch (TException e) {
         throw new ImpalaRuntimeException(
             String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
-      } finally {
-        msClient.release();
       }
 
       // Forward the operation to a specific storage backend. If the operation fails,
@@ -1532,15 +1515,12 @@ public class CatalogOpExecutor {
       try {
         createDdlDelegate(newTable).setDistributeParams(distribute_by).createTable();
       } catch (ImpalaRuntimeException e) {
-        MetaStoreClient c = catalog_.getMetaStoreClient();
-        try {
+        try (MetaStoreClient c = catalog_.getMetaStoreClient()) {
           c.getHiveClient().dropTable(newTable.getDbName(), newTable.getTableName(),
               false, ifNotExists);
         } catch (Exception hE) {
           throw new ImpalaRuntimeException(String.format(HMS_RPC_ERROR_FORMAT_STR,
               "dropTable"), hE);
-        } finally {
-          c.release();
         }
         throw e;
       }
@@ -1672,10 +1652,9 @@ public class CatalogOpExecutor {
     Long parentTblCacheDirId =
         HdfsCachingUtil.getCacheDirectiveId(msTbl.getParameters());
 
-    MetaStoreClient msClient = catalog_.getMetaStoreClient();
     partition = createHmsPartition(partitionSpec, msTbl, tableName, location);
 
-    try {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       // Add the new partition.
       partition = msClient.getHiveClient().add_partition(partition);
       String cachePoolName = null;
@@ -1721,8 +1700,6 @@ public class CatalogOpExecutor {
     } catch (TException e) {
       throw new ImpalaRuntimeException(
           String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partition"), e);
-    } finally {
-      msClient.release();
     }
     if (cacheIds != null) catalog_.watchCacheDirs(cacheIds, tableName.toThrift());
     // Return the table object with an updated catalog version after creating the
@@ -1764,10 +1741,9 @@ public class CatalogOpExecutor {
         }
       }
     }
-    MetaStoreClient msClient = catalog_.getMetaStoreClient();
     PartitionDropOptions dropOptions = PartitionDropOptions.instance();
     dropOptions.purgeData(purge);
-    try {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       msClient.getHiveClient().dropPartition(tableName.getDb(),
           tableName.getTbl(), values, dropOptions);
       updateLastDdlTime(msTbl, msClient);
@@ -1784,8 +1760,6 @@ public class CatalogOpExecutor {
     } catch (TException e) {
       throw new ImpalaRuntimeException(
           String.format(HMS_RPC_ERROR_FORMAT_STR, "dropPartition"), e);
-    } finally {
-      msClient.release();
     }
     return catalog_.dropPartition(tbl, partitionSpec);
   }
@@ -1827,8 +1801,7 @@ public class CatalogOpExecutor {
         oldTbl.getMetaStoreTable().deepCopy();
     msTbl.setDbName(newTableName.getDb());
     msTbl.setTableName(newTableName.getTbl());
-    MetaStoreClient msClient = catalog_.getMetaStoreClient();
-    try {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       // Workaround for HIVE-9720/IMPALA-1711: When renaming a table with column
       // stats across databases, we save, drop and restore the column stats because
       // the HMS does not properly move them to the new table via alteration.
@@ -1864,8 +1837,6 @@ public class CatalogOpExecutor {
     } catch (TException e) {
       throw new ImpalaRuntimeException(
           String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_table"), e);
-    } finally {
-      msClient.release();
     }
     // Rename the table in the Catalog and get the resulting catalog object.
     // ALTER TABLE/VIEW RENAME is implemented as an ADD + DROP.
@@ -2222,8 +2193,7 @@ public class CatalogOpExecutor {
     }
 
     // Add partitions to metastore.
-    MetaStoreClient msClient = catalog_.getMetaStoreClient();
-    try {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       // ifNotExists and needResults are true.
       hmsPartitions = msClient.getHiveClient().add_partitions(hmsPartitions,
           true, true);
@@ -2252,8 +2222,6 @@ public class CatalogOpExecutor {
     } catch (TException e) {
       throw new ImpalaRuntimeException(
           String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partition"), e);
-    } finally {
-      msClient.release();
     }
 
     if (!cacheIds.isEmpty()) {
@@ -2305,8 +2273,7 @@ public class CatalogOpExecutor {
   public boolean addJavaFunctionToHms(String db,
       org.apache.hadoop.hive.metastore.api.Function fn, boolean ifNotExists)
       throws ImpalaRuntimeException{
-    MetaStoreClient msClient = catalog_.getMetaStoreClient();
-    try {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       msClient.getHiveClient().createFunction(fn);
     } catch(AlreadyExistsException e) {
       if (!ifNotExists) {
@@ -2319,8 +2286,6 @@ public class CatalogOpExecutor {
           fn.getFunctionName(), e);
       throw new ImpalaRuntimeException(
           String.format(HMS_RPC_ERROR_FORMAT_STR, "createFunction"), e);
-    } finally {
-      msClient.release();
     }
     return true;
   }
@@ -2331,8 +2296,7 @@ public class CatalogOpExecutor {
    */
   public boolean dropJavaFunctionFromHms(String db, String fn, boolean ifExists)
       throws ImpalaRuntimeException {
-    MetaStoreClient msClient = catalog_.getMetaStoreClient();
-    try {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       msClient.getHiveClient().dropFunction(db, fn);
     } catch (NoSuchObjectException e) {
       if (!ifExists) {
@@ -2344,8 +2308,6 @@ public class CatalogOpExecutor {
       LOG.error("Error executing dropFunction() metastore call: " + fn, e);
       throw new ImpalaRuntimeException(
           String.format(HMS_RPC_ERROR_FORMAT_STR, "dropFunction"), e);
-    } finally {
-      msClient.release();
     }
     return true;
   }
@@ -2355,14 +2317,11 @@ public class CatalogOpExecutor {
    */
   private void applyAlterDatabase(Db db)
       throws ImpalaRuntimeException {
-    MetaStoreClient msClient = catalog_.getMetaStoreClient();
-    try {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       msClient.getHiveClient().alterDatabase(db.getName(), db.getMetaStoreDb());
     } catch (TException e) {
       throw new ImpalaRuntimeException(
           String.format(HMS_RPC_ERROR_FORMAT_STR, "alterDatabase"), e);
-    } finally {
-      msClient.release();
     }
   }
 
@@ -2376,9 +2335,8 @@ public class CatalogOpExecutor {
    */
   private void applyAlterTable(org.apache.hadoop.hive.metastore.api.Table msTbl)
       throws ImpalaRuntimeException {
-    MetaStoreClient msClient = catalog_.getMetaStoreClient();
     long lastDdlTime = -1;
-    try {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       lastDdlTime = calculateDdlTime(msTbl);
       msTbl.putToParameters("transient_lastDdlTime", Long.toString(lastDdlTime));
       msClient.getHiveClient().alter_table(
@@ -2387,7 +2345,6 @@ public class CatalogOpExecutor {
       throw new ImpalaRuntimeException(
           String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_table"), e);
     } finally {
-      msClient.release();
       catalog_.updateLastDdlTime(
           new TTableName(msTbl.getDbName(), msTbl.getTableName()), lastDdlTime);
     }
@@ -2395,8 +2352,7 @@ public class CatalogOpExecutor {
 
   private void applyAlterPartition(Table tbl, HdfsPartition partition)
       throws ImpalaException {
-    MetaStoreClient msClient = catalog_.getMetaStoreClient();
-    try {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       TableName tableName = tbl.getTableName();
       msClient.getHiveClient().alter_partition(
           tableName.getDb(), tableName.getTbl(), partition.toHmsPartition());
@@ -2406,8 +2362,6 @@ public class CatalogOpExecutor {
     } catch (TException e) {
       throw new ImpalaRuntimeException(
           String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_partition"), e);
-    } finally {
-      msClient.release();
     }
   }
 
@@ -2549,8 +2503,7 @@ public class CatalogOpExecutor {
     }
     if (hmsPartitions.size() == 0) return;
 
-    MetaStoreClient msClient = catalog_.getMetaStoreClient();
-    try {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       // Apply the updates in batches of 'MAX_PARTITION_UPDATES_PER_RPC'.
       for (int i = 0; i < hmsPartitions.size(); i += MAX_PARTITION_UPDATES_PER_RPC) {
         int numPartitionsToUpdate =
@@ -2575,8 +2528,6 @@ public class CatalogOpExecutor {
               String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_partitions"), e);
         }
       }
-    } finally {
-      msClient.release();
     }
   }
 
@@ -2879,8 +2830,7 @@ public class CatalogOpExecutor {
         }
 
         if (!partsToCreate.isEmpty()) {
-          MetaStoreClient msClient = catalog_.getMetaStoreClient();
-          try {
+          try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
             org.apache.hadoop.hive.metastore.api.Table msTbl =
                 table.getMetaStoreTable().deepCopy();
             List<org.apache.hadoop.hive.metastore.api.Partition> hmsParts =
@@ -2958,8 +2908,6 @@ public class CatalogOpExecutor {
                 "AlreadyExistsException thrown although ifNotExists given", e);
           } catch (Exception e) {
             throw new InternalException("Error adding partitions", e);
-          } finally {
-            msClient.release();
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/util/MetaStoreUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/util/MetaStoreUtil.java b/fe/src/main/java/com/cloudera/impala/util/MetaStoreUtil.java
index d3618f0..540c749 100644
--- a/fe/src/main/java/com/cloudera/impala/util/MetaStoreUtil.java
+++ b/fe/src/main/java/com/cloudera/impala/util/MetaStoreUtil.java
@@ -21,7 +21,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.log4j.Logger;
@@ -93,7 +93,7 @@ public class MetaStoreUtil {
    * configuring retires at the connection level so it can be enabled independently.
    */
   public static List<org.apache.hadoop.hive.metastore.api.Partition> fetchAllPartitions(
-      HiveMetaStoreClient client, String dbName, String tblName, int numRetries)
+      IMetaStoreClient client, String dbName, String tblName, int numRetries)
       throws MetaException, TException {
     Preconditions.checkArgument(numRetries >= 0);
     int retryAttempt = 0;
@@ -124,7 +124,7 @@ public class MetaStoreUtil {
    * Will throw a MetaException if any partitions in 'partNames' do not exist.
    */
   public static List<Partition> fetchPartitionsByName(
-      HiveMetaStoreClient client, List<String> partNames, String dbName, String tblName)
+      IMetaStoreClient client, List<String> partNames, String dbName, String tblName)
       throws MetaException, TException {
     LOG.trace(String.format("Fetching %d partitions for: %s.%s using partition " +
         "batch size: %d", partNames.size(), dbName, tblName, maxPartitionsPerRpc_));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/test/java/com/cloudera/impala/catalog/CatalogTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/catalog/CatalogTest.java b/fe/src/test/java/com/cloudera/impala/catalog/CatalogTest.java
index a55da33..9ed09ed 100644
--- a/fe/src/test/java/com/cloudera/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/com/cloudera/impala/catalog/CatalogTest.java
@@ -420,8 +420,7 @@ public class CatalogTest {
 
     // Now attempt to update a column's stats with mismatched stats data and ensure
     // we get the expected results.
-    MetaStoreClient client = catalog_.getMetaStoreClient();
-    try {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
       // Load some string stats data and use it to update the stats of different
       // typed columns.
       ColumnStatisticsData stringColStatsData = client.getHiveClient()
@@ -447,10 +446,6 @@ public class CatalogTest {
       // Now try to apply a matching column stats data and ensure it succeeds.
       assertTrue(table.getColumn("string_col").updateStats(stringColStatsData));
       assertEquals(1178, table.getColumn("string_col").getStats().getNumDistinctValues());
-    } finally {
-      // Make sure to invalidate the metadata so the next test isn't using bad col stats
-      //catalog_.refreshTable("functional", "alltypesagg", false);
-      client.release();
     }
   }
 


[3/3] incubator-impala git commit: IMPALA-3953: Fixes for KuduScanNode BE test failure

Posted by mj...@apache.org.
IMPALA-3953: Fixes for KuduScanNode BE test failure

After a previous fix for IMPALA-3857, the KuduScanNodeTest
TestLimitsAreEnforced (BE) test occasionally throws when a scanner
thread takes a lock_ that is no longer valid, crashing the process.

The likely cause is that TestScanEmptyString isn't closing its
KuduScanNode, so a lingering ScannerThread may end up touching invalid
memory later.

This fixes the test case and also:
1) Adds some missing synchronization in KuduScanNode that was found
   while investigating this issue.
2) Adds a DCHECK in ~KuduScanNode() to ensure the node was closed (a
   hypothetical sketch of this lifecycle-guard pattern follows below).
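
The destructor DCHECK in 2) is a general lifecycle-guard pattern: fail
loudly if an object is torn down without having been closed. A
hypothetical Java analogue (Java has no destructors, so the check lives
in an explicit verification method; all names here are invented):

    class ScanNode implements AutoCloseable {
      private boolean closed = false;

      @Override
      public void close() {
        // Stop scanner threads and release resources here.
        closed = true;
      }

      // Analogue of the DCHECK(is_closed()) added in ~KuduScanNode():
      // fail loudly if teardown happens without a prior close().
      void verifyClosedBeforeTeardown() {
        if (!closed) {
          throw new IllegalStateException("ScanNode destroyed without close()");
        }
      }
    }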

This was tested by running KuduScanNodeTest in a loop for 5
hours. Without the fix, the failure was produced within
several hours.

Change-Id: I16be206c60a692d2a26d719de8cc45e859b06e97
Reviewed-on: http://gerrit.cloudera.org:8080/3888
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6fc399eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6fc399eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6fc399eb

Branch: refs/heads/master
Commit: 6fc399ebc435121cdb7865ff4987aca1c95af5fc
Parents: f4da925
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Wed Aug 10 13:35:52 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Mon Aug 15 23:41:08 2016 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scan-node-test.cc | 1 +
 be/src/exec/kudu-scan-node.cc      | 3 +++
 2 files changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6fc399eb/be/src/exec/kudu-scan-node-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-test.cc b/be/src/exec/kudu-scan-node-test.cc
index 7719a75..0cda0ec 100644
--- a/be/src/exec/kudu-scan-node-test.cc
+++ b/be/src/exec/kudu-scan-node-test.cc
@@ -499,6 +499,7 @@ TEST_F(KuduScanNodeTest, TestScanEmptyString) {
   ASSERT_OK(scanner.GetNext(runtime_state_.get(), NULL, &eos));
   ASSERT_TRUE(eos);
   ASSERT_EQ(PrintBatch(batch), "[(10 null )]\n");
+  scanner.Close(runtime_state_.get());
 }
 
 // Test that scan limits are enforced.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6fc399eb/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 74011b1..a7a068c 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -82,6 +82,7 @@ KuduScanNode::KuduScanNode(ObjectPool* pool, const TPlanNode& tnode,
 }
 
 KuduScanNode::~KuduScanNode() {
+  DCHECK(is_closed());
   STLDeleteElements(&kudu_predicates_);
 }
 
@@ -184,6 +185,8 @@ Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
       num_rows_returned_ -= num_rows_over;
       COUNTER_SET(rows_returned_counter_, num_rows_returned_);
       *eos = true;
+
+      unique_lock<mutex> l(lock_);
       done_ = true;
       materialized_row_batches_->Shutdown();
     }


[2/3] incubator-impala git commit: IMPALA-3936: BufferedBlockMgr fixes for Pin() while write in flight.

Posted by mj...@apache.org.
IMPALA-3936: BufferedBlockMgr fixes for Pin() while write in flight.

Fix multiple bugs that could occur when a block was unpinned and then
pinned again while its write was still in flight. There were two problems:

1. A block's buffer could be transferred while a write is in flight,
  leaving the block in an invalid state. The fix is to wait for
  the in-flight write to complete before transferring the buffer.
2. On certain code paths in WriteComplete(), condition variables weren't
  signalled, leading to threads waiting for write completion not being
  woken up. The fix is to clarify when condition variables will be
  signalled and ensure that the appropriate condition variables are
  always signalled when the write completes.
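
A minimal sketch of the resulting pattern (hypothetical names, not the
actual BufferedBlockMgr code): a thread that wants a block's buffer waits
on the block's condition variable while a write is in flight, and the
write-completion path must signal that condition variable on every exit:

  #include <condition_variable>
  #include <mutex>

  struct BlockSketch {
    bool in_write = false;
    std::condition_variable write_complete_cv;
  };

  class BlockMgrSketch {
   public:
    // Waiter side: block until the in-flight write (if any) finishes or
    // the block mgr is cancelled. 'lock' must hold 'lock_'.
    void WaitForWrite(std::unique_lock<std::mutex>& lock, BlockSketch* block) {
      while (block->in_write && !is_cancelled_) {
        block->write_complete_cv.wait(lock);
      }
    }

    // Completion side: whichever path write completion takes, the waiter
    // must be signalled, or it sleeps forever (the hang described in 2).
    void WriteComplete(BlockSketch* block) {
      std::lock_guard<std::mutex> l(lock_);
      block->in_write = false;
      block->write_complete_cv.notify_one();
    }

    std::mutex lock_;
    bool is_cancelled_ = false;
  };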

Testing:
Added a targeted unit test that exercises these code paths using a
debug option that controls timing of writes.

Reran the stress test configuration that reproducibly triggered the
bug: TPC-H query 18 on a release build with a single impalad.
It succeeded.

Change-Id: I4be4fad8e6f2303db19ea1e2bd0f13523781ae8e
Reviewed-on: http://gerrit.cloudera.org:8080/3832
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f4da9251
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f4da9251
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f4da9251

Branch: refs/heads/master
Commit: f4da9251346129189806a969288f2f7c4532bbe5
Parents: 15d20d8
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Jul 28 23:18:24 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Mon Aug 15 23:33:57 2016 +0000

----------------------------------------------------------------------
 be/src/runtime/buffered-block-mgr-test.cc | 56 +++++++++++++++++++++++
 be/src/runtime/buffered-block-mgr.cc      | 63 +++++++++++++++++---------
 be/src/runtime/buffered-block-mgr.h       | 29 +++++++++---
 3 files changed, 121 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f4da9251/be/src/runtime/buffered-block-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc
index bdef7cd..5b5ee8a 100644
--- a/be/src/runtime/buffered-block-mgr-test.cc
+++ b/be/src/runtime/buffered-block-mgr-test.cc
@@ -774,6 +774,62 @@ TEST_F(BufferedBlockMgrTest, DeleteSingleBlocks) {
   TearDownMgrs();
 }
 
+// This exercises a code path where:
+// 1. A block A is unpinned.
+// 2. A block B is unpinned.
+// 3. A write for block A is initiated.
+// 4. Block A is pinned.
+// 5. Block B is pinned, with block A passed in to be deleted.
+//    Block A's buffer will be transferred to block B.
+// 6. The write for block A completes.
+// Previously there was a bug (IMPALA-3936) where the buffer transfer happened before the
+// write completed. There were also various hangs related to missing condition variable
+// notifications.
+TEST_F(BufferedBlockMgrTest, TransferBufferDuringWrite) {
+  const int trials = 5;
+  const int max_num_buffers = 2;
+  BufferedBlockMgr::Client* client;
+  RuntimeState* query_state;
+  BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_,
+      1, false, client_tracker_.get(), &client, &query_state);
+
+  for (int trial = 0; trial < trials; ++trial) {
+    for (int delay_ms = 0; delay_ms <= 10; delay_ms += 5) {
+      // Force writes to be delayed to enlarge window of opportunity for bug.
+      block_mgr->set_debug_write_delay_ms(delay_ms);
+      vector<BufferedBlockMgr::Block*> blocks;
+      AllocateBlocks(block_mgr, client, 2, &blocks);
+
+      // Force the second block to be written and have its buffer freed.
+      // We only have one buffer to share between the first and second blocks now.
+      ASSERT_OK(blocks[1]->Unpin());
+
+      // Create another client. Reserving different numbers of buffers can send it
+      // down different code paths because the original client is entitled to a
+      // different number of buffers.
+      int reserved_buffers = trial % max_num_buffers;
+      BufferedBlockMgr::Client* tmp_client;
+      EXPECT_TRUE(block_mgr->RegisterClient("tmp_client", reserved_buffers, false,
+          client_tracker_.get(), query_state, &tmp_client).ok());
+      BufferedBlockMgr::Block* tmp_block;
+      ASSERT_OK(block_mgr->GetNewBlock(tmp_client, NULL, &tmp_block));
+
+      // Initiate the write, repin the block, then immediately try to swap the buffer to
+      // the second block while the write is still in flight.
+      ASSERT_OK(blocks[0]->Unpin());
+      bool pinned;
+      ASSERT_OK(blocks[0]->Pin(&pinned));
+      ASSERT_TRUE(pinned);
+      ASSERT_OK(blocks[1]->Pin(&pinned, blocks[0], false));
+      ASSERT_TRUE(pinned);
+
+      blocks[1]->Delete();
+      tmp_block->Delete();
+      block_mgr->ClearReservations(tmp_client);
+    }
+  }
+}
+
 // Test that all APIs return cancelled after close.
 TEST_F(BufferedBlockMgrTest, Close) {
   int max_num_buffers = 5;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f4da9251/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index 0fc617c..db62922 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -224,7 +224,8 @@ BufferedBlockMgr::BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr
     is_cancelled_(false),
     writes_issued_(0),
     encryption_(FLAGS_disk_spill_encryption),
-    check_integrity_(FLAGS_disk_spill_encryption) {
+    check_integrity_(FLAGS_disk_spill_encryption),
+    debug_write_delay_ms_(0) {
 }
 
 Status BufferedBlockMgr::Create(RuntimeState* state, MemTracker* parent,
@@ -490,14 +491,17 @@ Status BufferedBlockMgr::TransferBuffer(Block* dst, Block* src, bool unpin) {
   DCHECK(src != NULL);
   unique_lock<mutex> lock(lock_);
 
-  // First write out the src block.
   DCHECK(src->is_pinned_);
   DCHECK(!dst->is_pinned_);
   DCHECK(dst->buffer_desc_ == NULL);
   DCHECK_EQ(src->buffer_desc_->len, max_block_size_);
+
+  // Ensure that there aren't any writes in flight for 'src'.
+  WaitForWrite(lock, src);
   src->is_pinned_ = false;
 
   if (unpin) {
+    // First write out the src block so we can grab its buffer.
     src->client_local_ = true;
     status = WriteUnpinnedBlock(src);
     if (!status.ok()) {
@@ -506,9 +510,7 @@ Status BufferedBlockMgr::TransferBuffer(Block* dst, Block* src, bool unpin) {
       return status;
     }
     // Wait for the write to complete.
-    while (src->in_write_ && !is_cancelled_) {
-      src->write_complete_cv_.wait(lock);
-    }
+    WaitForWrite(lock, src);
     if (is_cancelled_) {
       // We can't be sure the write succeeded, so return the buffer to src.
       src->is_pinned_ = true;
@@ -810,6 +812,13 @@ Status BufferedBlockMgr::WriteUnpinnedBlock(Block* block) {
   return Status::OK();
 }
 
+void BufferedBlockMgr::WaitForWrite(unique_lock<mutex>& lock, Block* block) {
+  DCHECK(!block->is_deleted_);
+  while (block->in_write_ && !is_cancelled_) {
+    block->write_complete_cv_.wait(lock);
+  }
+}
+
 Status BufferedBlockMgr::AllocateScratchSpace(int64_t block_size,
     TmpFileMgr::File** tmp_file, int64_t* file_offset) {
   // Assumes block manager lock is already taken.
@@ -837,12 +846,18 @@ Status BufferedBlockMgr::AllocateScratchSpace(int64_t block_size,
 }
 
 void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) {
+#ifndef NDEBUG
+  if (debug_write_delay_ms_ > 0) {
+    usleep(static_cast<int64_t>(debug_write_delay_ms_) * 1000);
+  }
+#endif
   Status status = Status::OK();
   lock_guard<mutex> lock(lock_);
   outstanding_writes_counter_->Add(-1);
   DCHECK(Validate()) << endl << DebugInternal();
   DCHECK(is_cancelled_ || block->in_write_) << "WriteComplete() for block not in write."
                                             << endl << block->DebugString();
+  DCHECK(block->buffer_desc_ != NULL);
   if (!block->client_local_) {
     DCHECK_GT(non_local_outstanding_writes_, 0) << block->DebugString();
     --non_local_outstanding_writes_;
@@ -872,24 +887,11 @@ void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) {
   } else if (block->client_local_) {
     DCHECK(!block->is_deleted_)
         << "Client should be waiting. No one should have deleted this block.";
-    block->write_complete_cv_.notify_one();
   } else {
     DCHECK_EQ(block->buffer_desc_->len, max_block_size_)
         << "Only io sized buffers should spill";
     free_io_buffers_.Enqueue(block->buffer_desc_);
-    // Finish the DeleteBlock() work.
-    if (block->is_deleted_) {
-      block->buffer_desc_->block = NULL;
-      block->buffer_desc_ = NULL;
-      ReturnUnusedBlock(block);
-      block = NULL;
-    }
-    // Multiple threads may be waiting for the same block in FindBuffer().  Wake them
-    // all up.  One thread will get this block, and the others will re-evaluate whether
-    // they should continue waiting and if another write needs to be initiated.
-    buffer_available_cv_.notify_all();
   }
-  DCHECK(Validate()) << endl << DebugInternal();
 
   if (!write_status.ok() || !status.ok() || is_cancelled_) {
     VLOG_FILE << "Query: " << query_id_ << ". Write did not complete successfully: "
@@ -908,11 +910,29 @@ void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) {
       VLOG_QUERY << "Query: " << query_id_ << " error while writing unpinned blocks.";
       if (state != NULL) state->LogError(status.msg());
     }
-    // Set cancelled and wake up waiting threads if an error occurred.  Note that in
-    // the case of client_local_, that thread was woken up above.
+    // Set cancelled. Threads waiting for a write will be woken up in the normal way when
+    // one of the writes they are waiting for completes.
     is_cancelled_ = true;
-    buffer_available_cv_.notify_all();
   }
+
+  // Notify any threads that may have been expecting to get the block's buffer based
+  // on the value of 'non_local_outstanding_writes_'. Wake them all up. If we added
+  // a buffer to 'free_io_buffers_', one thread will get a buffer. All the others
+  // will re-evaluate whether they should continue waiting and if another write needs
+  // to be initiated.
+  if (!block->client_local_) buffer_available_cv_.notify_all();
+  if (block->is_deleted_) {
+    // Finish the DeleteBlock() work.
+    block->buffer_desc_->block = NULL;
+    block->buffer_desc_ = NULL;
+    ReturnUnusedBlock(block);
+    block = NULL;
+  } else {
+    // Wake up the thread waiting on this block (if any).
+    block->write_complete_cv_.notify_one();
+  }
+
+  DCHECK(Validate()) << endl << DebugInternal();
 }
 
 void BufferedBlockMgr::DeleteBlock(Block* block) {
@@ -956,6 +976,7 @@ void BufferedBlockMgr::DeleteBlockLocked(const unique_lock<mutex>& lock, Block*
     } else {
       if (!free_io_buffers_.Contains(block->buffer_desc_)) {
         free_io_buffers_.Enqueue(block->buffer_desc_);
+        // Wake up one of the waiting threads, which will grab the buffer.
         buffer_available_cv_.notify_one();
       }
       block->buffer_desc_->block = NULL;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f4da9251/be/src/runtime/buffered-block-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.h b/be/src/runtime/buffered-block-mgr.h
index 58744ff..ac707bc 100644
--- a/be/src/runtime/buffered-block-mgr.h
+++ b/be/src/runtime/buffered-block-mgr.h
@@ -288,8 +288,9 @@ class BufferedBlockMgr {
     /// True if the block is deleted by the client.
     bool is_deleted_;
 
-    /// Condition variable for when there is a specific client waiting for this block.
-    /// Only used if client_local_ is true.
+    /// Condition variable to wait for the write to this block to finish. If 'in_write_'
+    /// is true, notify_one() will eventually be called on this condition variable. Only
+    /// one thread should wait on this cv at a time.
     /// TODO: Currently we use block_mgr_->lock_ for this condvar. There is no reason to
     /// use that lock_ that is already overloaded, see IMPALA-1883.
     boost::condition_variable write_complete_cv_;
@@ -398,6 +399,8 @@ class BufferedBlockMgr {
   RuntimeProfile* profile() { return profile_.get(); }
   int writes_issued() const { return writes_issued_; }
 
+  void set_debug_write_delay_ms(int val) { debug_write_delay_ms_ = val; }
+
  private:
   friend struct Client;
 
@@ -443,11 +446,12 @@ class BufferedBlockMgr {
   /// cancellation. It should be called without the lock_ acquired.
   Status DeleteOrUnpinBlock(Block* block, bool unpin);
 
-  /// Transfers the buffer from 'src' to 'dst'. 'src' must be pinned.
+  /// Transfers the buffer from 'src' to 'dst'. 'src' must be pinned. If a write is
+  /// already in flight for 'src', this may block until that write completes.
   /// If unpin == false, 'src' is simply deleted.
   /// If unpin == true, 'src' is unpinned and it may block until the write of 'src' is
-  /// completed. In that case it will use the lock_ for the condvar. Thus, the lock_
-  /// needs to not have been taken when this function is called.
+  /// completed.
+  /// The caller should not hold 'lock_'.
   Status TransferBuffer(Block* dst, Block* src, bool unpin);
 
   /// Returns the total number of unreserved buffers. This is the sum of unpinned,
@@ -483,6 +487,10 @@ class BufferedBlockMgr {
   /// Issues the write for this block to the DiskIoMgr.
   Status WriteUnpinnedBlock(Block* block);
 
+  /// Wait until either the write for 'block' completes or the block mgr is cancelled.
+  /// 'lock_' must be held with 'lock'.
+  void WaitForWrite(boost::unique_lock<boost::mutex>& lock, Block* block);
+
   /// Allocate block_size bytes in a temporary file. Try multiple disks if error occurs.
   /// Returns an error only if no temporary files are usable.
   Status AllocateScratchSpace(int64_t block_size, TmpFileMgr::File** tmp_file,
@@ -549,7 +557,12 @@ class BufferedBlockMgr {
   /// This does not include client-local writes.
   int non_local_outstanding_writes_;
 
-  /// Signal availability of free buffers.
+  /// Signal availability of free buffers. Also signalled when a write completes for a
+  /// pinned block, in case another thread was expecting to obtain its buffer. If
+  /// 'non_local_outstanding_writes_' > 0, notify_all() will eventually be called on
+  /// this condition variable. To avoid free buffers accumulating while threads wait
+  /// on the cv, a woken thread must grab an available buffer (unless is_cancelled_ is
+  /// true at that time).
   boost::condition_variable buffer_available_cv_;
 
   /// All used or unused blocks allocated by the BufferedBlockMgr.
@@ -667,6 +680,10 @@ class BufferedBlockMgr {
   /// and hence no real reason to keep this separate from encryption.  When true, blocks
   /// will have an integrity check (SHA-256) performed after being read from disk.
   const bool check_integrity_;
+
+  /// Debug option to delay write completion.
+  int debug_write_delay_ms_;
+
 }; // class BufferedBlockMgr
 
 } // namespace impala.