Posted to commits@hive.apache.org by se...@apache.org on 2018/06/06 19:32:05 UTC

[1/2] hive git commit: HIVE-19418 : add background stats updater similar to compactor (Sergey Shelukhin, reviewed by Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/branch-3 43c4bd3c6 -> bdac6edfd


http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 0461c4e..8c3ada3 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.hive.metastore.api.WMMapping;
 import org.apache.hadoop.hive.metastore.api.WMPool;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
 import org.apache.thrift.TException;
 
 /**
@@ -1185,4 +1186,23 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
   public int deleteRuntimeStats(int maxRetainSecs) throws MetaException {
     return objectStore.deleteRuntimeStats(maxRetainSecs);
   }
+
+  @Override
+  public List<FullTableName> getTableNamesWithStats() throws MetaException,
+      NoSuchObjectException {
+    return null;
+  }
+
+  @Override
+  public List<FullTableName> getAllTableNamesForStats() throws MetaException,
+      NoSuchObjectException {
+    return null;
+  }
+
+  @Override
+  public Map<String, List<String>> getPartitionColsWithStats(String catName,
+      String dbName, String tableName) throws MetaException,
+      NoSuchObjectException {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index b71eda4..f98e8de 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hive.metastore.api.WMPool;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
 import org.apache.thrift.TException;
 import org.junit.Assert;
 
@@ -1172,4 +1173,23 @@ public class DummyRawStoreForJdoConnection implements RawStore {
   public int deleteRuntimeStats(int maxRetainSecs) throws MetaException {
     return 0;
   }
+
+  @Override
+  public List<FullTableName> getTableNamesWithStats() throws MetaException,
+      NoSuchObjectException {
+    return null;
+  }
+
+  @Override
+  public List<FullTableName> getAllTableNamesForStats() throws MetaException,
+      NoSuchObjectException {
+    return null;
+  }
+
+  @Override
+  public Map<String, List<String>> getPartitionColsWithStats(String catName,
+      String dbName, String tableName) throws MetaException,
+      NoSuchObjectException {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java
index fe2d758..f824dbd 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java
@@ -354,7 +354,7 @@ public class TestTablesCreateDropAlterTruncate extends MetaStoreClientTest {
         createdTable.getSd().getLocation());
   }
 
-  @Test(expected = MetaException.class)
+  @Test(expected = InvalidObjectException.class)
   public void testCreateTableNullDatabase() throws Exception {
     Table table = testTables[0];
     table.setDbName(null);
@@ -889,7 +889,7 @@ public class TestTablesCreateDropAlterTruncate extends MetaStoreClientTest {
     }
   }
 
-  @Test(expected = MetaException.class)
+  @Test(expected = InvalidOperationException.class)
   public void testAlterTableNullDatabaseInNew() throws Exception {
     Table originalTable = testTables[0];
     Table newTable = originalTable.deepCopy();


[2/2] hive git commit: HIVE-19418 : add background stats updater similar to compactor (Sergey Shelukhin, reviewed by Ashutosh Chauhan)

Posted by se...@apache.org.
HIVE-19418 : add background stats updater similar to compactor (Sergey Shelukhin, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bdac6edf
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bdac6edf
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bdac6edf

Branch: refs/heads/branch-3
Commit: bdac6edfd801804fba19dc1f2f59885c9dc743e7
Parents: 43c4bd3
Author: sergey <se...@apache.org>
Authored: Wed Jun 6 12:20:06 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Wed Jun 6 12:23:50 2018 -0700

----------------------------------------------------------------------
 .../listener/DummyRawStoreFailEvent.java        |  23 +-
 .../org/apache/hadoop/hive/ql/DriverUtils.java  |  77 +++
 .../hadoop/hive/ql/session/SessionState.java    |   2 +-
 .../hive/ql/stats/StatsUpdaterThread.java       | 652 +++++++++++++++++++
 .../hive/ql/txn/compactor/CompactorMR.java      |  51 +-
 .../hive/ql/stats/TestStatsUpdaterThread.java   | 472 ++++++++++++++
 .../hadoop/hive/metastore/HiveAlterHandler.java |   8 +-
 .../hadoop/hive/metastore/HiveMetaStore.java    |  11 +
 .../hive/metastore/MetaStoreDirectSql.java      |  91 +++
 .../hadoop/hive/metastore/ObjectStore.java      |  99 ++-
 .../apache/hadoop/hive/metastore/RawStore.java  |   8 +
 .../hive/metastore/cache/CachedStore.java       |  17 +
 .../hive/metastore/conf/EnumValidator.java      |  26 +
 .../hive/metastore/conf/MetastoreConf.java      |  16 +
 .../hive/metastore/utils/MetaStoreUtils.java    |  38 +-
 .../DummyRawStoreControlledCommit.java          |  20 +
 .../DummyRawStoreForJdoConnection.java          |  20 +
 .../TestTablesCreateDropAlterTruncate.java      |   4 +-
 18 files changed, 1578 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 0cc0ae5..ff97522 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -22,11 +22,13 @@ import org.apache.hadoop.hive.metastore.api.ISchemaName;
 import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
 import org.apache.hadoop.hive.metastore.api.Catalog;
 import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.FileMetadataHandler;
@@ -88,6 +90,7 @@ import org.apache.hadoop.hive.metastore.api.WMPool;
 import org.apache.hadoop.hive.metastore.api.WMNullablePool;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
 import org.apache.thrift.TException;
 
 /**
@@ -1221,4 +1224,22 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
     return objectStore.deleteRuntimeStats(maxRetainSecs);
   }
 
-}
+
+  @Override
+  public List<FullTableName> getTableNamesWithStats() throws MetaException,
+      NoSuchObjectException {
+    return null;
+  }
+
+  @Override
+  public List<FullTableName> getAllTableNamesForStats() throws MetaException,
+      NoSuchObjectException {
+    return null;
+  }
+
+  @Override
+  public Map<String, List<String>> getPartitionColsWithStats(String catName,
+      String dbName, String tableName) throws MetaException,
+      NoSuchObjectException {
+    return null;
+  }}

http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
new file mode 100644
index 0000000..8228109
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DriverUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(DriverUtils.class);
+
+  public static void runOnDriver(HiveConf conf, String user,
+      SessionState sessionState, String query, ValidWriteIdList writeIds) throws HiveException {
+    SessionState.setCurrentSessionState(sessionState);
+    boolean isOk = false;
+    try {
+      QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build();
+      Driver driver = new Driver(qs, user, null, null);
+      driver.setCompactionWriteIds(writeIds);
+      try {
+        CommandProcessorResponse cpr = driver.run(query);
+        if (cpr.getResponseCode() != 0) {
+          LOG.error("Failed to run " + query, cpr.getException());
+          throw new HiveException("Failed to run " + query, cpr.getException());
+        }
+      } finally {
+        driver.close();
+        driver.destroy();
+      }
+      isOk = true;
+    } finally {
+      if (!isOk) {
+        try {
+          sessionState.close(); // This also resets SessionState.get.
+        } catch (Throwable th) {
+          LOG.warn("Failed to close a bad session", th);
+          SessionState.detachSession();
+        }
+      }
+    }
+  }
+
+  public static SessionState setUpSessionState(HiveConf conf, String user, boolean doStart) {
+    SessionState sessionState = SessionState.get();
+    if (sessionState == null) {
+      // Note: we assume that workers run on the same threads repeatedly, so we can set up
+      //       the session here and it will be reused without explicitly storing in the worker.
+      sessionState = new SessionState(conf, user);
+      if (doStart) {
+        // TODO: Required due to SessionState.getHDFSSessionPath. Why wasn't it required before?
+        sessionState.setIsHiveServerQuery(true);
+        SessionState.start(sessionState);
+      }
+      SessionState.setCurrentSessionState(sessionState);
+    }
+    return sessionState;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 89129f9..9c52f2b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -813,7 +813,7 @@ public class SessionState {
       return new Path(sessionPathString);
     }
     Preconditions.checkNotNull(ss.hdfsSessionPath,
-        "Non-local session path expected to be non-null");
+       "Non-local session path expected to be non-null");
     return ss.hdfsSessionPath;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
new file mode 100644
index 0000000..285db31
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.stats;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.RawStoreProxy;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.StatsUpdateMode;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
+import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+
+public class StatsUpdaterThread extends Thread implements MetaStoreThread {
+  public static final String SKIP_STATS_AUTOUPDATE_PROPERTY = "skip.stats.autoupdate";
+  private static final Logger LOG = LoggerFactory.getLogger(StatsUpdaterThread.class);
+
+  protected Configuration conf;
+  protected int threadId;
+  protected AtomicBoolean stop;
+  protected AtomicBoolean looped;
+
+  private RawStore rs;
+  private TxnStore txnHandler;
+  /** Full tables, and partitions that currently have analyze commands queued or in progress. */
+  private ConcurrentHashMap<FullTableName, Boolean> tablesInProgress = new ConcurrentHashMap<>();
+  private ConcurrentHashMap<String, Boolean> partsInProgress = new ConcurrentHashMap<>();
+  private AtomicInteger itemsInProgress = new AtomicInteger(0);
+
+  // Configuration
+  /** Whether to only update stats that already exist and are out of date. */
+  private boolean isExistingOnly;
+  private long noUpdatesWaitMs;
+  private int batchSize;
+
+  // Worker threads stuff
+  private BlockingQueue<AnalyzeWork> workQueue;
+  private Thread[] workers;
+
+  @Override
+  public void setConf(Configuration conf) {
+    StatsUpdateMode mode = StatsUpdateMode.valueOf(
+        MetastoreConf.getVar(conf, ConfVars.STATS_AUTO_UPDATE).toUpperCase());
+    switch (mode) {
+    case ALL: this.isExistingOnly = false; break;
+    case EXISTING: this.isExistingOnly = true; break;
+    default: throw new AssertionError("Unexpected mode " + mode);
+    }
+    noUpdatesWaitMs = MetastoreConf.getTimeVar(
+        conf, ConfVars.STATS_AUTO_UPDATE_NOOP_WAIT, TimeUnit.MILLISECONDS);
+    batchSize = MetastoreConf.getIntVar(conf, ConfVars.BATCH_RETRIEVE_MAX);
+    int workerCount = MetastoreConf.getIntVar(conf, ConfVars.STATS_AUTO_UPDATE_WORKER_COUNT);
+    if (workerCount <= 0) {
+      workerCount = 1;
+    }
+    workers = new Thread[workerCount];
+    // Don't store too many items; if the queue is full we'll block the checker thread.
+    // Since the worker count determines how many queries can be running in parallel, it makes
+    // no sense to produce more work if the backlog is getting too long.
+    workQueue = new ArrayBlockingQueue<AnalyzeWork>(workerCount * 3);
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setThreadId(int threadId) {
+    this.threadId = threadId;
+  }
+
+  @Override
+  public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
+    this.stop = stop;
+    this.looped = looped;
+    setPriority(MIN_PRIORITY);
+    setDaemon(true);
+    String user = "anonymous";
+    try {
+      user = UserGroupInformation.getCurrentUser().getShortUserName();
+    } catch (IOException e) {
+      LOG.warn("Cannot determine the current user; executing as anonymous", e);
+    }
+    txnHandler = TxnUtils.getTxnStore(conf);
+    rs = RawStoreProxy.getProxy(conf, conf,
+        MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL), threadId);
+    for (int i = 0; i < workers.length; ++i) {
+      workers[i] = new Thread(new WorkerRunnable(conf, user));
+      workers[i].setDaemon(true);
+      workers[i].setName("Stats updater worker " + i);
+    }
+  }
+
+  @Override
+  public void run() {
+    startWorkers();
+    while (!stop.get()) {
+      boolean hadUpdates = runOneIteration();
+      try {
+        Thread.sleep(hadUpdates ? 0 : noUpdatesWaitMs);
+      } catch (InterruptedException e) {
+        LOG.info("Stats updater thread was interrupted and will now exit");
+        stopWorkers();
+        return;
+      }
+    }
+    LOG.info("Stats updater thread was stopped and will now exit");
+  }
+
+  @VisibleForTesting
+  void startWorkers() {
+    for (int i = 0; i < workers.length; ++i) {
+      workers[i].start();
+    }
+  }
+
+  @VisibleForTesting
+  boolean runOneIteration() {
+    List<FullTableName> fullTableNames;
+    try {
+      fullTableNames = getTablesToCheck();
+    } catch (Throwable t) {
+      LOG.error("Stats updater thread cannot retrieve tables and will now exit", t);
+      stopWorkers();
+      throw new RuntimeException(t);
+    }
+    LOG.debug("Processing {}", fullTableNames);
+    boolean hadUpdates = false;
+    for (FullTableName fullTableName : fullTableNames) {
+      try {
+        List<AnalyzeWork> commands = processOneTable(fullTableName);
+        hadUpdates = hadUpdates || commands != null;
+        if (commands != null) {
+          for (AnalyzeWork req : commands) {
+            markAnalyzeInProgress(req);
+            workQueue.put(req);
+          }
+        }
+      } catch (Exception e) {
+        LOG.error("Failed to process " + fullTableName + "; skipping for now", e);
+      }
+    }
+    return hadUpdates;
+  }
+
+  private void stopWorkers() {
+    for (int i = 0; i < workers.length; ++i) {
+      workers[i].interrupt();
+    }
+  }
+
+  private List<AnalyzeWork> processOneTable(FullTableName fullTableName)
+      throws MetaException, NoSuchTxnException, NoSuchObjectException {
+    if (isAnalyzeTableInProgress(fullTableName)) return null;
+    String cat = fullTableName.catalog, db = fullTableName.db, tbl = fullTableName.table;
+    Table table = rs.getTable(cat, db, tbl);
+    LOG.debug("Processing table {}", table);
+
+    // Check if the table should be skipped.
+    String skipParam = table.getParameters().get(SKIP_STATS_AUTOUPDATE_PROPERTY);
+    if ("true".equalsIgnoreCase(skipParam)) return null;
+
+    // TODO: when txn stats are implemented, use writeIds to determine stats accuracy
+    @SuppressWarnings("unused")
+    ValidReaderWriteIdList writeIds = null;
+    if (AcidUtils.isTransactionalTable(table)) {
+      writeIds = getWriteIds(fullTableName);
+    }
+    List<String> allCols = new ArrayList<>(table.getSd().getColsSize());
+    for (FieldSchema fs : table.getSd().getCols()) {
+      allCols.add(fs.getName());
+    }
+    Collections.sort(allCols);
+    if (table.getPartitionKeysSize() == 0) {
+      Map<String, String> params = table.getParameters();
+      List<String> colsToUpdate = isExistingOnly
+          ? getExistingNonPartTableStatsToUpdate(fullTableName, cat, db, tbl, params, allCols)
+          : getAnyStatsToUpdate(allCols, params);
+      LOG.debug("Columns to update are {}; existing only: {}, out of: {} based on {}",
+          colsToUpdate, isExistingOnly, allCols, params);
+
+      if (colsToUpdate == null || colsToUpdate.isEmpty()) {
+        return null; // No update necessary.
+      }
+      return Lists.newArrayList(new AnalyzeWork(fullTableName,
+          null, null, allCols.size() == colsToUpdate.size() ? null : colsToUpdate));
+    } else {
+      Map<String, List<String>> partsToAnalyze = new HashMap<>();
+      List<String> colsForAllParts = findPartitionsToAnalyze(
+          fullTableName, cat, db, tbl, allCols, partsToAnalyze);
+      LOG.debug("Columns to update are {} for all partitions; {} individual partitions."
+          + " Existing only: {}, out of: {}", colsForAllParts, partsToAnalyze.size(),
+          isExistingOnly, allCols);
+      if (colsForAllParts == null && partsToAnalyze.isEmpty()) {
+        return null; // No partitions need update.
+      }
+      if (colsForAllParts != null) {
+        // We can update all partitions with a single analyze query.
+        return Lists.newArrayList(new AnalyzeWork(
+            fullTableName, null, buildPartColStr(table), colsForAllParts));
+      }
+      List<AnalyzeWork> result = new ArrayList<>(partsToAnalyze.size());
+      for (Map.Entry<String, List<String>> e : partsToAnalyze.entrySet()) {
+        LOG.debug("Adding analyze work for {}", e.getKey());
+        result.add(new AnalyzeWork(fullTableName, e.getKey(), null, e.getValue()));
+      }
+      return result;
+    }
+  }
+
+  private List<String> findPartitionsToAnalyze(FullTableName fullTableName, String cat, String db,
+      String tbl, List<String> allCols, Map<String, List<String>> partsToAnalyze)
+          throws MetaException, NoSuchObjectException {
+    // TODO: ideally, once col-stats-accurate state is stored in some sane structure, this should
+    //       retrieve partsToUpdate in a single query, with no partition-param checks in Java.
+    List<String> partNames = null;
+    Map<String, List<String>> colsPerPartition = null;
+    boolean isAllParts = true;
+    if (isExistingOnly) {
+      colsPerPartition = rs.getPartitionColsWithStats(cat, db, tbl);
+      partNames = Lists.newArrayList(colsPerPartition.keySet());
+      int partitionCount = rs.getNumPartitionsByFilter(cat, db, tbl, "");
+      isAllParts = partitionCount == partNames.size();
+    } else {
+      partNames = rs.listPartitionNames(cat, db, tbl, (short) -1);
+      isAllParts = true;
+    }
+    Table t = rs.getTable(cat, db, tbl);
+    List<Partition> currentBatch = null;
+    int nextBatchStart = 0, nextIxInBatch = -1, currentBatchStart = 0;
+    List<String> colsToUpdateForAll = null;
+    while (true) {
+      if (currentBatch == null || nextIxInBatch == currentBatch.size()) {
+        if (nextBatchStart >= partNames.size()) {
+          break;
+        }
+        int nextBatchEnd = Math.min(partNames.size(), nextBatchStart + this.batchSize);
+        List<String> currentNames = partNames.subList(nextBatchStart, nextBatchEnd);
+        currentBatchStart = nextBatchStart;
+        nextBatchStart = nextBatchEnd;
+        try {
+          currentBatch = rs.getPartitionsByNames(cat, db, tbl, currentNames);
+        } catch (NoSuchObjectException e) {
+          LOG.error("Failed to get partitions for " + fullTableName + ", skipping some partitions", e);
+          currentBatch = null;
+          continue;
+        }
+        nextIxInBatch = 0;
+      }
+      int currentIxInBatch = nextIxInBatch++;
+      Partition part = currentBatch.get(currentIxInBatch);
+      String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues());
+      LOG.debug("Processing partition ({} in batch), {}", currentIxInBatch, partName);
+
+      // Skip the partitions in progress, and the ones for which stats update is disabled.
+      // We could filter the skipped partitions out as part of the initial names query,
+      // but we assume skipping is extremely rare for individual partitions.
+      Map<String, String> params = part.getParameters();
+      String skipParam = params.get(SKIP_STATS_AUTOUPDATE_PROPERTY);
+      if (isAnalyzePartInProgress(fullTableName, partName) || "true".equalsIgnoreCase(skipParam)) {
+        if (isAllParts) {
+          addPreviousPartitions(t, partNames, currentBatchStart, currentBatch, currentIxInBatch,
+              colsToUpdateForAll, partsToAnalyze);
+        }
+        isAllParts = false;
+        continue;
+      }
+
+      // Find which columns we need to update for this partition, if any.
+      List<String> colsToMaybeUpdate = allCols;
+      if (isExistingOnly) {
+        colsToMaybeUpdate = colsPerPartition.get(partName);
+        Collections.sort(colsToMaybeUpdate);
+      }
+      List<String> colsToUpdate = getAnyStatsToUpdate(colsToMaybeUpdate, params);
+      LOG.debug("Updating {} based on {} and {}", colsToUpdate, colsToMaybeUpdate, params);
+
+
+      if (colsToUpdate == null || colsToUpdate.isEmpty()) {
+        if (isAllParts) {
+          addPreviousPartitions(t, partNames, currentBatchStart, currentBatch, currentIxInBatch,
+              colsToUpdateForAll, partsToAnalyze);
+        }
+        isAllParts = false;
+        continue;
+      }
+
+      // If issuing a query for all partitions, verify that we need to update the same columns.
+      // TODO: for non columnar we don't need to do this... might as well update all stats.
+      if (isAllParts) {
+        List<String> newCols = verifySameColumnsForAllParts(colsToUpdateForAll, colsToUpdate);
+        if (newCols == null) {
+          isAllParts = false;
+          addPreviousPartitions(t, partNames, currentBatchStart, currentBatch, currentIxInBatch,
+              colsToUpdateForAll, partsToAnalyze);
+        } else if (colsToUpdateForAll == null) {
+          colsToUpdateForAll = newCols;
+        }
+      }
+
+      if (!isAllParts) {
+        LOG.trace("Adding {}, {}", partName, colsToUpdate);
+        partsToAnalyze.put(partName, colsToUpdate);
+      }
+    }
+    return isAllParts ? colsToUpdateForAll : null;
+  }
+
+  private List<String> verifySameColumnsForAllParts(
+      List<String> colsToUpdateForAll, List<String> colsToUpdate) {
+    if (colsToUpdateForAll == null) {
+      return colsToUpdate;
+    }
+    if (colsToUpdate.size() != colsToUpdateForAll.size()) {
+      return null;
+    }
+    // Assumes the lists are sorted.
+    for (int i = 0; i < colsToUpdateForAll.size(); ++i) {
+      if (!colsToUpdate.get(i).equals(colsToUpdateForAll.get(i))) {
+        return null;
+      }
+    }
+    return colsToUpdateForAll;
+  }
+
+  private void addPreviousPartitions(Table t, List<String> allPartNames,
+      int currentBatchStart, List<Partition> currentBatch, int currentIxInBatch,
+      List<String> cols, Map<String, List<String>> partsToAnalyze) throws MetaException {
+    // Add all the names for previous batches.
+    for (int i = 0; i < currentBatchStart; ++i) {
+      LOG.trace("Adding previous {}, {}", allPartNames.get(i), cols);
+      partsToAnalyze.put(allPartNames.get(i), cols);
+    }
+    // Current match may be out of order w.r.t. the global name list, so add specific parts.
+    for (int i = 0; i < currentIxInBatch; ++i) {
+      String name = Warehouse.makePartName(t.getPartitionKeys(), currentBatch.get(i).getValues());
+      LOG.trace("Adding previous {}, {}", name, cols);
+      partsToAnalyze.put(name, cols);
+    }
+  }
+
+  private String buildPartColStr(Table table) {
+    String partColStr = "";
+    for (int i = 0; i < table.getPartitionKeysSize(); ++i) {
+      if (i != 0) {
+        partColStr += ",";
+      }
+      partColStr += table.getPartitionKeys().get(i).getName();
+    }
+    return partColStr;
+  }
+
+  private List<String> getExistingNonPartTableStatsToUpdate(FullTableName fullTableName,
+      String cat, String db, String tbl, Map<String, String> params,
+      List<String> allCols) throws MetaException {
+    ColumnStatistics existingStats = null;
+    try {
+      existingStats = rs.getTableColumnStatistics(cat, db, tbl, allCols);
+    } catch (NoSuchObjectException e) {
+      LOG.error("Cannot retrieve existing stats, skipping " + fullTableName, e);
+      return null;
+    }
+    return getExistingStatsToUpdate(existingStats, params);
+  }
+
+  private List<String> getExistingStatsToUpdate(
+      ColumnStatistics existingStats, Map<String, String> params) {
+    boolean hasAnyAccurate = StatsSetupConst.areBasicStatsUptoDate(params);
+    List<String> colsToUpdate = new ArrayList<>();
+    for (ColumnStatisticsObj obj : existingStats.getStatsObj()) {
+      String col = obj.getColName();
+      if (!hasAnyAccurate || !StatsSetupConst.areColumnStatsUptoDate(params, col)) {
+        colsToUpdate.add(col);
+      }
+    }
+    return colsToUpdate;
+  }
+
+  private List<String> getAnyStatsToUpdate(
+      List<String> allCols, Map<String, String> params) {
+    // Note: we only run the "for columns" command and assume that no basic stats means no col stats.
+    if (!StatsSetupConst.areBasicStatsUptoDate(params)) {
+      return allCols;
+    }
+    List<String> colsToUpdate = new ArrayList<>();
+    for (String col : allCols) {
+      if (!StatsSetupConst.areColumnStatsUptoDate(params, col)) {
+        colsToUpdate.add(col);
+      }
+    }
+    return colsToUpdate;
+  }
+
+  private List<FullTableName> getTablesToCheck() throws MetaException, NoSuchObjectException {
+    if (isExistingOnly) {
+      try {
+        return rs.getTableNamesWithStats();
+      } catch (Exception ex) {
+        LOG.error("Error from getTablesWithStats, getting all the tables", ex);
+      }
+    }
+    return rs.getAllTableNamesForStats();
+  }
+
+  private ValidReaderWriteIdList getWriteIds(
+      FullTableName fullTableName) throws NoSuchTxnException, MetaException {
+    GetValidWriteIdsRequest req = new GetValidWriteIdsRequest();
+    req.setFullTableNames(Lists.newArrayList(fullTableName.toString()));
+    return TxnUtils.createValidReaderWriteIdList(
+        txnHandler.getValidWriteIds(req).getTblValidWriteIds().get(0));
+  }
+
+
+  private void markAnalyzeInProgress(AnalyzeWork req) {
+    if (req.partName == null) {
+      Boolean old = tablesInProgress.putIfAbsent(req.tableName, true);
+      if (old != null) {
+        throw new AssertionError("The table was added to progress twice: " + req.tableName);
+      }
+    } else {
+      String partName = req.makeFullPartName();
+      Boolean old = partsInProgress.putIfAbsent(partName, true);
+      if (old != null) {
+        throw new AssertionError("The partition was added to progress twice: " + partName);
+      }
+    }
+    itemsInProgress.incrementAndGet();
+  }
+
+  private void markAnalyzeDone(AnalyzeWork req) {
+    if (req.partName == null) {
+      Boolean old = tablesInProgress.remove(req.tableName);
+      if (old == null) {
+        throw new AssertionError("The table was not in progress: " + req.tableName);
+      }
+    } else {
+      String partName = req.makeFullPartName();
+      Boolean old = partsInProgress.remove(partName);
+      if (old == null) {
+        throw new AssertionError("Partition was not in progress: " + partName);
+      }
+    }
+    // This is used for tests where there's always just one batch of work and we do the
+    // checks after the batch, so the check will only come at the end of queueing.
+    int remaining = itemsInProgress.decrementAndGet();
+    if (remaining == 0) {
+      synchronized (itemsInProgress) {
+        itemsInProgress.notifyAll();
+      }
+    }
+  }
+
+  private boolean isAnalyzeTableInProgress(FullTableName fullTableName) {
+    return tablesInProgress.containsKey(fullTableName);
+  }
+
+  private boolean isAnalyzePartInProgress(FullTableName tableName, String partName) {
+    return partsInProgress.containsKey(makeFullPartName(tableName, partName));
+  }
+
+  private static String makeFullPartName(FullTableName tableName, String partName) {
+    return tableName + "/" + partName;
+  }
+
+  private final static class AnalyzeWork {
+    FullTableName tableName;
+    String partName, allParts;
+    List<String> cols;
+
+    public AnalyzeWork(FullTableName tableName, String partName, String allParts, List<String> cols) {
+      this.tableName = tableName;
+      this.partName = partName;
+      this.allParts = allParts;
+      this.cols = cols;
+    }
+
+    public String makeFullPartName() {
+      return StatsUpdaterThread.makeFullPartName(tableName, partName);
+    }
+
+    public String buildCommand() {
+      // Catalogs cannot be parsed as part of the query. Seems to be a bug.
+      String cmd = "analyze table " + tableName.db + "." + tableName.table;
+      assert partName == null || allParts == null;
+      if (partName != null) {
+        cmd += " partition(" + partName + ")";
+      }
+      if (allParts != null) {
+        cmd += " partition(" + allParts + ")";
+      }
+      cmd += " compute statistics for columns";
+      if (cols != null) {
+        cmd += " " + String.join(",", cols);
+      }
+      return cmd;
+    }
+
+    @Override
+    public String toString() {
+      return "AnalyzeWork [tableName=" + tableName + ", partName=" + partName
+          + ", allParts=" + allParts + ", cols=" + cols + "]";
+    }
+  }
+
+  @VisibleForTesting
+  public boolean runOneWorkerIteration(
+      SessionState ss, String user, HiveConf conf, boolean doWait) throws InterruptedException {
+    AnalyzeWork req;
+    if (doWait) {
+      req = workQueue.take();
+    } else {
+      req = workQueue.poll();
+      if (req == null) {
+        return false;
+      }
+    }
+    String cmd = null;
+    try {
+      cmd = req.buildCommand();
+      LOG.debug("Running {} based on {}", cmd, req);
+      if (doWait) {
+        SessionState.start(ss); // This is the first call, open the session
+      }
+      DriverUtils.runOnDriver(conf, user, ss, cmd, null);
+    } catch (Exception e) {
+      LOG.error("Analyze command failed: " + cmd, e);
+      try {
+        ss.close();
+      } catch (IOException e1) {
+        LOG.warn("Failed to close a bad session", e1);
+      } finally {
+        SessionState.detachSession();
+      }
+    } finally {
+      markAnalyzeDone(req);
+    }
+    return true;
+  }
+
+  public class WorkerRunnable implements Runnable {
+    private final HiveConf conf;
+    private final String user;
+
+    public WorkerRunnable(Configuration conf, String user) {
+      this.conf = new HiveConf(conf, HiveConf.class);
+      this.user = user;
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        // This should not start the actual Tez AM.
+        SessionState ss = DriverUtils.setUpSessionState(conf, user, false);
+        // Wait for the first item to arrive at the queue and process it.
+        try {
+          runOneWorkerIteration(ss, user, conf, true);
+        } catch (InterruptedException e) {
+          closeSession(ss);
+          LOG.info("Worker thread was interrupted and will now exit");
+          return;
+        }
+        // Keep draining the queue in the same session.
+        try {
+          while (runOneWorkerIteration(ss, user, conf, false)) {}
+        } catch (InterruptedException e) {
+          closeSession(ss);
+          LOG.info("Worker thread was interrupted unexpectedly and will now exit");
+          return;
+        };
+        // Close the session before we have to wait again.
+        closeSession(ss);
+        SessionState.detachSession();
+      }
+    }
+  }
+
+  private static void closeSession(SessionState ss) {
+    try {
+      ss.close();
+    } catch (IOException e1) {
+      LOG.error("Failed to close the session", e1);
+    }
+  }
+
+  @VisibleForTesting
+  public void waitForQueuedCommands() throws InterruptedException {
+    while (itemsInProgress.get() > 0) {
+      synchronized (itemsInProgress) {
+        itemsInProgress.wait(100L);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public int getQueueLength() {
+    return workQueue.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 982b180..6044719 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.DriverUtils;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
@@ -353,7 +354,7 @@ public class CompactorMR {
       conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
 
       String user = UserGroupInformation.getCurrentUser().getShortUserName();
-      SessionState sessionState = setUpSessionState(conf, user);
+      SessionState sessionState = DriverUtils.setUpSessionState(conf, user, false);
 
       // Note: we could skip creating the table and just add table type stuff directly to the
       //       "insert overwrite directory" command if there were no bucketing or list bucketing.
@@ -365,7 +366,7 @@ public class CompactorMR {
             p == null ? t.getSd() : p.getSd(), baseLocation.toString());
         LOG.info("Compacting a MM table into " + query);
         try {
-          runOnDriver(conf, user, sessionState, query, null);
+          DriverUtils.runOnDriver(conf, user, sessionState, query, null);
           break;
         } catch (Exception ex) {
           Throwable cause = ex;
@@ -380,26 +381,16 @@ public class CompactorMR {
 
       String query = buildMmCompactionQuery(conf, t, p, tmpTableName);
       LOG.info("Compacting a MM table via " + query);
-      runOnDriver(conf, user, sessionState, query, writeIds);
+      DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds);
       commitMmCompaction(tmpLocation, sd.getLocation(), conf, writeIds);
-      runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName, null);
+      DriverUtils.runOnDriver(conf, user, sessionState,
+          "drop table if exists " + tmpTableName, null);
     } catch (HiveException e) {
       LOG.error("Error compacting a MM table", e);
       throw new IOException(e);
     }
   }
 
-  public SessionState setUpSessionState(HiveConf conf, String user) {
-    SessionState sessionState = SessionState.get();
-    if (sessionState == null) {
-      // Note: we assume that workers run on the same threads repeatedly, so we can set up
-      //       the session here and it will be reused without explicitly storing in the worker.
-      sessionState = new SessionState(conf, user);
-      SessionState.setCurrentSessionState(sessionState);
-    }
-    return sessionState;
-  }
-
   private String generateTmpPath(StorageDescriptor sd) {
     return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
   }
@@ -514,36 +505,6 @@ public class CompactorMR {
     return result;
   }
 
-  private void runOnDriver(HiveConf conf, String user,
-      SessionState sessionState, String query, ValidWriteIdList writeIds) throws HiveException {
-    boolean isOk = false;
-    try {
-      QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build();
-      Driver driver = new Driver(qs, user, null, null);
-      driver.setCompactionWriteIds(writeIds);
-      try {
-        CommandProcessorResponse cpr = driver.run(query);
-        if (cpr.getResponseCode() != 0) {
-          LOG.error("Failed to run " + query, cpr.getException());
-          throw new HiveException("Failed to run " + query, cpr.getException());
-        }
-      } finally {
-        driver.close();
-        driver.destroy();
-      }
-      isOk = true;
-    } finally {
-      if (!isOk) {
-        try {
-          sessionState.close(); // This also resets SessionState.get.
-        } catch (Throwable th) {
-          LOG.warn("Failed to close a bad session", th);
-          SessionState.detachSession();
-        }
-      }
-    }
-  }
-
   private String buildMmCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) {
     String fullName = t.getDbName() + "." + t.getTableName();
     // TODO: ideally we should make a special form of insert overwrite so that we:

http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
new file mode 100644
index 0000000..d0b41f3
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestStatsUpdaterThread {
+  @SuppressWarnings("unused")
+  static final private Logger LOG = LoggerFactory.getLogger(TestStatsUpdaterThread.class);
+  private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+    File.separator + TestStatsUpdaterThread.class.getCanonicalName()
+    + "-" + System.currentTimeMillis()
+  ).getPath().replaceAll("\\\\", "/");
+  private HiveConf hiveConf;
+  private SessionState ss;
+
+  String getTestDataDir() {
+    return TEST_DATA_DIR;
+  }
+
+  @SuppressWarnings("deprecation")
+  @Before
+  public void setUp() throws Exception {
+    this.hiveConf = new HiveConf(TestStatsUpdaterThread.class);
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getTestDataDir());
+    hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+       "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+//    hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, true);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, true);
+    hiveConf.set(MetastoreConf.ConfVars.STATS_AUTO_UPDATE.getVarname(), "all");
+    TxnDbUtil.setConfValues(hiveConf);
+    TxnDbUtil.prepDb(hiveConf);
+    File f = new File(getTestDataDir());
+    if (f.exists()) {
+      FileUtil.fullyDelete(f);
+    }
+    if (!(new File(getTestDataDir()).mkdirs())) {
+      throw new RuntimeException("Could not create " + getTestDataDir());
+    }
+    this.ss = DriverUtils.setUpSessionState(hiveConf, "hive", true);
+    cleanUp();
+  }
+
+  @After
+  public void cleanUp() throws HiveException {
+    executeQuery("drop table simple_stats");
+    executeQuery("drop table simple_stats2");
+    executeQuery("drop table simple_stats3");
+  }
+
+  @Test(timeout=20000)
+  public void testSimpleUpdateWithThreads() throws Exception {
+    StatsUpdaterThread su = createUpdater();
+    su.startWorkers();
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+
+    executeQuery("create table simple_stats (i int, s string)");
+    executeQuery("insert into simple_stats (i, s) values (1, 'test')");
+    verifyAndUnsetColStats("simple_stats", Lists.newArrayList("i"), msClient);
+
+    assertTrue(su.runOneIteration());
+    su.waitForQueuedCommands();
+    verifyStatsUpToDate("simple_stats", Lists.newArrayList("i"), msClient, true);
+
+    msClient.close();
+  }
+
+  @Test(timeout=20000)
+  public void testMultipleTables() throws Exception {
+    StatsUpdaterThread su = createUpdater();
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+
+    executeQuery("create table simple_stats (s string)");
+    executeQuery("insert into simple_stats (s) values ('test')");
+    executeQuery("create table simple_stats2 (s string)");
+    executeQuery("insert into simple_stats2 (s) values ('test2')");
+    verifyAndUnsetColStats("simple_stats", Lists.newArrayList("s"), msClient);
+    verifyAndUnsetColStats("simple_stats2", Lists.newArrayList("s"), msClient);
+
+    assertTrue(su.runOneIteration());
+    drainWorkQueue(su);
+    verifyAndUnsetColStats("simple_stats", Lists.newArrayList("s"), msClient);
+    verifyAndUnsetColStats("simple_stats2", Lists.newArrayList("s"), msClient);
+
+    setTableSkipProperty(msClient, "simple_stats", "true");
+    assertTrue(su.runOneIteration());
+    drainWorkQueue(su);
+    verifyStatsUpToDate("simple_stats", Lists.newArrayList("i"), msClient, false);
+    verifyAndUnsetColStats("simple_stats2", Lists.newArrayList("s"), msClient);
+
+    msClient.close();
+  }
+
+  @Test(timeout=20000)
+  public void testExistingOnly() throws Exception {
+    hiveConf.set(MetastoreConf.ConfVars.STATS_AUTO_UPDATE.getVarname(), "existing");
+    StatsUpdaterThread su = createUpdater();
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+
+    executeQuery("create table simple_stats (i int, s string)");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+    executeQuery("insert into simple_stats (i, s) values (1, 'test')");
+    executeQuery("analyze table simple_stats compute statistics for columns i");
+    verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"), msClient, false);
+    verifyAndUnsetColStats("simple_stats", Lists.newArrayList("i"), msClient);
+
+    assertTrue(su.runOneIteration());
+    drainWorkQueue(su);
+    verifyStatsUpToDate("simple_stats", Lists.newArrayList("i"), msClient, true);
+    verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"), msClient, false);
+
+    msClient.close();
+  }
+
+  @Test(timeout=60000)
+  public void testQueueingWithThreads() throws Exception {
+    final int PART_COUNT = 12;
+    hiveConf.setInt(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.getVarname(), 5);
+    hiveConf.setInt(MetastoreConf.ConfVars.STATS_AUTO_UPDATE_WORKER_COUNT.getVarname(), 2);
+    StatsUpdaterThread su = createUpdater();
+    su.startWorkers();
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+    executeQuery("create table simple_stats (s string) partitioned by (i int)");
+    for (int i = 0; i < PART_COUNT; ++i) {
+      executeQuery("insert into simple_stats partition(i='" + i + "') values ('test')");
+    }
+    verifyPartStatsUpToDate(PART_COUNT, 0, msClient, "simple_stats", false);
+
+    // Set one of the partitions to be skipped, so that a command is created for every other one.
+    setPartitionSkipProperty(msClient, "simple_stats", "i=0", "true");
+
+
+    assertTrue(su.runOneIteration());
+    su.waitForQueuedCommands();
+    verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("s"), msClient, false);
+    verifyPartStatsUpToDate(PART_COUNT, 1, msClient, "simple_stats", true);
+
+    assertFalse(su.runOneIteration());
+    drainWorkQueue(su, 0); // Nothing else is updated after the first update.
+
+    msClient.close();
+  }
+
+  @Test(timeout=20000)
+  public void testAllPartitions() throws Exception {
+    final int PART_COUNT = 3;
+    StatsUpdaterThread su = createUpdater();
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+    executeQuery("create table simple_stats (s string) partitioned by (i int)");
+    for (int i = 0; i < PART_COUNT; ++i) {
+      executeQuery("insert into simple_stats partition(i='" + i + "') values ('test')");
+    }
+    verifyPartStatsUpToDate(PART_COUNT, 0, msClient, "simple_stats", false);
+
+    assertTrue(su.runOneIteration());
+    drainWorkQueue(su, 1); // All the partitions need to be updated; a single command can be used.
+    verifyPartStatsUpToDate(PART_COUNT, 0, msClient, "simple_stats", true);
+
+    assertFalse(su.runOneIteration());
+    drainWorkQueue(su, 0); // Nothing else is updated after the first update.
+
+    msClient.close();
+  }
+
+  @Test(timeout=20000)
+  public void testPartitionSubset() throws Exception {
+    final int NONSTAT_PART_COUNT = 3;
+    StatsUpdaterThread su = createUpdater();
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+    executeQuery("create table simple_stats (s string) partitioned by (i int)");
+    for (int i = 0; i < NONSTAT_PART_COUNT; ++i) {
+      executeQuery("insert into simple_stats partition(i='" + i + "') values ('test')");
+    }
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, true);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, true);
+    executeQuery("insert into simple_stats partition(i='"
+        + NONSTAT_PART_COUNT + "') values ('test')");
+    verifyPartStatsUpToDate(NONSTAT_PART_COUNT, 0, msClient, "simple_stats", false);
+    verifyStatsUpToDate("simple_stats",
+        "i=" + NONSTAT_PART_COUNT, Lists.newArrayList("s"), msClient, true);
+
+    final int EXCLUDED_PART = 1;
+    setPartitionSkipProperty(msClient, "simple_stats", "i=" + EXCLUDED_PART, "true");
+
+    assertTrue(su.runOneIteration());
+    // 1 is excluded via property, 3 already has stats, so we only expect two updates.
+    drainWorkQueue(su, NONSTAT_PART_COUNT - 1);
+    for (int i = 0; i < NONSTAT_PART_COUNT; ++i) {
+      verifyStatsUpToDate("simple_stats",
+          "i=" + i, Lists.newArrayList("s"), msClient, i != EXCLUDED_PART);
+    }
+    verifyStatsUpToDate("simple_stats", "i=" + EXCLUDED_PART,
+        Lists.newArrayList("s"), msClient, false);
+
+    msClient.close();
+  }
+
+  @Test(timeout=20000)
+  public void testPartitionsWithDifferentColsAll() throws Exception {
+    StatsUpdaterThread su = createUpdater();
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+    executeQuery("create table simple_stats (s string, t string, u string) partitioned by (i int)");
+    executeQuery("insert into simple_stats partition(i=0) values ('test', '0', 'foo')");
+    executeQuery("insert into simple_stats partition(i=1) values ('test', '1', 'bar')");
+    executeQuery("analyze table simple_stats partition(i=0) compute statistics for columns s");
+    executeQuery("analyze table simple_stats partition(i=1) compute statistics for columns s, u");
+    verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("s"), msClient, true);
+    verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("t", "u"), msClient, false);
+    verifyStatsUpToDate("simple_stats", "i=1", Lists.newArrayList("s", "u"), msClient, true);
+    verifyStatsUpToDate("simple_stats", "i=1", Lists.newArrayList("t"), msClient, false);
+
+    assertTrue(su.runOneIteration());
+    // Different columns means different commands have to be run.
+    drainWorkQueue(su, 2);
+    verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("s", "t", "u"), msClient, true);
+    verifyStatsUpToDate("simple_stats", "i=1", Lists.newArrayList("s", "t", "u"), msClient, true);
+
+    assertFalse(su.runOneIteration());
+    drainWorkQueue(su, 0); // Nothing else is updated after the first update.
+
+    msClient.close();
+  }
+
+
+  @Test(timeout=20000)
+  public void testPartitionsWithDifferentColsExistingOnly() throws Exception {
+    hiveConf.set(MetastoreConf.ConfVars.STATS_AUTO_UPDATE.getVarname(), "existing");
+    StatsUpdaterThread su = createUpdater();
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+    executeQuery("create table simple_stats (s string, t string, u string) partitioned by (i int)");
+    executeQuery("insert into simple_stats partition(i=0) values ('test', '0', 'foo')");
+    executeQuery("insert into simple_stats partition(i=1) values ('test', '1', 'bar')");
+    executeQuery("insert into simple_stats partition(i=2) values ('test', '2', 'baz')");
+    executeQuery("analyze table simple_stats partition(i=0) compute statistics for columns s, t");
+    executeQuery("analyze table simple_stats partition(i=1) compute statistics for columns");
+    executeQuery("analyze table simple_stats partition(i=2) compute statistics for columns s");
+    verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("s", "t"), msClient, true);
+    verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("u"), msClient, false);
+    verifyStatsUpToDate("simple_stats", "i=1", Lists.newArrayList("s", "t", "u"), msClient, true);
+    verifyStatsUpToDate("simple_stats", "i=2", Lists.newArrayList("s"), msClient, true);
+    verifyStatsUpToDate("simple_stats", "i=2", Lists.newArrayList("u", "t"), msClient, false);
+
+    // We will unset s on i=0 and t on i=1. Only these should be updated; nothing for i=2.
+    verifyAndUnsetColStats("simple_stats", "i=0", Lists.newArrayList("s"), msClient);
+    verifyAndUnsetColStats("simple_stats", "i=1", Lists.newArrayList("t"), msClient);
+
+    assertTrue(su.runOneIteration());
+    drainWorkQueue(su, 2);
+    // Exact same state as above.
+    verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("s", "t"), msClient, true);
+    verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("u"), msClient, false);
+    verifyStatsUpToDate("simple_stats", "i=1", Lists.newArrayList("s", "t", "u"), msClient, true);
+    verifyStatsUpToDate("simple_stats", "i=2", Lists.newArrayList("s"), msClient, true);
+    verifyStatsUpToDate("simple_stats", "i=2", Lists.newArrayList("u", "t"), msClient, false);
+
+    msClient.close();
+  }
+
+  @Test(timeout=20000)
+  public void testParallelOps() throws Exception {
+    // Set high worker count so we get a longer queue.
+    hiveConf.setInt(MetastoreConf.ConfVars.STATS_AUTO_UPDATE_WORKER_COUNT.getVarname(), 4);
+    StatsUpdaterThread su = createUpdater();
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+    executeQuery("create table simple_stats (s string)");
+    executeQuery("create table simple_stats2 (s string) partitioned by (i int)");
+    executeQuery("create table simple_stats3 (s string) partitioned by (i int)");
+    executeQuery("insert into simple_stats values ('test')");
+    executeQuery("insert into simple_stats2 partition(i=0) values ('test')");
+    executeQuery("insert into simple_stats3 partition(i=0) values ('test')");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, true);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, true);
+    executeQuery("insert into simple_stats3 partition(i=1) values ('test')");
+
+    assertTrue(su.runOneIteration());
+    assertEquals(3, su.getQueueLength());
+    // Nothing updated yet.
+    verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"), msClient, false);
+    verifyPartStatsUpToDate(1, 0, msClient, "simple_stats2", false);
+    verifyStatsUpToDate("simple_stats3", "i=0", Lists.newArrayList("s"), msClient, false);
+    verifyStatsUpToDate("simple_stats3", "i=1", Lists.newArrayList("s"), msClient, true);
+
+    assertFalse(su.runOneIteration());
+    assertEquals(3, su.getQueueLength()); // Nothing new added to the queue while analyze runs.
+
+    // Add another partition without stats.
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+    executeQuery("insert into simple_stats3 partition(i=2) values ('test')");
+
+    assertTrue(su.runOneIteration());
+    assertEquals(4, su.getQueueLength()); // An item for the new partition is queued now.
+
+    drainWorkQueue(su, 4);
+
+    verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"), msClient, true);
+    verifyPartStatsUpToDate(1, 0, msClient, "simple_stats2", true);
+    verifyPartStatsUpToDate(3, 0, msClient, "simple_stats3", true);
+
+    assertFalse(su.runOneIteration());
+    drainWorkQueue(su, 0); // Nothing else is updated after the first update.
+
+    msClient.close();
+  }
+
+  private void verifyPartStatsUpToDate(int partCount, int skip,
+      IMetaStoreClient msClient, String tbl, boolean isUpToDate) throws Exception {
+    for (int i = skip; i < partCount; ++i) {
+      verifyStatsUpToDate(tbl, "i=" + i, Lists.newArrayList("s"), msClient, isUpToDate);
+    }
+  }
+
+  private void drainWorkQueue(StatsUpdaterThread su) throws InterruptedException {
+    while (su.runOneWorkerIteration(ss, ss.getUserName(), ss.getConf(), false)) {}
+  }
+
+  private void drainWorkQueue(StatsUpdaterThread su, int expectedReqs) throws InterruptedException {
+    int actualReqs = 0;
+    while (su.runOneWorkerIteration(ss, ss.getUserName(), ss.getConf(), false)) {
+      ++actualReqs;
+    }
+    assertEquals(expectedReqs, actualReqs);
+  }
+
+  private void setTableSkipProperty(
+      IMetaStoreClient msClient, String tbl, String val) throws Exception {
+    Table table = msClient.getTable(ss.getCurrentDatabase(), tbl);
+    table.getParameters().put(StatsUpdaterThread.SKIP_STATS_AUTOUPDATE_PROPERTY, val);
+    msClient.alter_table(table.getDbName(), table.getTableName(), table);
+  }
+
+  private void setPartitionSkipProperty(
+      IMetaStoreClient msClient, String tblName, String partName, String val) throws Exception {
+    Partition part = msClient.getPartition(ss.getCurrentDatabase(), tblName, partName);
+    part.getParameters().put(StatsUpdaterThread.SKIP_STATS_AUTOUPDATE_PROPERTY, val);
+    msClient.alter_partition(part.getCatName(), part.getDbName(), tblName, part);
+  }
+
+  private void verifyAndUnsetColStats(
+      String tblName, List<String> cols, IMetaStoreClient msClient) throws Exception {
+    Table tbl = msClient.getTable(ss.getCurrentDatabase(), tblName);
+    verifyAndUnsetColStatsVal(tbl.getParameters(), cols);
+    EnvironmentContext ec = new EnvironmentContext();
+    // Make sure metastore doesn't mess with our bogus stats updates.
+    ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
+    msClient.alter_table_with_environmentContext(tbl.getDbName(), tbl.getTableName(), tbl, ec);
+    // Double-check.
+    tbl = msClient.getTable(ss.getCurrentDatabase(), tblName);
+    for (String col : cols) {
+      assertFalse(StatsSetupConst.areColumnStatsUptoDate(tbl.getParameters(), col));
+    }
+  }
+
+  private void verifyAndUnsetColStatsVal(Map<String, String> params, List<String> cols) {
+    assertTrue(StatsSetupConst.areBasicStatsUptoDate(params));
+    for (String col : cols) {
+      assertTrue(StatsSetupConst.areColumnStatsUptoDate(params, col));
+    }
+    StatsSetupConst.removeColumnStatsState(params, cols);
+    StatsSetupConst.setBasicStatsState(params, StatsSetupConst.TRUE);
+  }
+
+  private void verifyAndUnsetColStats(String tblName, String partName, List<String> cols,
+      IMetaStoreClient msClient) throws Exception {
+    Partition part = msClient.getPartition(ss.getCurrentDatabase(), tblName, partName);
+    verifyAndUnsetColStatsVal(part.getParameters(), cols);
+    EnvironmentContext ec = new EnvironmentContext();
+    // Make sure metastore doesn't mess with our bogus stats updates.
+    ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
+    msClient.alter_partition(part.getCatName(), part.getDbName(), tblName, part, ec);
+    // Double-check.
+    part = msClient.getPartition(ss.getCurrentDatabase(), tblName, partName);
+    for (String col : cols) {
+      assertFalse(StatsSetupConst.areColumnStatsUptoDate(part.getParameters(), col));
+    }
+  }
+
+  private void verifyStatsUpToDate(String tbl, ArrayList<String> cols, IMetaStoreClient msClient,
+      boolean isUpToDate) throws Exception {
+    Table table = msClient.getTable(ss.getCurrentDatabase(), tbl);
+    verifyStatsUpToDate(table.getParameters(), cols, isUpToDate);
+  }
+
+  private void verifyStatsUpToDate(Map<String, String> params, ArrayList<String> cols,
+      boolean isUpToDate) {
+    if (isUpToDate) {
+      assertTrue(StatsSetupConst.areBasicStatsUptoDate(params));
+    }
+    for (String col : cols) {
+      assertEquals(isUpToDate, StatsSetupConst.areColumnStatsUptoDate(params, col));
+    }
+  }
+
+  private void verifyStatsUpToDate(String tbl, String part, ArrayList<String> cols,
+      IMetaStoreClient msClient, boolean isUpToDate) throws Exception {
+    Partition partition = msClient.getPartition(ss.getCurrentDatabase(), tbl, part);
+    verifyStatsUpToDate(partition.getParameters(), cols, isUpToDate);
+  }
+
+  private void executeQuery(String query) throws HiveException {
+    DriverUtils.runOnDriver(hiveConf, ss.getUserName(), ss, query, null);
+  }
+
+  private StatsUpdaterThread createUpdater() throws MetaException {
+    StatsUpdaterThread su = new StatsUpdaterThread();
+    su.setConf(hiveConf);
+    su.init(new AtomicBoolean(false), null);
+    return su;
+  }
+}
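
For reference, the helpers at the bottom of this test class compose into very small test bodies. The following is an illustrative sketch of the no-op case, written against the same hiveConf/ss fields and helpers used above; it is a hypothetical example, not a test included in this patch:

  @Test(timeout=20000)
  public void testNothingOutOfDateSketch() throws Exception {
    StatsUpdaterThread su = createUpdater();
    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
    // With auto-gather on, the insert itself leaves basic and column stats up to date.
    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, true);
    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, true);
    executeQuery("create table simple_stats (s string)");
    executeQuery("insert into simple_stats values ('test')");
    verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"), msClient, true);

    // Nothing is out of date, so the updater queues no analyze commands.
    assertFalse(su.runOneIteration());
    drainWorkQueue(su, 0);
    msClient.close();
  }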

http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index be05838..ed53c90 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -310,6 +310,7 @@ public class HiveAlterHandler implements AlterHandler {
         }
       } else {
         // operations other than table rename
+
         if (MetaStoreUtils.requireCalStats(null, null, newt, environmentContext) &&
             !isPartitionedTable) {
           Database db = msdb.getDatabase(catName, newDbName);
@@ -540,10 +541,11 @@ public class HiveAlterHandler implements AlterHandler {
         }
         success = msdb.commitTransaction();
       } catch (InvalidObjectException e) {
-        throw new InvalidOperationException("alter is not possible");
-      } catch (NoSuchObjectException e){
+        LOG.warn("Alter failed", e);
+        throw new InvalidOperationException("alter is not possible: " + e.getMessage());
+      } catch (NoSuchObjectException e) {
         //old partition does not exist
-        throw new InvalidOperationException("alter is not possible");
+        throw new InvalidOperationException("alter is not possible: " + e.getMessage());
       } finally {
         if(!success) {
           msdb.rollbackTransaction();

http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index d37e705..dd45cdb 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.StatsUpdateMode;
 import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
 import org.apache.hadoop.hive.metastore.events.AddNotNullConstraintEvent;
 import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
@@ -9053,6 +9054,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     t.start();
   }
 
+
   /**
    * Start threads outside of the thrift service, such as the compactor threads.
    * @param conf Hive configuration object
@@ -9092,6 +9094,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           startCompactorWorkers(conf);
           startCompactorCleaner(conf);
           startRemoteOnlyTasks(conf);
+          startStatsUpdater(conf);
         } catch (Throwable e) {
           LOG.error("Failure when starting the compactor, compactions may not happen, " +
               StringUtils.stringifyException(e));
@@ -9107,6 +9110,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     t.start();
   }
 
+  protected static void startStatsUpdater(Configuration conf) throws Exception {
+    StatsUpdateMode mode = StatsUpdateMode.valueOf(
+        MetastoreConf.getVar(conf, ConfVars.STATS_AUTO_UPDATE).toUpperCase());
+    if (mode == StatsUpdateMode.NONE) return;
+    MetaStoreThread t = instantiateThread("org.apache.hadoop.hive.ql.stats.StatsUpdaterThread");
+    initializeAndStartThread(t, conf);
+  }
+
   private static void startCompactorInitiator(Configuration conf) throws Exception {
     if (MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON)) {
       MetaStoreThread initiator =

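As the startStatsUpdater hunk above shows, the updater thread is started only when metastore.stats.auto.analyze resolves to something other than NONE, and the implementation class is loaded by name so the metastore does not compile against ql. A minimal sketch of enabling it on the metastore configuration before startup (the class and method names here are illustrative, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;

public class EnableStatsUpdaterSketch {
  public static Configuration newConf() {
    Configuration conf = MetastoreConf.newMetastoreConf();
    // Any mode other than "none" ("existing" or "all") makes startStatsUpdater spawn
    // org.apache.hadoop.hive.ql.stats.StatsUpdaterThread when background tasks start.
    MetastoreConf.setVar(conf, ConfVars.STATS_AUTO_UPDATE, "existing");
    return conf;
  }
}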
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index e2ca6d2..346cd92 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode;
 import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
 import org.apache.hive.common.util.BloomFilter;
 import org.datanucleus.store.rdbms.query.ForwardQueryResult;
 import org.slf4j.Logger;
@@ -2459,4 +2460,94 @@ class MetaStoreDirectSql {
     return ret;
   }
 
+  public static final Object[] STATS_TABLE_TYPES = new Object[] {
+    TableType.MANAGED_TABLE.toString(), TableType.MATERIALIZED_VIEW.toString()
+  };
+
+  public List<FullTableName> getTableNamesWithStats() throws MetaException {
+    // Could we also join with ACID tables to only get tables with outdated stats?
+    String queryText0 = "SELECT DISTINCT " + TBLS + ".\"TBL_NAME\", " + DBS + ".\"NAME\", "
+        + DBS + ".\"CTLG_NAME\" FROM " + TBLS + " INNER JOIN " + DBS + " ON "
+        + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"";
+    String queryText1 = " WHERE " + TBLS + ".\"TBL_TYPE\" IN ("
+        + makeParams(STATS_TABLE_TYPES.length) + ")";
+
+    List<FullTableName> result = new ArrayList<>();
+
+    String queryText = queryText0 + " INNER JOIN " + TAB_COL_STATS
+        + " ON " + TBLS + ".\"TBL_ID\" = " + TAB_COL_STATS + ".\"TBL_ID\"" + queryText1;
+    getStatsTableListResult(queryText, result);
+
+    queryText = queryText0 + " INNER JOIN " + PARTITIONS + " ON " + TBLS + ".\"TBL_ID\" = "
+        + PARTITIONS + ".\"TBL_ID\"" + " INNER JOIN " + PART_COL_STATS + " ON " + PARTITIONS
+        + ".\"PART_ID\" = " + PART_COL_STATS + ".\"PART_ID\"" + queryText1;
+    getStatsTableListResult(queryText, result);
+
+    return result;
+  }
+
+  public Map<String, List<String>> getColAndPartNamesWithStats(
+      String catName, String dbName, String tableName) throws MetaException {
+    // Could we also join with ACID tables to only get tables with outdated stats?
+    String queryText = "SELECT DISTINCT " + PARTITIONS + ".\"PART_NAME\", " + PART_COL_STATS
+        + ".\"COLUMN_NAME\" FROM " + TBLS + " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = "
+        + DBS + ".\"DB_ID\" INNER JOIN " + PARTITIONS + " ON " + TBLS + ".\"TBL_ID\" = "
+        + PARTITIONS + ".\"TBL_ID\"  INNER JOIN " + PART_COL_STATS + " ON " + PARTITIONS
+        + ".\"PART_ID\" = " + PART_COL_STATS + ".\"PART_ID\" WHERE " + DBS + ".\"NAME\" = ? AND "
+        + DBS + ".\"CTLG_NAME\" = ? AND " + TBLS + ".\"TBL_NAME\" = ? ORDER BY "
+        + PARTITIONS + ".\"PART_NAME\"";
+
+    LOG.debug("Running {}", queryText);
+    Query<?> query = pm.newQuery("javax.jdo.query.SQL", queryText);
+    try {
+      List<Object[]> sqlResult = ensureList(executeWithArray(
+          query, new Object[] { dbName, catName, tableName }, queryText));
+      Map<String, List<String>> result = new HashMap<>();
+      String lastPartName = null;
+      List<String> cols = null;
+      for (Object[] line : sqlResult) {
+        String col = extractSqlString(line[1]);
+        String part = extractSqlString(line[0]);
+        if (!part.equals(lastPartName)) {
+          if (lastPartName != null) {
+            result.put(lastPartName, cols);
+          }
+          cols = cols == null ? new ArrayList<>() : new ArrayList<>(cols.size());
+          lastPartName = part;
+        }
+        cols.add(col);
+      }
+      if (lastPartName != null) {
+        result.put(lastPartName, cols);
+      }
+      return result;
+    } finally {
+      query.closeAll();
+    }
+  }
+
+  public List<FullTableName> getAllTableNamesForStats() throws MetaException {
+    String queryText = "SELECT " + TBLS + ".\"TBL_NAME\", " + DBS + ".\"NAME\", "
+        + DBS + ".\"CTLG_NAME\" FROM " + TBLS + " INNER JOIN " + DBS + " ON " + TBLS
+        + ".\"DB_ID\" = " + DBS + ".\"DB_ID\""
+        + " WHERE " + TBLS + ".\"TBL_TYPE\" IN (" + makeParams(STATS_TABLE_TYPES.length) + ")";
+    List<FullTableName> result = new ArrayList<>();
+    getStatsTableListResult(queryText, result);
+    return result;
+  }
+
+  private void getStatsTableListResult(
+      String queryText, List<FullTableName> result) throws MetaException {
+    LOG.debug("Running {}", queryText);
+    Query<?> query = pm.newQuery("javax.jdo.query.SQL", queryText);
+    try {
+      List<Object[]> sqlResult = ensureList(executeWithArray(query, STATS_TABLE_TYPES, queryText));
+      for (Object[] line : sqlResult) {
+        result.add(new FullTableName(
+            extractSqlString(line[2]), extractSqlString(line[1]), extractSqlString(line[0])));
+      }
+    } finally {
+      query.closeAll();
+    }
+  }
 }
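
getColAndPartNamesWithStats relies on the ORDER BY over PART_NAME to fold the flat (partition, column) rows into a map from partition name to the columns that have stats for it. A hypothetical illustration of the shape a caller sees, using the simple_stats layout from the tests above (the values are made up for illustration):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartColStatsShapeSketch {
  public static void main(String[] args) {
    // Partition name -> columns that already have stats, as getColAndPartNamesWithStats returns it.
    Map<String, List<String>> partCols = new HashMap<>();
    partCols.put("i=0", Arrays.asList("s"));
    partCols.put("i=1", Arrays.asList("s", "u"));
    for (Map.Entry<String, List<String>> e : partCols.entrySet()) {
      System.out.println("partition " + e.getKey() + " has stats for " + e.getValue());
    }
  }
}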

http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index e475be8..78b82b4 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -56,6 +56,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
+
 import javax.jdo.JDOCanRetryException;
 import javax.jdo.JDODataStoreException;
 import javax.jdo.JDOException;
@@ -71,6 +72,7 @@ import javax.jdo.identity.IntIdentity;
 import javax.sql.DataSource;
 
 import com.google.common.base.Strings;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
@@ -211,6 +213,7 @@ import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
 import org.apache.hadoop.hive.metastore.utils.JavaUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
 import org.apache.hadoop.hive.metastore.utils.ObjectPair;
 import org.apache.thrift.TException;
 import org.datanucleus.AbstractNucleusContext;
@@ -1546,6 +1549,96 @@ public class ObjectStore implements RawStore, Configurable {
     }
   }
 
+  @Override
+  public List<FullTableName> getTableNamesWithStats() throws MetaException, NoSuchObjectException {
+    return new GetListHelper<FullTableName>(null, null, null, true, false) {
+      @Override
+      protected List<FullTableName> getSqlResult(
+          GetHelper<List<FullTableName>> ctx) throws MetaException {
+        return directSql.getTableNamesWithStats();
+      }
+
+      @Override
+      protected List<FullTableName> getJdoResult(
+          GetHelper<List<FullTableName>> ctx) throws MetaException {
+        throw new UnsupportedOperationException("getTableNamesWithStats is not implemented via JDO"); // TODO: implement?
+      }
+    }.run(false);
+  }
+
+  @Override
+  public Map<String, List<String>> getPartitionColsWithStats(String catName, String dbName, String tableName)
+      throws MetaException, NoSuchObjectException {
+    return new GetHelper<Map<String, List<String>>>(catName, dbName, null, true, false) {
+      @Override
+      protected Map<String, List<String>> getSqlResult(
+          GetHelper<Map<String, List<String>>> ctx) throws MetaException {
+        try {
+          return directSql.getColAndPartNamesWithStats(catName, dbName, tableName);
+        } catch (Throwable ex) {
+          LOG.error("DirectSQL failed", ex);
+          throw new MetaException(ex.getMessage());
+        }
+      }
+
+      @Override
+      protected Map<String, List<String>> getJdoResult(
+          GetHelper<Map<String, List<String>>> ctx) throws MetaException {
+        throw new UnsupportedOperationException("getPartitionColsWithStats is not implemented via JDO"); // TODO: implement?
+      }
+
+      @Override
+      protected String describeResult() {
+        return results.size() + " partitions";
+      }
+    }.run(false);
+  }
+
+  @Override
+  public List<FullTableName> getAllTableNamesForStats() throws MetaException, NoSuchObjectException {
+    return new GetListHelper<FullTableName>(null, null, null, true, false) {
+      @Override
+      protected List<FullTableName> getSqlResult(
+          GetHelper<List<FullTableName>> ctx) throws MetaException {
+        return directSql.getAllTableNamesForStats();
+      }
+
+      @Override
+      protected List<FullTableName> getJdoResult(
+          GetHelper<List<FullTableName>> ctx) throws MetaException {
+        boolean commited = false;
+        Query query = null;
+        List<FullTableName> result = new ArrayList<>();
+        openTransaction();
+        try {
+          String paramStr = "", whereStr = "";
+          for (int i = 0; i < MetaStoreDirectSql.STATS_TABLE_TYPES.length; ++i) {
+            if (i != 0) {
+              paramStr += ", ";
+              whereStr += "||";
+            }
+            paramStr += "java.lang.String tt" + i;
+            whereStr += " tableType == tt" + i;
+          }
+          query = pm.newQuery(MTable.class, whereStr);
+          query.declareParameters(paramStr);
+          @SuppressWarnings("unchecked")
+          Collection<MTable> tbls = (Collection<MTable>) query.executeWithArray(
+              MetaStoreDirectSql.STATS_TABLE_TYPES);
+          pm.retrieveAll(tbls);
+          for (MTable tbl : tbls) {
+            result.add(new FullTableName(
+                tbl.getDatabase().getCatalogName(), tbl.getDatabase().getName(), tbl.getTableName()));
+          }
+          commited = commitTransaction();
+        } finally {
+          rollbackAndCleanup(commited, query);
+        }
+        return result;
+      }
+    }.run(false);
+  }
+
   protected List<String> getTablesInternal(String catName, String dbName, String pattern,
                                            TableType tableType, boolean allowSql, boolean allowJdo)
       throws MetaException, NoSuchObjectException {
@@ -3447,9 +3540,9 @@ public class ObjectStore implements RawStore, Configurable {
                      boolean allowSql, boolean allowJdo) throws MetaException {
       assert allowSql || allowJdo;
       this.allowJdo = allowJdo;
-      this.catName = normalizeIdentifier(catalogName);
-      this.dbName = normalizeIdentifier(dbName);
-      if (tblName != null){
+      this.catName = (catalogName != null) ? normalizeIdentifier(catalogName) : null;
+      this.dbName = (dbName != null) ? normalizeIdentifier(dbName) : null;
+      if (tblName != null) {
         this.tblName = normalizeIdentifier(tblName);
       } else {
         // tblName can be null in cases of Helper being used at a higher

http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
index 283798c..f350aa9 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.hive.metastore.api.WMTrigger;
 import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
 import org.apache.thrift.TException;
 
 public interface RawStore extends Configurable {
@@ -1639,4 +1640,11 @@ public interface RawStore extends Configurable {
   /** Removes outdated statistics. */
   int deleteRuntimeStats(int maxRetainSecs) throws MetaException;
 
+  List<FullTableName> getTableNamesWithStats() throws MetaException, NoSuchObjectException;
+
+  List<FullTableName> getAllTableNamesForStats() throws MetaException, NoSuchObjectException;
+
+  Map<String, List<String>> getPartitionColsWithStats(String catName, String dbName,
+      String tableName) throws MetaException, NoSuchObjectException;
+
 }
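
The three new RawStore calls line up with the update modes defined in MetastoreConf: EXISTING presumably starts from getTableNamesWithStats (tables that already have some stats), ALL from getAllTableNamesForStats, and getPartitionColsWithStats then narrows a partitioned table to the partitions and columns that already carry stats. A sketch of how a caller such as the background updater might combine them; the class, method names and error handling are illustrative, not taken from this patch:

import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.StatsUpdateMode;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;

public class StatsCandidateListerSketch {
  /** Illustrative only: list the tables one stats-update pass would look at for the given mode. */
  public static List<FullTableName> candidates(RawStore rs, StatsUpdateMode mode)
      throws MetaException, NoSuchObjectException {
    return (mode == StatsUpdateMode.EXISTING)
        ? rs.getTableNamesWithStats()    // only tables that already have some stats
        : rs.getAllTableNamesForStats(); // every managed table / materialized view
  }

  /** Illustrative only: for one partitioned table, see which partitions already have column stats. */
  public static Map<String, List<String>> partitionStats(RawStore rs, FullTableName t)
      throws MetaException, NoSuchObjectException {
    return rs.getPartitionColsWithStats(t.catalog, t.db, t.table);
  }
}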

http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 9da8d72..d9356b8 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -116,6 +116,7 @@ import org.apache.hadoop.hive.metastore.utils.FileUtils;
 import org.apache.hadoop.hive.metastore.utils.JavaUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -2492,4 +2493,20 @@ public class CachedStore implements RawStore, Configurable {
   public int deleteRuntimeStats(int maxRetainSecs) throws MetaException {
     return rawStore.deleteRuntimeStats(maxRetainSecs);
   }
+
+  @Override
+  public List<FullTableName> getTableNamesWithStats() throws MetaException, NoSuchObjectException {
+    return rawStore.getTableNamesWithStats();
+  }
+
+  @Override
+  public List<FullTableName> getAllTableNamesForStats() throws MetaException, NoSuchObjectException {
+    return rawStore.getAllTableNamesForStats();
+  }
+
+  @Override
+  public Map<String, List<String>> getPartitionColsWithStats(String catName,
+      String dbName, String tableName) throws MetaException, NoSuchObjectException {
+    return rawStore.getPartitionColsWithStats(catName, dbName, tableName);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/EnumValidator.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/EnumValidator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/EnumValidator.java
new file mode 100644
index 0000000..8695471
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/EnumValidator.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.conf;
+
+import java.util.Arrays;
+
+public class EnumValidator extends StringSetValidator {
+  public EnumValidator(Object[] enumValues) {
+    super(false, Arrays.stream(enumValues).map(Object::toString).toArray(String[]::new));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index f3ed8cd..ed0edec 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -83,6 +83,10 @@ public class MetastoreConf {
   @VisibleForTesting
   static final String TEST_ENV_WORKAROUND = "metastore.testing.env.workaround.dont.ever.set.this.";
 
+  public static enum StatsUpdateMode {
+    NONE, EXISTING, ALL
+  }
+
   private static class TimeValue {
     final long val;
     final TimeUnit unit;
@@ -727,6 +731,18 @@ public class MetastoreConf {
         "The Java class (implementing the StatsAggregator interface) that is used by default if hive.stats.dbclass is custom type."),
     STATS_DEFAULT_PUBLISHER("metastore.stats.default.publisher", "hive.stats.default.publisher", "",
         "The Java class (implementing the StatsPublisher interface) that is used by default if hive.stats.dbclass is custom type."),
+    STATS_AUTO_UPDATE("metastore.stats.auto.analyze", "hive.metastore.stats.auto.analyze", "none",
+        new EnumValidator(StatsUpdateMode.values()),
+        "Whether to update stats in the background; none - no, all - for all tables, existing - only existing, out of date, stats."),
+    STATS_AUTO_UPDATE_NOOP_WAIT("metastore.stats.auto.analyze.noop.wait",
+        "hive.metastore.stats.auto.analyze.noop.wait", 5L, TimeUnit.MINUTES,
+        new TimeValidator(TimeUnit.MINUTES),
+        "How long to sleep if there were no stats needing update during an update iteration.\n" +
+        "This is a setting to throttle table/partition checks when nothing is being changed; not\n" +
+        "the analyze queries themselves."),
+    STATS_AUTO_UPDATE_WORKER_COUNT("metastore.stats.auto.analyze.worker.count",
+        "hive.metastore.stats.auto.analyze.worker.count", 1,
+        "Number of parallel analyze commands to run for background stats update."),
     STORAGE_SCHEMA_READER_IMPL("metastore.storage.schema.reader.impl", "metastore.storage.schema.reader.impl",
         DefaultStorageSchemaReader.class.getName(),
         "The class to use to read schemas from storage.  It must implement " +

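The tests above exercise these settings through the Hive-side key names returned by getVarname(). A small sketch of the same pattern, assuming an existing HiveConf as in the test setup (the class and method names are illustrative):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

public class StatsAutoUpdateConfSketch {
  // Mirrors how the tests set these properties: enable background analyze for existing,
  // out-of-date stats and allow up to 4 analyze commands to run in parallel.
  static void enableBackgroundStatsUpdate(HiveConf hiveConf) {
    hiveConf.set(MetastoreConf.ConfVars.STATS_AUTO_UPDATE.getVarname(), "existing");
    hiveConf.setInt(MetastoreConf.ConfVars.STATS_AUTO_UPDATE_WORKER_COUNT.getVarname(), 4);
  }
}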
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
index 9b36d09..6ade490 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
@@ -24,6 +24,7 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.commons.collections.ListUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -64,7 +65,9 @@ import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.util.MachineList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
@@ -718,12 +721,12 @@ public class MetaStoreUtils {
     params.put(StatsSetupConst.NUM_FILES, Integer.toString(numFiles));
     params.put(StatsSetupConst.TOTAL_SIZE, Long.toString(tableSize));
   }
- 
+
   public static void clearQuickStats(Map<String, String> params) {
     params.remove(StatsSetupConst.NUM_FILES);
     params.remove(StatsSetupConst.TOTAL_SIZE);
   }
- 
+
 
   public static boolean areSameColumns(List<FieldSchema> oldCols, List<FieldSchema> newCols) {
     return ListUtils.isEqualList(oldCols, newCols);
@@ -1800,4 +1803,35 @@ public class MetaStoreUtils {
     if (catName == null || "".equals(catName)) catName = Warehouse.DEFAULT_CATALOG_NAME;
     return catName;
   }
+
+
+  public static class FullTableName {
+    public final String catalog, db, table;
+
+    public FullTableName(String catalog, String db, String table) {
+      assert catalog != null && db != null && table != null : catalog + ", " + db + ", " + table;
+      this.catalog = catalog;
+      this.db = db;
+      this.table = table;
+    }
+
+    @Override
+    public String toString() {
+      return catalog + MetaStoreUtils.CATALOG_DB_SEPARATOR + db + "." + table;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      return prime * (prime * (prime + catalog.hashCode()) + db.hashCode()) + table.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null || getClass() != obj.getClass()) return false;
+      FullTableName other = (FullTableName) obj;
+      return catalog.equals(other.catalog) && db.equals(other.db) && table.equals(other.table);
+    }
+  }
 }
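
FullTableName implements value equality over (catalog, db, table), so the lists returned by the new RawStore methods can be deduplicated or used as map/set keys directly. A small hypothetical example (the names are made up; "hive" is the default catalog name):

import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;

public class FullTableNameSketch {
  public static void main(String[] args) {
    Set<FullTableName> seen = new HashSet<>();
    seen.add(new FullTableName("hive", "default", "simple_stats"));
    seen.add(new FullTableName("hive", "default", "simple_stats"));
    System.out.println(seen.size()); // 1 - equals/hashCode give value semantics
  }
}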