Posted to commits@hive.apache.org by se...@apache.org on 2018/06/06 19:32:05 UTC
[1/2] hive git commit: HIVE-19418 : add background stats updater similar to compactor (Sergey Shelukhin, reviewed by Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/branch-3 43c4bd3c6 -> bdac6edfd
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 0461c4e..8c3ada3 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.hive.metastore.api.WMMapping;
import org.apache.hadoop.hive.metastore.api.WMPool;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
import org.apache.thrift.TException;
/**
@@ -1185,4 +1186,23 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
public int deleteRuntimeStats(int maxRetainSecs) throws MetaException {
return objectStore.deleteRuntimeStats(maxRetainSecs);
}
+
+ @Override
+ public List<FullTableName> getTableNamesWithStats() throws MetaException,
+ NoSuchObjectException {
+ return null;
+ }
+
+ @Override
+ public List<FullTableName> getAllTableNamesForStats() throws MetaException,
+ NoSuchObjectException {
+ return null;
+ }
+
+ @Override
+ public Map<String, List<String>> getPartitionColsWithStats(String catName,
+ String dbName, String tableName) throws MetaException,
+ NoSuchObjectException {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index b71eda4..f98e8de 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hive.metastore.api.WMPool;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
import org.apache.thrift.TException;
import org.junit.Assert;
@@ -1172,4 +1173,23 @@ public class DummyRawStoreForJdoConnection implements RawStore {
public int deleteRuntimeStats(int maxRetainSecs) throws MetaException {
return 0;
}
+
+ @Override
+ public List<FullTableName> getTableNamesWithStats() throws MetaException,
+ NoSuchObjectException {
+ return null;
+ }
+
+ @Override
+ public List<FullTableName> getAllTableNamesForStats() throws MetaException,
+ NoSuchObjectException {
+ return null;
+ }
+
+ @Override
+ public Map<String, List<String>> getPartitionColsWithStats(String catName,
+ String dbName, String tableName) throws MetaException,
+ NoSuchObjectException {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java
index fe2d758..f824dbd 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java
@@ -354,7 +354,7 @@ public class TestTablesCreateDropAlterTruncate extends MetaStoreClientTest {
createdTable.getSd().getLocation());
}
- @Test(expected = MetaException.class)
+ @Test(expected = InvalidObjectException.class)
public void testCreateTableNullDatabase() throws Exception {
Table table = testTables[0];
table.setDbName(null);
@@ -889,7 +889,7 @@ public class TestTablesCreateDropAlterTruncate extends MetaStoreClientTest {
}
}
- @Test(expected = MetaException.class)
+ @Test(expected = InvalidOperationException.class)
public void testAlterTableNullDatabaseInNew() throws Exception {
Table originalTable = testTables[0];
Table newTable = originalTable.deepCopy();
[2/2] hive git commit: HIVE-19418 : add background stats updater similar to compactor (Sergey Shelukhin, reviewed by Ashutosh Chauhan)
Posted by se...@apache.org.
HIVE-19418 : add background stats updater similar to compactor (Sergey Shelukhin, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bdac6edf
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bdac6edf
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bdac6edf
Branch: refs/heads/branch-3
Commit: bdac6edfd801804fba19dc1f2f59885c9dc743e7
Parents: 43c4bd3
Author: sergey <se...@apache.org>
Authored: Wed Jun 6 12:20:06 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Wed Jun 6 12:23:50 2018 -0700
----------------------------------------------------------------------
.../listener/DummyRawStoreFailEvent.java | 23 +-
.../org/apache/hadoop/hive/ql/DriverUtils.java | 77 +++
.../hadoop/hive/ql/session/SessionState.java | 2 +-
.../hive/ql/stats/StatsUpdaterThread.java | 652 +++++++++++++++++++
.../hive/ql/txn/compactor/CompactorMR.java | 51 +-
.../hive/ql/stats/TestStatsUpdaterThread.java | 472 ++++++++++++++
.../hadoop/hive/metastore/HiveAlterHandler.java | 8 +-
.../hadoop/hive/metastore/HiveMetaStore.java | 11 +
.../hive/metastore/MetaStoreDirectSql.java | 91 +++
.../hadoop/hive/metastore/ObjectStore.java | 99 ++-
.../apache/hadoop/hive/metastore/RawStore.java | 8 +
.../hive/metastore/cache/CachedStore.java | 17 +
.../hive/metastore/conf/EnumValidator.java | 26 +
.../hive/metastore/conf/MetastoreConf.java | 16 +
.../hive/metastore/utils/MetaStoreUtils.java | 38 +-
.../DummyRawStoreControlledCommit.java | 20 +
.../DummyRawStoreForJdoConnection.java | 20 +
.../TestTablesCreateDropAlterTruncate.java | 4 +-
18 files changed, 1578 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
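For reference, the new updater is driven by the MetastoreConf settings added in this patch (STATS_AUTO_UPDATE, STATS_AUTO_UPDATE_NOOP_WAIT, STATS_AUTO_UPDATE_WORKER_COUNT). A minimal sketch of enabling it, modeled on the setup in TestStatsUpdaterThread below; the embedded usage and values shown are illustrative only:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

    public class StatsAutoUpdateConfSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // "all" analyzes any missing or stale stats; "existing" only refreshes stats
        // that already exist (see the new StatsUpdateMode enum).
        conf.set(MetastoreConf.ConfVars.STATS_AUTO_UPDATE.getVarname(), "all");
        // Number of parallel analyze queries; the work queue is sized to 3x this.
        conf.setInt(MetastoreConf.ConfVars.STATS_AUTO_UPDATE_WORKER_COUNT.getVarname(), 2);
      }
    }

Individual tables and partitions can opt out by setting the property skip.stats.autoupdate=true, which the checker thread consults before queueing work.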
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 0cc0ae5..ff97522 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -22,11 +22,13 @@ import org.apache.hadoop.hive.metastore.api.ISchemaName;
import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
import org.apache.hadoop.hive.metastore.api.Catalog;
import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.FileMetadataHandler;
@@ -88,6 +90,7 @@ import org.apache.hadoop.hive.metastore.api.WMPool;
import org.apache.hadoop.hive.metastore.api.WMNullablePool;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
import org.apache.thrift.TException;
/**
@@ -1221,4 +1224,22 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
return objectStore.deleteRuntimeStats(maxRetainSecs);
}
-}
+
+ @Override
+ public List<FullTableName> getTableNamesWithStats() throws MetaException,
+ NoSuchObjectException {
+ return null;
+ }
+
+ @Override
+ public List<FullTableName> getAllTableNamesForStats() throws MetaException,
+ NoSuchObjectException {
+ return null;
+ }
+
+ @Override
+ public Map<String, List<String>> getPartitionColsWithStats(String catName,
+ String dbName, String tableName) throws MetaException,
+ NoSuchObjectException {
+ return null;
+ }}
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
new file mode 100644
index 0000000..8228109
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DriverUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(DriverUtils.class);
+
+ public static void runOnDriver(HiveConf conf, String user,
+ SessionState sessionState, String query, ValidWriteIdList writeIds) throws HiveException {
+ SessionState.setCurrentSessionState(sessionState);
+ boolean isOk = false;
+ try {
+ QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build();
+ Driver driver = new Driver(qs, user, null, null);
+ driver.setCompactionWriteIds(writeIds);
+ try {
+ CommandProcessorResponse cpr = driver.run(query);
+ if (cpr.getResponseCode() != 0) {
+ LOG.error("Failed to run " + query, cpr.getException());
+ throw new HiveException("Failed to run " + query, cpr.getException());
+ }
+ } finally {
+ driver.close();
+ driver.destroy();
+ }
+ isOk = true;
+ } finally {
+ if (!isOk) {
+ try {
+ sessionState.close(); // This also resets SessionState.get.
+ } catch (Throwable th) {
+ LOG.warn("Failed to close a bad session", th);
+ SessionState.detachSession();
+ }
+ }
+ }
+ }
+
+ public static SessionState setUpSessionState(HiveConf conf, String user, boolean doStart) {
+ SessionState sessionState = SessionState.get();
+ if (sessionState == null) {
+ // Note: we assume that workers run on the same threads repeatedly, so we can set up
+ // the session here and it will be reused without explicitly storing in the worker.
+ sessionState = new SessionState(conf, user);
+ if (doStart) {
+ // TODO: Required due to SessionState.getHDFSSessionPath. Why wasn't it required before?
+ sessionState.setIsHiveServerQuery(true);
+ SessionState.start(sessionState);
+ }
+ SessionState.setCurrentSessionState(sessionState);
+ }
+ return sessionState;
+ }
+}
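A hedged usage sketch of the two helpers extracted into DriverUtils above; it mirrors how CompactorMR invokes them later in this diff, with a made-up statement string:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.DriverUtils;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class DriverUtilsUsageSketch {
      static void example(HiveConf conf, String user) throws HiveException {
        // Reuses the calling thread's SessionState if present; doStart=false skips SessionState.start().
        SessionState ss = DriverUtils.setUpSessionState(conf, user, false);
        // Runs the statement on an embedded Driver; null means no compaction write ID list.
        DriverUtils.runOnDriver(conf, user, ss, "drop table if exists tmp_scratch", null);
      }
    }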
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 89129f9..9c52f2b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -813,7 +813,7 @@ public class SessionState {
return new Path(sessionPathString);
}
Preconditions.checkNotNull(ss.hdfsSessionPath,
- "Non-local session path expected to be non-null");
+ "Non-local session path expected to be non-null");
return ss.hdfsSessionPath;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
new file mode 100644
index 0000000..285db31
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.stats;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.RawStoreProxy;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.StatsUpdateMode;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
+import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+
+public class StatsUpdaterThread extends Thread implements MetaStoreThread {
+ public static final String SKIP_STATS_AUTOUPDATE_PROPERTY = "skip.stats.autoupdate";
+ private static final Logger LOG = LoggerFactory.getLogger(StatsUpdaterThread.class);
+
+ protected Configuration conf;
+ protected int threadId;
+ protected AtomicBoolean stop;
+ protected AtomicBoolean looped;
+
+ private RawStore rs;
+ private TxnStore txnHandler;
+ /** Full tables, and partitions that currently have analyze commands queued or in progress. */
+ private ConcurrentHashMap<FullTableName, Boolean> tablesInProgress = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, Boolean> partsInProgress = new ConcurrentHashMap<>();
+ private AtomicInteger itemsInProgress = new AtomicInteger(0);
+
+ // Configuration
+ /** Whether to only update stats that already exist and are out of date. */
+ private boolean isExistingOnly;
+ private long noUpdatesWaitMs;
+ private int batchSize;
+
+ // Worker threads stuff
+ private BlockingQueue<AnalyzeWork> workQueue;
+ private Thread[] workers;
+
+ @Override
+ public void setConf(Configuration conf) {
+ StatsUpdateMode mode = StatsUpdateMode.valueOf(
+ MetastoreConf.getVar(conf, ConfVars.STATS_AUTO_UPDATE).toUpperCase());
+ switch (mode) {
+ case ALL: this.isExistingOnly = false; break;
+ case EXISTING: this.isExistingOnly = true; break;
+ default: throw new AssertionError("Unexpected mode " + mode);
+ }
+ noUpdatesWaitMs = MetastoreConf.getTimeVar(
+ conf, ConfVars.STATS_AUTO_UPDATE_NOOP_WAIT, TimeUnit.MILLISECONDS);
+ batchSize = MetastoreConf.getIntVar(conf, ConfVars.BATCH_RETRIEVE_MAX);
+ int workerCount = MetastoreConf.getIntVar(conf, ConfVars.STATS_AUTO_UPDATE_WORKER_COUNT);
+ if (workerCount <= 0) {
+ workerCount = 1;
+ }
+ workers = new Thread[workerCount];
+ // Don't store too many items; if the queue is full we'll block the checker thread.
+ // Since the worker count determines how many queries can be running in parallel, it makes
+ // no sense to produce more work if the backlog is getting too long.
+ workQueue = new ArrayBlockingQueue<AnalyzeWork>(workerCount * 3);
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setThreadId(int threadId) {
+ this.threadId = threadId;
+ }
+
+ @Override
+ public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
+ this.stop = stop;
+ this.looped = looped;
+ setPriority(MIN_PRIORITY);
+ setDaemon(true);
+ String user = "anonymous";
+ try {
+ user = UserGroupInformation.getCurrentUser().getShortUserName();
+ } catch (IOException e) {
+ LOG.warn("Cannot determine the current user; executing as anonymous", e);
+ }
+ txnHandler = TxnUtils.getTxnStore(conf);
+ rs = RawStoreProxy.getProxy(conf, conf,
+ MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL), threadId);
+ for (int i = 0; i < workers.length; ++i) {
+ workers[i] = new Thread(new WorkerRunnable(conf, user));
+ workers[i].setDaemon(true);
+ workers[i].setName("Stats updater worker " + i);
+ }
+ }
+
+ @Override
+ public void run() {
+ startWorkers();
+ while (!stop.get()) {
+ boolean hadUpdates = runOneIteration();
+ try {
+ Thread.sleep(hadUpdates ? 0 : noUpdatesWaitMs);
+ } catch (InterruptedException e) {
+ LOG.info("Stats updater thread was interrupted and will now exit");
+ stopWorkers();
+ return;
+ }
+ }
+ LOG.info("Stats updater thread was stopped and will now exit");
+ }
+
+ @VisibleForTesting
+ void startWorkers() {
+ for (int i = 0; i < workers.length; ++i) {
+ workers[i].start();
+ }
+ }
+
+ @VisibleForTesting
+ boolean runOneIteration() {
+ List<FullTableName> fullTableNames;
+ try {
+ fullTableNames = getTablesToCheck();
+ } catch (Throwable t) {
+ LOG.error("Stats updater thread cannot retrieve tables and will now exit", t);
+ stopWorkers();
+ throw new RuntimeException(t);
+ }
+ LOG.debug("Processing {}", fullTableNames);
+ boolean hadUpdates = false;
+ for (FullTableName fullTableName : fullTableNames) {
+ try {
+ List<AnalyzeWork> commands = processOneTable(fullTableName);
+ hadUpdates = hadUpdates || commands != null;
+ if (commands != null) {
+ for (AnalyzeWork req : commands) {
+ markAnalyzeInProgress(req);
+ workQueue.put(req);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to process " + fullTableName + "; skipping for now", e);
+ }
+ }
+ return hadUpdates;
+ }
+
+ private void stopWorkers() {
+ for (int i = 0; i < workers.length; ++i) {
+ workers[i].interrupt();
+ }
+ }
+
+ private List<AnalyzeWork> processOneTable(FullTableName fullTableName)
+ throws MetaException, NoSuchTxnException, NoSuchObjectException {
+ if (isAnalyzeTableInProgress(fullTableName)) return null;
+ String cat = fullTableName.catalog, db = fullTableName.db, tbl = fullTableName.table;
+ Table table = rs.getTable(cat, db, tbl);
+ LOG.debug("Processing table {}", table);
+
+ // Check if the table should be skipped.
+ String skipParam = table.getParameters().get(SKIP_STATS_AUTOUPDATE_PROPERTY);
+ if ("true".equalsIgnoreCase(skipParam)) return null;
+
+ // TODO: when txn stats are implemented, use writeIds to determine stats accuracy
+ @SuppressWarnings("unused")
+ ValidReaderWriteIdList writeIds = null;
+ if (AcidUtils.isTransactionalTable(table)) {
+ writeIds = getWriteIds(fullTableName);
+ }
+ List<String> allCols = new ArrayList<>(table.getSd().getColsSize());
+ for (FieldSchema fs : table.getSd().getCols()) {
+ allCols.add(fs.getName());
+ }
+ Collections.sort(allCols);
+ if (table.getPartitionKeysSize() == 0) {
+ Map<String, String> params = table.getParameters();
+ List<String> colsToUpdate = isExistingOnly
+ ? getExistingNonPartTableStatsToUpdate(fullTableName, cat, db, tbl, params, allCols)
+ : getAnyStatsToUpdate(allCols, params);
+ LOG.debug("Columns to update are {}; existing only: {}, out of: {} based on {}",
+ colsToUpdate, isExistingOnly, allCols, params);
+
+ if (colsToUpdate == null || colsToUpdate.isEmpty()) {
+ return null; // No update necessary.
+ }
+ return Lists.newArrayList(new AnalyzeWork(fullTableName,
+ null, null, allCols.size() == colsToUpdate.size() ? null : colsToUpdate));
+ } else {
+ Map<String, List<String>> partsToAnalyze = new HashMap<>();
+ List<String> colsForAllParts = findPartitionsToAnalyze(
+ fullTableName, cat, db, tbl, allCols, partsToAnalyze);
+ LOG.debug("Columns to update are {} for all partitions; {} individual partitions."
+ + " Existing only: {}, out of: {}", colsForAllParts, partsToAnalyze.size(),
+ isExistingOnly, allCols);
+ if (colsForAllParts == null && partsToAnalyze.isEmpty()) {
+ return null; // No partitions need update.
+ }
+ if (colsForAllParts != null) {
+ // We can update all partitions with a single analyze query.
+ return Lists.newArrayList(new AnalyzeWork(
+ fullTableName, null, buildPartColStr(table), colsForAllParts));
+ }
+ List<AnalyzeWork> result = new ArrayList<>(partsToAnalyze.size());
+ for (Map.Entry<String, List<String>> e : partsToAnalyze.entrySet()) {
+ LOG.debug("Adding analyze work for {}", e.getKey());
+ result.add(new AnalyzeWork(fullTableName, e.getKey(), null, e.getValue()));
+ }
+ return result;
+ }
+ }
+
+ private List<String> findPartitionsToAnalyze(FullTableName fullTableName, String cat, String db,
+ String tbl, List<String> allCols, Map<String, List<String>> partsToAnalyze)
+ throws MetaException, NoSuchObjectException {
+ // TODO: ideally, once the col-stats-accurate state is stored in some sane structure, this
+ // should retrieve partsToUpdate in a single query, with no checking of partition params in Java.
+ List<String> partNames = null;
+ Map<String, List<String>> colsPerPartition = null;
+ boolean isAllParts = true;
+ if (isExistingOnly) {
+ colsPerPartition = rs.getPartitionColsWithStats(cat, db, tbl);
+ partNames = Lists.newArrayList(colsPerPartition.keySet());
+ int partitionCount = rs.getNumPartitionsByFilter(cat, db, tbl, "");
+ isAllParts = partitionCount == partNames.size();
+ } else {
+ partNames = rs.listPartitionNames(cat, db, tbl, (short) -1);
+ isAllParts = true;
+ }
+ Table t = rs.getTable(cat, db, tbl);
+ List<Partition> currentBatch = null;
+ int nextBatchStart = 0, nextIxInBatch = -1, currentBatchStart = 0;
+ List<String> colsToUpdateForAll = null;
+ while (true) {
+ if (currentBatch == null || nextIxInBatch == currentBatch.size()) {
+ if (nextBatchStart >= partNames.size()) {
+ break;
+ }
+ int nextBatchEnd = Math.min(partNames.size(), nextBatchStart + this.batchSize);
+ List<String> currentNames = partNames.subList(nextBatchStart, nextBatchEnd);
+ currentBatchStart = nextBatchStart;
+ nextBatchStart = nextBatchEnd;
+ try {
+ currentBatch = rs.getPartitionsByNames(cat, db, tbl, currentNames);
+ } catch (NoSuchObjectException e) {
+ LOG.error("Failed to get partitions for " + fullTableName + ", skipping some partitions", e);
+ currentBatch = null;
+ continue;
+ }
+ nextIxInBatch = 0;
+ }
+ int currentIxInBatch = nextIxInBatch++;
+ Partition part = currentBatch.get(currentIxInBatch);
+ String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues());
+ LOG.debug("Processing partition ({} in batch), {}", currentIxInBatch, partName);
+
+ // Skip the partitions in progress, and the ones for which stats update is disabled.
+ // We could filter the skipped partitions out as part of the initial names query,
+ // but we assume skipping is extremely rare for individual partitions.
+ Map<String, String> params = part.getParameters();
+ String skipParam = params.get(SKIP_STATS_AUTOUPDATE_PROPERTY);
+ if (isAnalyzePartInProgress(fullTableName, partName) || "true".equalsIgnoreCase(skipParam)) {
+ if (isAllParts) {
+ addPreviousPartitions(t, partNames, currentBatchStart, currentBatch, currentIxInBatch,
+ colsToUpdateForAll, partsToAnalyze);
+ }
+ isAllParts = false;
+ continue;
+ }
+
+ // Find which columns we need to update for this partition, if any.
+ List<String> colsToMaybeUpdate = allCols;
+ if (isExistingOnly) {
+ colsToMaybeUpdate = colsPerPartition.get(partName);
+ Collections.sort(colsToMaybeUpdate);
+ }
+ List<String> colsToUpdate = getAnyStatsToUpdate(colsToMaybeUpdate, params);
+ LOG.debug("Updating {} based on {} and {}", colsToUpdate, colsToMaybeUpdate, params);
+
+ if (colsToUpdate == null || colsToUpdate.isEmpty()) {
+ if (isAllParts) {
+ addPreviousPartitions(t, partNames, currentBatchStart, currentBatch, currentIxInBatch,
+ colsToUpdateForAll, partsToAnalyze);
+ }
+ isAllParts = false;
+ continue;
+ }
+
+ // If issuing a query for all partitions, verify that we need to update the same columns.
+ // TODO: for non columnar we don't need to do this... might as well update all stats.
+ if (isAllParts) {
+ List<String> newCols = verifySameColumnsForAllParts(colsToUpdateForAll, colsToUpdate);
+ if (newCols == null) {
+ isAllParts = false;
+ addPreviousPartitions(t, partNames, currentBatchStart, currentBatch, currentIxInBatch,
+ colsToUpdateForAll, partsToAnalyze);
+ } else if (colsToUpdateForAll == null) {
+ colsToUpdateForAll = newCols;
+ }
+ }
+
+ if (!isAllParts) {
+ LOG.trace("Adding {}, {}", partName, colsToUpdate);
+ partsToAnalyze.put(partName, colsToUpdate);
+ }
+ }
+ return isAllParts ? colsToUpdateForAll : null;
+ }
+
+ private List<String> verifySameColumnsForAllParts(
+ List<String> colsToUpdateForAll, List<String> colsToUpdate) {
+ if (colsToUpdateForAll == null) {
+ return colsToUpdate;
+ }
+ if (colsToUpdate.size() != colsToUpdateForAll.size()) {
+ return null;
+ }
+ // Assumes the lists are sorted.
+ for (int i = 0; i < colsToUpdateForAll.size(); ++i) {
+ if (!colsToUpdate.get(i).equals(colsToUpdateForAll.get(i))) {
+ return null;
+ }
+ }
+ return colsToUpdateForAll;
+ }
+
+ private void addPreviousPartitions(Table t, List<String> allPartNames,
+ int currentBatchStart, List<Partition> currentBatch, int currentIxInBatch,
+ List<String> cols, Map<String, List<String>> partsToAnalyze) throws MetaException {
+ // Add all the names for previous batches.
+ for (int i = 0; i < currentBatchStart; ++i) {
+ LOG.trace("Adding previous {}, {}", allPartNames.get(i), cols);
+ partsToAnalyze.put(allPartNames.get(i), cols);
+ }
+ // The current batch may be out of order w.r.t. the global name list, so add specific parts.
+ for (int i = 0; i < currentIxInBatch; ++i) {
+ String name = Warehouse.makePartName(t.getPartitionKeys(), currentBatch.get(i).getValues());
+ LOG.trace("Adding previous {}, {}", name, cols);
+ partsToAnalyze.put(name, cols);
+ }
+ }
+
+ private String buildPartColStr(Table table) {
+ String partColStr = "";
+ for (int i = 0; i < table.getPartitionKeysSize(); ++i) {
+ if (i != 0) {
+ partColStr += ",";
+ }
+ partColStr += table.getPartitionKeys().get(i).getName();
+ }
+ return partColStr;
+ }
+
+ private List<String> getExistingNonPartTableStatsToUpdate(FullTableName fullTableName,
+ String cat, String db, String tbl, Map<String, String> params,
+ List<String> allCols) throws MetaException {
+ ColumnStatistics existingStats = null;
+ try {
+ existingStats = rs.getTableColumnStatistics(cat, db, tbl, allCols);
+ } catch (NoSuchObjectException e) {
+ LOG.error("Cannot retrieve existing stats, skipping " + fullTableName, e);
+ return null;
+ }
+ return getExistingStatsToUpdate(existingStats, params);
+ }
+
+ private List<String> getExistingStatsToUpdate(
+ ColumnStatistics existingStats, Map<String, String> params) {
+ boolean hasAnyAccurate = StatsSetupConst.areBasicStatsUptoDate(params);
+ List<String> colsToUpdate = new ArrayList<>();
+ for (ColumnStatisticsObj obj : existingStats.getStatsObj()) {
+ String col = obj.getColName();
+ if (!hasAnyAccurate || !StatsSetupConst.areColumnStatsUptoDate(params, col)) {
+ colsToUpdate.add(col);
+ }
+ }
+ return colsToUpdate;
+ }
+
+ private List<String> getAnyStatsToUpdate(
+ List<String> allCols, Map<String, String> params) {
+ // Note: we only run the "for columns" command and assume that no basic stats means no col stats.
+ if (!StatsSetupConst.areBasicStatsUptoDate(params)) {
+ return allCols;
+ }
+ List<String> colsToUpdate = new ArrayList<>();
+ for (String col : allCols) {
+ if (!StatsSetupConst.areColumnStatsUptoDate(params, col)) {
+ colsToUpdate.add(col);
+ }
+ }
+ return colsToUpdate;
+ }
+
+ private List<FullTableName> getTablesToCheck() throws MetaException, NoSuchObjectException {
+ if (isExistingOnly) {
+ try {
+ return rs.getTableNamesWithStats();
+ } catch (Exception ex) {
+ LOG.error("Error from getTablesWithStats, getting all the tables", ex);
+ }
+ }
+ return rs.getAllTableNamesForStats();
+ }
+
+ private ValidReaderWriteIdList getWriteIds(
+ FullTableName fullTableName) throws NoSuchTxnException, MetaException {
+ GetValidWriteIdsRequest req = new GetValidWriteIdsRequest();
+ req.setFullTableNames(Lists.newArrayList(fullTableName.toString()));
+ return TxnUtils.createValidReaderWriteIdList(
+ txnHandler.getValidWriteIds(req).getTblValidWriteIds().get(0));
+ }
+
+ private void markAnalyzeInProgress(AnalyzeWork req) {
+ if (req.partName == null) {
+ Boolean old = tablesInProgress.putIfAbsent(req.tableName, true);
+ if (old != null) {
+ throw new AssertionError("The table was added to progress twice: " + req.tableName);
+ }
+ } else {
+ String partName = req.makeFullPartName();
+ Boolean old = partsInProgress.putIfAbsent(partName, true);
+ if (old != null) {
+ throw new AssertionError("The partition was added to progress twice: " + partName);
+ }
+ }
+ itemsInProgress.incrementAndGet();
+ }
+
+ private void markAnalyzeDone(AnalyzeWork req) {
+ if (req.partName == null) {
+ Boolean old = tablesInProgress.remove(req.tableName);
+ if (old == null) {
+ throw new AssertionError("The table was not in progress: " + req.tableName);
+ }
+ } else {
+ String partName = req.makeFullPartName();
+ Boolean old = partsInProgress.remove(partName);
+ if (old == null) {
+ throw new AssertionError("Partition was not in progress: " + partName);
+ }
+ }
+ // This is used by tests, where there's always just one batch of work and the checks run
+ // after the batch, so the notification only fires once all queued items are done.
+ int remaining = itemsInProgress.decrementAndGet();
+ if (remaining == 0) {
+ synchronized (itemsInProgress) {
+ itemsInProgress.notifyAll();
+ }
+ }
+ }
+
+ private boolean isAnalyzeTableInProgress(FullTableName fullTableName) {
+ return tablesInProgress.containsKey(fullTableName);
+ }
+
+ private boolean isAnalyzePartInProgress(FullTableName tableName, String partName) {
+ return partsInProgress.containsKey(makeFullPartName(tableName, partName));
+ }
+
+ private static String makeFullPartName(FullTableName tableName, String partName) {
+ return tableName + "/" + partName;
+ }
+
+ private final static class AnalyzeWork {
+ FullTableName tableName;
+ String partName, allParts;
+ List<String> cols;
+
+ public AnalyzeWork(FullTableName tableName, String partName, String allParts, List<String> cols) {
+ this.tableName = tableName;
+ this.partName = partName;
+ this.allParts = allParts;
+ this.cols = cols;
+ }
+
+ public String makeFullPartName() {
+ return StatsUpdaterThread.makeFullPartName(tableName, partName);
+ }
+
+ public String buildCommand() {
+ // Catalogs cannot be parsed as part of the query. Seems to be a bug.
+ String cmd = "analyze table " + tableName.db + "." + tableName.table;
+ assert partName == null || allParts == null;
+ if (partName != null) {
+ cmd += " partition(" + partName + ")";
+ }
+ if (allParts != null) {
+ cmd += " partition(" + allParts + ")";
+ }
+ cmd += " compute statistics for columns";
+ if (cols != null) {
+ cmd += " " + String.join(",", cols);
+ }
+ return cmd;
+ }
+
+ @Override
+ public String toString() {
+ return "AnalyzeWork [tableName=" + tableName + ", partName=" + partName
+ + ", allParts=" + allParts + ", cols=" + cols + "]";
+ }
+ }
+
+ @VisibleForTesting
+ public boolean runOneWorkerIteration(
+ SessionState ss, String user, HiveConf conf, boolean doWait) throws InterruptedException {
+ AnalyzeWork req;
+ if (doWait) {
+ req = workQueue.take();
+ } else {
+ req = workQueue.poll();
+ if (req == null) {
+ return false;
+ }
+ }
+ String cmd = null;
+ try {
+ cmd = req.buildCommand();
+ LOG.debug("Running {} based on {}", cmd, req);
+ if (doWait) {
+ SessionState.start(ss); // This is the first call, open the session
+ }
+ DriverUtils.runOnDriver(conf, user, ss, cmd, null);
+ } catch (Exception e) {
+ LOG.error("Analyze command failed: " + cmd, e);
+ try {
+ ss.close();
+ } catch (IOException e1) {
+ LOG.warn("Failed to close a bad session", e1);
+ } finally {
+ SessionState.detachSession();
+ }
+ } finally {
+ markAnalyzeDone(req);
+ }
+ return true;
+ }
+
+ public class WorkerRunnable implements Runnable {
+ private final HiveConf conf;
+ private final String user;
+
+ public WorkerRunnable(Configuration conf, String user) {
+ this.conf = new HiveConf(conf, HiveConf.class);
+ this.user = user;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ // This should not start the actual Tez AM.
+ SessionState ss = DriverUtils.setUpSessionState(conf, user, false);
+ // Wait for the first item to arrive at the queue and process it.
+ try {
+ runOneWorkerIteration(ss, user, conf, true);
+ } catch (InterruptedException e) {
+ closeSession(ss);
+ LOG.info("Worker thread was interrupted and will now exit");
+ return;
+ }
+ // Keep draining the queue in the same session.
+ try {
+ while (runOneWorkerIteration(ss, user, conf, false)) {}
+ } catch (InterruptedException e) {
+ closeSession(ss);
+ LOG.info("Worker thread was interrupted unexpectedly and will now exit");
+ return;
+ }
+ // Close the session before we have to wait again.
+ closeSession(ss);
+ SessionState.detachSession();
+ }
+ }
+ }
+
+ private static void closeSession(SessionState ss) {
+ try {
+ ss.close();
+ } catch (IOException e1) {
+ LOG.error("Failed to close the session", e1);
+ }
+ }
+
+ @VisibleForTesting
+ public void waitForQueuedCommands() throws InterruptedException {
+ while (itemsInProgress.get() > 0) {
+ synchronized (itemsInProgress) {
+ itemsInProgress.wait(100L);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public int getQueueLength() {
+ return workQueue.size();
+ }
+}
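The worker threads ultimately just issue analyze statements built by AnalyzeWork.buildCommand(). A standalone sketch of that string construction, with partName/allParts collapsed into a single partSpec argument for brevity and with hypothetical table and column names:

    import java.util.Arrays;
    import java.util.List;

    public class AnalyzeCommandSketch {
      // Mirrors AnalyzeWork.buildCommand(); cols == null means "all columns".
      static String buildCommand(String db, String tbl, String partSpec, List<String> cols) {
        String cmd = "analyze table " + db + "." + tbl;
        if (partSpec != null) {
          cmd += " partition(" + partSpec + ")";
        }
        cmd += " compute statistics for columns";
        if (cols != null) {
          cmd += " " + String.join(",", cols);
        }
        return cmd;
      }

      public static void main(String[] args) {
        // Prints: analyze table db.tbl partition(p=1) compute statistics for columns c1,c2
        System.out.println(buildCommand("db", "tbl", "p=1", Arrays.asList("c1", "c2")));
      }
    }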
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 982b180..6044719 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.DDLTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
@@ -353,7 +354,7 @@ public class CompactorMR {
conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
String user = UserGroupInformation.getCurrentUser().getShortUserName();
- SessionState sessionState = setUpSessionState(conf, user);
+ SessionState sessionState = DriverUtils.setUpSessionState(conf, user, false);
// Note: we could skip creating the table and just add table type stuff directly to the
// "insert overwrite directory" command if there were no bucketing or list bucketing.
@@ -365,7 +366,7 @@ public class CompactorMR {
p == null ? t.getSd() : p.getSd(), baseLocation.toString());
LOG.info("Compacting a MM table into " + query);
try {
- runOnDriver(conf, user, sessionState, query, null);
+ DriverUtils.runOnDriver(conf, user, sessionState, query, null);
break;
} catch (Exception ex) {
Throwable cause = ex;
@@ -380,26 +381,16 @@ public class CompactorMR {
String query = buildMmCompactionQuery(conf, t, p, tmpTableName);
LOG.info("Compacting a MM table via " + query);
- runOnDriver(conf, user, sessionState, query, writeIds);
+ DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds);
commitMmCompaction(tmpLocation, sd.getLocation(), conf, writeIds);
- runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName, null);
+ DriverUtils.runOnDriver(conf, user, sessionState,
+ "drop table if exists " + tmpTableName, null);
} catch (HiveException e) {
LOG.error("Error compacting a MM table", e);
throw new IOException(e);
}
}
- public SessionState setUpSessionState(HiveConf conf, String user) {
- SessionState sessionState = SessionState.get();
- if (sessionState == null) {
- // Note: we assume that workers run on the same threads repeatedly, so we can set up
- // the session here and it will be reused without explicitly storing in the worker.
- sessionState = new SessionState(conf, user);
- SessionState.setCurrentSessionState(sessionState);
- }
- return sessionState;
- }
-
private String generateTmpPath(StorageDescriptor sd) {
return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
}
@@ -514,36 +505,6 @@ public class CompactorMR {
return result;
}
- private void runOnDriver(HiveConf conf, String user,
- SessionState sessionState, String query, ValidWriteIdList writeIds) throws HiveException {
- boolean isOk = false;
- try {
- QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build();
- Driver driver = new Driver(qs, user, null, null);
- driver.setCompactionWriteIds(writeIds);
- try {
- CommandProcessorResponse cpr = driver.run(query);
- if (cpr.getResponseCode() != 0) {
- LOG.error("Failed to run " + query, cpr.getException());
- throw new HiveException("Failed to run " + query, cpr.getException());
- }
- } finally {
- driver.close();
- driver.destroy();
- }
- isOk = true;
- } finally {
- if (!isOk) {
- try {
- sessionState.close(); // This also resets SessionState.get.
- } catch (Throwable th) {
- LOG.warn("Failed to close a bad session", th);
- SessionState.detachSession();
- }
- }
- }
- }
-
private String buildMmCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) {
String fullName = t.getDbName() + "." + t.getTableName();
// TODO: ideally we should make a special form of insert overwrite so that we:
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
new file mode 100644
index 0000000..d0b41f3
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestStatsUpdaterThread {
+ @SuppressWarnings("unused")
+ static final private Logger LOG = LoggerFactory.getLogger(TestStatsUpdaterThread.class);
+ private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+ File.separator + TestStatsUpdaterThread.class.getCanonicalName()
+ + "-" + System.currentTimeMillis()
+ ).getPath().replaceAll("\\\\", "/");
+ private HiveConf hiveConf;
+ private SessionState ss;
+
+ String getTestDataDir() {
+ return TEST_DATA_DIR;
+ }
+
+ @SuppressWarnings("deprecation")
+ @Before
+ public void setUp() throws Exception {
+ this.hiveConf = new HiveConf(TestStatsUpdaterThread.class);
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getTestDataDir());
+ hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+ "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+// hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, true);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, true);
+ hiveConf.set(MetastoreConf.ConfVars.STATS_AUTO_UPDATE.getVarname(), "all");
+ TxnDbUtil.setConfValues(hiveConf);
+ TxnDbUtil.prepDb(hiveConf);
+ File f = new File(getTestDataDir());
+ if (f.exists()) {
+ FileUtil.fullyDelete(f);
+ }
+ if (!(new File(getTestDataDir()).mkdirs())) {
+ throw new RuntimeException("Could not create " + getTestDataDir());
+ }
+ this.ss = DriverUtils.setUpSessionState(hiveConf, "hive", true);
+ cleanUp();
+ }
+
+ @After
+ public void cleanUp() throws HiveException {
+ executeQuery("drop table simple_stats");
+ executeQuery("drop table simple_stats2");
+ executeQuery("drop table simple_stats3");
+ }
+
+ @Test(timeout=20000)
+ public void testSimpleUpdateWithThreads() throws Exception {
+ StatsUpdaterThread su = createUpdater();
+ su.startWorkers();
+ IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+
+ executeQuery("create table simple_stats (i int, s string)");
+ executeQuery("insert into simple_stats (i, s) values (1, 'test')");
+ verifyAndUnsetColStats("simple_stats", Lists.newArrayList("i"), msClient);
+
+ assertTrue(su.runOneIteration());
+ su.waitForQueuedCommands();
+ verifyStatsUpToDate("simple_stats", Lists.newArrayList("i"), msClient, true);
+
+ msClient.close();
+ }
+
+ @Test(timeout=20000)
+ public void testMultipleTables() throws Exception {
+ StatsUpdaterThread su = createUpdater();
+ IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+
+ executeQuery("create table simple_stats (s string)");
+ executeQuery("insert into simple_stats (s) values ('test')");
+ executeQuery("create table simple_stats2 (s string)");
+ executeQuery("insert into simple_stats2 (s) values ('test2')");
+ verifyAndUnsetColStats("simple_stats", Lists.newArrayList("s"), msClient);
+ verifyAndUnsetColStats("simple_stats2", Lists.newArrayList("s"), msClient);
+
+ assertTrue(su.runOneIteration());
+ drainWorkQueue(su);
+ verifyAndUnsetColStats("simple_stats", Lists.newArrayList("s"), msClient);
+ verifyAndUnsetColStats("simple_stats2", Lists.newArrayList("s"), msClient);
+
+ setTableSkipProperty(msClient, "simple_stats", "true");
+ assertTrue(su.runOneIteration());
+ drainWorkQueue(su);
+ verifyStatsUpToDate("simple_stats", Lists.newArrayList("i"), msClient, false);
+ verifyAndUnsetColStats("simple_stats2", Lists.newArrayList("s"), msClient);
+
+ msClient.close();
+ }
+
+ @Test(timeout=20000)
+ public void testExistingOnly() throws Exception {
+ hiveConf.set(MetastoreConf.ConfVars.STATS_AUTO_UPDATE.getVarname(), "existing");
+ StatsUpdaterThread su = createUpdater();
+ IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+
+ executeQuery("create table simple_stats (i int, s string)");
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+ executeQuery("insert into simple_stats (i, s) values (1, 'test')");
+ executeQuery("analyze table simple_stats compute statistics for columns i");
+ verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"), msClient, false);
+ verifyAndUnsetColStats("simple_stats", Lists.newArrayList("i"), msClient);
+
+ assertTrue(su.runOneIteration());
+ drainWorkQueue(su);
+ verifyStatsUpToDate("simple_stats", Lists.newArrayList("i"), msClient, true);
+ verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"), msClient, false);
+
+ msClient.close();
+ }
+
+ @Test(timeout=60000)
+ public void testQueueingWithThreads() throws Exception {
+ final int PART_COUNT = 12;
+ hiveConf.setInt(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.getVarname(), 5);
+ hiveConf.setInt(MetastoreConf.ConfVars.STATS_AUTO_UPDATE_WORKER_COUNT.getVarname(), 2);
+ StatsUpdaterThread su = createUpdater();
+ su.startWorkers();
+ IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+ executeQuery("create table simple_stats (s string) partitioned by (i int)");
+ for (int i = 0; i < PART_COUNT; ++i) {
+ executeQuery("insert into simple_stats partition(i='" + i + "') values ('test')");
+ }
+ verifyPartStatsUpToDate(PART_COUNT, 0, msClient, "simple_stats", false);
+
+ // Set one of the partitions to be skipped, so that a command is created for every other one.
+ setPartitionSkipProperty(msClient, "simple_stats", "i=0", "true");
+
+ assertTrue(su.runOneIteration());
+ su.waitForQueuedCommands();
+ verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("s"), msClient, false);
+ verifyPartStatsUpToDate(PART_COUNT, 1, msClient, "simple_stats", true);
+
+ assertFalse(su.runOneIteration());
+ drainWorkQueue(su, 0); // Nothing else is updated after the first update.
+
+ msClient.close();
+ }
+
+ @Test(timeout=20000)
+ public void testAllPartitions() throws Exception {
+ final int PART_COUNT = 3;
+ StatsUpdaterThread su = createUpdater();
+ IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+ executeQuery("create table simple_stats (s string) partitioned by (i int)");
+ for (int i = 0; i < PART_COUNT; ++i) {
+ executeQuery("insert into simple_stats partition(i='" + i + "') values ('test')");
+ }
+ verifyPartStatsUpToDate(PART_COUNT, 0, msClient, "simple_stats", false);
+
+ assertTrue(su.runOneIteration());
+ drainWorkQueue(su, 1); // All the partitions need to be updated; a single command can be used.
+ verifyPartStatsUpToDate(PART_COUNT, 0, msClient, "simple_stats", true);
+
+ assertFalse(su.runOneIteration());
+ drainWorkQueue(su, 0); // Nothing else is updated after the first update.
+
+ msClient.close();
+ }
+
+ @Test(timeout=20000)
+ public void testPartitionSubset() throws Exception {
+ final int NONSTAT_PART_COUNT = 3;
+ StatsUpdaterThread su = createUpdater();
+ IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+ executeQuery("create table simple_stats (s string) partitioned by (i int)");
+ for (int i = 0; i < NONSTAT_PART_COUNT; ++i) {
+ executeQuery("insert into simple_stats partition(i='" + i + "') values ('test')");
+ }
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, true);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, true);
+ executeQuery("insert into simple_stats partition(i='"
+ + NONSTAT_PART_COUNT + "') values ('test')");
+ verifyPartStatsUpToDate(NONSTAT_PART_COUNT, 0, msClient, "simple_stats", false);
+ verifyStatsUpToDate("simple_stats",
+ "i=" + NONSTAT_PART_COUNT, Lists.newArrayList("s"), msClient, true);
+
+ final int EXCLUDED_PART = 1;
+ setPartitionSkipProperty(msClient, "simple_stats", "i=" + EXCLUDED_PART, "true");
+
+ assertTrue(su.runOneIteration());
+ // 1 is excluded via property, 3 already has stats, so we only expect two updates.
+ drainWorkQueue(su, NONSTAT_PART_COUNT - 1);
+ for (int i = 0; i < NONSTAT_PART_COUNT; ++i) {
+ verifyStatsUpToDate("simple_stats",
+ "i=" + i, Lists.newArrayList("s"), msClient, i != EXCLUDED_PART);
+ }
+ verifyStatsUpToDate("simple_stats", "i=" + EXCLUDED_PART,
+ Lists.newArrayList("s"), msClient, false);
+
+ msClient.close();
+ }
+
+ @Test(timeout=20000)
+ public void testPartitionsWithDifferentColsAll() throws Exception {
+ StatsUpdaterThread su = createUpdater();
+ IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+ executeQuery("create table simple_stats (s string, t string, u string) partitioned by (i int)");
+ executeQuery("insert into simple_stats partition(i=0) values ('test', '0', 'foo')");
+ executeQuery("insert into simple_stats partition(i=1) values ('test', '1', 'bar')");
+ executeQuery("analyze table simple_stats partition(i=0) compute statistics for columns s");
+ executeQuery("analyze table simple_stats partition(i=1) compute statistics for columns s, u");
+ verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("s"), msClient, true);
+ verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("t", "u"), msClient, false);
+ verifyStatsUpToDate("simple_stats", "i=1", Lists.newArrayList("s", "u"), msClient, true);
+ verifyStatsUpToDate("simple_stats", "i=1", Lists.newArrayList("t"), msClient, false);
+
+ assertTrue(su.runOneIteration());
+ // Different columns means different commands have to be run.
+ drainWorkQueue(su, 2);
+ verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("s", "t", "u"), msClient, true);
+ verifyStatsUpToDate("simple_stats", "i=1", Lists.newArrayList("s", "t", "u"), msClient, true);
+
+ assertFalse(su.runOneIteration());
+ drainWorkQueue(su, 0); // Nothing else is updated after the first update.
+
+ msClient.close();
+ }
+
+ @Test(timeout=20000)
+ public void testPartitionsWithDifferentColsExistingOnly() throws Exception {
+ hiveConf.set(MetastoreConf.ConfVars.STATS_AUTO_UPDATE.getVarname(), "existing");
+ StatsUpdaterThread su = createUpdater();
+ IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+ executeQuery("create table simple_stats (s string, t string, u string) partitioned by (i int)");
+ executeQuery("insert into simple_stats partition(i=0) values ('test', '0', 'foo')");
+ executeQuery("insert into simple_stats partition(i=1) values ('test', '1', 'bar')");
+ executeQuery("insert into simple_stats partition(i=2) values ('test', '2', 'baz')");
+ executeQuery("analyze table simple_stats partition(i=0) compute statistics for columns s, t");
+ executeQuery("analyze table simple_stats partition(i=1) compute statistics for columns");
+ executeQuery("analyze table simple_stats partition(i=2) compute statistics for columns s");
+ verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("s", "t"), msClient, true);
+ verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("u"), msClient, false);
+ verifyStatsUpToDate("simple_stats", "i=1", Lists.newArrayList("s", "t", "u"), msClient, true);
+ verifyStatsUpToDate("simple_stats", "i=2", Lists.newArrayList("s"), msClient, true);
+ verifyStatsUpToDate("simple_stats", "i=2", Lists.newArrayList("u", "t"), msClient, false);
+
+ // We will unset s on i=0, and t on i=1. Only these should be updated, and nothing for i=2.
+ verifyAndUnsetColStats("simple_stats", "i=0", Lists.newArrayList("s"), msClient);
+ verifyAndUnsetColStats("simple_stats", "i=1", Lists.newArrayList("t"), msClient);
+
+ assertTrue(su.runOneIteration());
+ drainWorkQueue(su, 2);
+ // Exact same state as above.
+ verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("s", "t"), msClient, true);
+ verifyStatsUpToDate("simple_stats", "i=0", Lists.newArrayList("u"), msClient, false);
+ verifyStatsUpToDate("simple_stats", "i=1", Lists.newArrayList("s", "t", "u"), msClient, true);
+ verifyStatsUpToDate("simple_stats", "i=2", Lists.newArrayList("s"), msClient, true);
+ verifyStatsUpToDate("simple_stats", "i=2", Lists.newArrayList("u", "t"), msClient, false);
+
+ msClient.close();
+ }
+
+ @Test(timeout=20000)
+ public void testParallelOps() throws Exception {
+ // Set high worker count so we get a longer queue.
+ hiveConf.setInt(MetastoreConf.ConfVars.STATS_AUTO_UPDATE_WORKER_COUNT.getVarname(), 4);
+ StatsUpdaterThread su = createUpdater();
+ IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+ executeQuery("create table simple_stats (s string)");
+ executeQuery("create table simple_stats2 (s string) partitioned by (i int)");
+ executeQuery("create table simple_stats3 (s string) partitioned by (i int)");
+ executeQuery("insert into simple_stats values ('test')");
+ executeQuery("insert into simple_stats2 partition(i=0) values ('test')");
+ executeQuery("insert into simple_stats3 partition(i=0) values ('test')");
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, true);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, true);
+ executeQuery("insert into simple_stats3 partition(i=1) values ('test')");
+
+ assertTrue(su.runOneIteration());
+ assertEquals(3, su.getQueueLength());
+ // Nothing updated yet.
+ verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"), msClient, false);
+ verifyPartStatsUpToDate(1, 0, msClient, "simple_stats2", false);
+ verifyStatsUpToDate("simple_stats3", "i=0", Lists.newArrayList("s"), msClient, false);
+ verifyStatsUpToDate("simple_stats3", "i=1", Lists.newArrayList("s"), msClient, true);
+
+ assertFalse(su.runOneIteration());
+ assertEquals(3, su.getQueueLength()); // Nothing new added to the queue while analyze runs.
+
+ // Add another partition without stats.
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+ executeQuery("insert into simple_stats3 partition(i=2) values ('test')");
+
+ assertTrue(su.runOneIteration());
+ assertEquals(4, su.getQueueLength()); // An item for the new partition is queued now.
+
+ drainWorkQueue(su, 4);
+
+ verifyStatsUpToDate("simple_stats", Lists.newArrayList("s"), msClient, true);
+ verifyPartStatsUpToDate(1, 0, msClient, "simple_stats2", true);
+ verifyPartStatsUpToDate(3, 0, msClient, "simple_stats3", true);
+
+ assertFalse(su.runOneIteration());
+ drainWorkQueue(su, 0); // Nothing else is updated after the first update.
+
+ msClient.close();
+ }
+
+ private void verifyPartStatsUpToDate(int partCount, int skip,
+ IMetaStoreClient msClient, String tbl, boolean isUpToDate) throws Exception {
+ for (int i = skip; i < partCount; ++i) {
+ verifyStatsUpToDate(tbl, "i=" + i, Lists.newArrayList("s"), msClient, isUpToDate);
+ }
+ }
+
+ private void drainWorkQueue(StatsUpdaterThread su) throws InterruptedException {
+ while (su.runOneWorkerIteration(ss, ss.getUserName(), ss.getConf(), false)) {}
+ }
+
+ private void drainWorkQueue(StatsUpdaterThread su, int expectedReqs) throws InterruptedException {
+ int actualReqs = 0;
+ while (su.runOneWorkerIteration(ss, ss.getUserName(), ss.getConf(), false)) {
+ ++actualReqs;
+ }
+ assertEquals(expectedReqs, actualReqs);
+ }
+
+ private void setTableSkipProperty(
+ IMetaStoreClient msClient, String tbl, String val) throws Exception {
+ Table table = msClient.getTable(ss.getCurrentDatabase(), tbl);
+ table.getParameters().put(StatsUpdaterThread.SKIP_STATS_AUTOUPDATE_PROPERTY, val);
+ msClient.alter_table(table.getDbName(), table.getTableName(), table);
+ }
+
+ private void setPartitionSkipProperty(
+ IMetaStoreClient msClient, String tblName, String partName, String val) throws Exception {
+ Partition part = msClient.getPartition(ss.getCurrentDatabase(), tblName, partName);
+ part.getParameters().put(StatsUpdaterThread.SKIP_STATS_AUTOUPDATE_PROPERTY, val);
+ msClient.alter_partition(part.getCatName(), part.getDbName(), tblName, part);
+ }
+
+ private void verifyAndUnsetColStats(
+ String tblName, List<String> cols, IMetaStoreClient msClient) throws Exception {
+ Table tbl = msClient.getTable(ss.getCurrentDatabase(), tblName);
+ verifyAndUnsetColStatsVal(tbl.getParameters(), cols);
+ EnvironmentContext ec = new EnvironmentContext();
+ // Make sure metastore doesn't mess with our bogus stats updates.
+ ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
+ msClient.alter_table_with_environmentContext(tbl.getDbName(), tbl.getTableName(), tbl, ec);
+ // Double-check.
+ tbl = msClient.getTable(ss.getCurrentDatabase(), tblName);
+ for (String col : cols) {
+ assertFalse(StatsSetupConst.areColumnStatsUptoDate(tbl.getParameters(), col));
+ }
+ }
+
+ private void verifyAndUnsetColStatsVal(Map<String, String> params, List<String> cols) {
+ assertTrue(StatsSetupConst.areBasicStatsUptoDate(params));
+ for (String col : cols) {
+ assertTrue(StatsSetupConst.areColumnStatsUptoDate(params, col));
+ }
+ StatsSetupConst.removeColumnStatsState(params, cols);
+ StatsSetupConst.setBasicStatsState(params, StatsSetupConst.TRUE);
+ }
+
+ private void verifyAndUnsetColStats(String tblName, String partName, List<String> cols,
+ IMetaStoreClient msClient) throws Exception {
+ Partition part = msClient.getPartition(ss.getCurrentDatabase(), tblName, partName);
+ verifyAndUnsetColStatsVal(part.getParameters(), cols);
+ EnvironmentContext ec = new EnvironmentContext();
+ // Make sure metastore doesn't mess with our bogus stats updates.
+ ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
+ msClient.alter_partition(part.getCatName(), part.getDbName(), tblName, part, ec);
+ // Double-check.
+ part = msClient.getPartition(ss.getCurrentDatabase(), tblName, partName);
+ for (String col : cols) {
+ assertFalse(StatsSetupConst.areColumnStatsUptoDate(part.getParameters(), col));
+ }
+ }
+
+ private void verifyStatsUpToDate(String tbl, ArrayList<String> cols, IMetaStoreClient msClient,
+ boolean isUpToDate) throws Exception {
+ Table table = msClient.getTable(ss.getCurrentDatabase(), tbl);
+ verifyStatsUpToDate(table.getParameters(), cols, isUpToDate);
+ }
+
+ private void verifyStatsUpToDate(Map<String, String> params, ArrayList<String> cols,
+ boolean isUpToDate) {
+ if (isUpToDate) {
+ assertTrue(StatsSetupConst.areBasicStatsUptoDate(params));
+ }
+ for (String col : cols) {
+ assertEquals(isUpToDate, StatsSetupConst.areColumnStatsUptoDate(params, col));
+ }
+ }
+
+ private void verifyStatsUpToDate(String tbl, String part, ArrayList<String> cols,
+ IMetaStoreClient msClient, boolean isUpToDate) throws Exception {
+ Partition partition = msClient.getPartition(ss.getCurrentDatabase(), tbl, part);
+ verifyStatsUpToDate(partition.getParameters(), cols, isUpToDate);
+ }
+
+ private void executeQuery(String query) throws HiveException {
+ DriverUtils.runOnDriver(hiveConf, ss.getUserName(), ss, query, null);
+ }
+
+ private StatsUpdaterThread createUpdater() throws MetaException {
+ StatsUpdaterThread su = new StatsUpdaterThread();
+ su.setConf(hiveConf);
+ su.init(new AtomicBoolean(false), null);
+ return su;
+ }
+}
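For reference, the lifecycle the tests above exercise by hand is small: configure, init, then drive iterations. A minimal sketch of standalone wiring, assuming an embedded metastore Configuration (only the conf var and the init signature come from this patch; the rest is illustrative):

Configuration conf = MetastoreConf.newMetastoreConf();
// "existing" or "all" enables the background updater; the default "none" disables it.
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.STATS_AUTO_UPDATE, "all");
StatsUpdaterThread su = new StatsUpdaterThread();
su.setConf(conf);
su.init(new AtomicBoolean(false), null); // same wiring as createUpdater() above
su.start(); // in a real metastore, startStatsUpdater() below performs this step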
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index be05838..ed53c90 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -310,6 +310,7 @@ public class HiveAlterHandler implements AlterHandler {
}
} else {
// operations other than table rename
+
if (MetaStoreUtils.requireCalStats(null, null, newt, environmentContext) &&
!isPartitionedTable) {
Database db = msdb.getDatabase(catName, newDbName);
@@ -540,10 +541,11 @@ public class HiveAlterHandler implements AlterHandler {
}
success = msdb.commitTransaction();
} catch (InvalidObjectException e) {
- throw new InvalidOperationException("alter is not possible");
- } catch (NoSuchObjectException e){
+ LOG.warn("Alter failed", e);
+ throw new InvalidOperationException("alter is not possible: " + e.getMessage());
+ } catch (NoSuchObjectException e) {
//old partition does not exist
- throw new InvalidOperationException("alter is not possible");
+ throw new InvalidOperationException("alter is not possible: " + e.getMessage());
} finally {
if(!success) {
msdb.rollbackTransaction();
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index d37e705..dd45cdb 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.StatsUpdateMode;
import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
import org.apache.hadoop.hive.metastore.events.AddNotNullConstraintEvent;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
@@ -9053,6 +9054,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
t.start();
}
+
/**
* Start threads outside of the thrift service, such as the compactor threads.
* @param conf Hive configuration object
@@ -9092,6 +9094,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
startCompactorWorkers(conf);
startCompactorCleaner(conf);
startRemoteOnlyTasks(conf);
+ startStatsUpdater(conf);
} catch (Throwable e) {
LOG.error("Failure when starting the compactor, compactions may not happen, " +
StringUtils.stringifyException(e));
@@ -9107,6 +9110,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
t.start();
}
+ protected static void startStatsUpdater(Configuration conf) throws Exception {
+ StatsUpdateMode mode = StatsUpdateMode.valueOf(
+ MetastoreConf.getVar(conf, ConfVars.STATS_AUTO_UPDATE).toUpperCase());
+ if (mode == StatsUpdateMode.NONE) return;
+ MetaStoreThread t = instantiateThread("org.apache.hadoop.hive.ql.stats.StatsUpdaterThread");
+ initializeAndStartThread(t, conf);
+ }
+
private static void startCompactorInitiator(Configuration conf) throws Exception {
if (MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON)) {
MetaStoreThread initiator =
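A note on the string class name in startStatsUpdater above: StatsUpdaterThread lives in hive-exec (org.apache.hadoop.hive.ql.stats), which the standalone metastore cannot reference at compile time, so the thread has to be loaded reflectively. A hedged sketch of roughly what instantiateThread must do (the real helper is defined elsewhere in this class and also handles error reporting):

// Assumes a caller that declares throws Exception, as startStatsUpdater does.
Class<?> c = Class.forName("org.apache.hadoop.hive.ql.stats.StatsUpdaterThread");
Object o = c.newInstance();
if (!(o instanceof MetaStoreThread)) {
  throw new ClassCastException(c.getName() + " is not a MetaStoreThread");
}
MetaStoreThread t = (MetaStoreThread) o;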
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index e2ca6d2..346cd92 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
import org.apache.hive.common.util.BloomFilter;
import org.datanucleus.store.rdbms.query.ForwardQueryResult;
import org.slf4j.Logger;
@@ -2459,4 +2460,94 @@ class MetaStoreDirectSql {
return ret;
}
+ public static final Object[] STATS_TABLE_TYPES = new Object[] {
+ TableType.MANAGED_TABLE.toString(), TableType.MATERIALIZED_VIEW.toString()
+ };
+
+ public List<FullTableName> getTableNamesWithStats() throws MetaException {
+ // Could we also join with ACID tables to only get tables with outdated stats?
+ String queryText0 = "SELECT DISTINCT " + TBLS + ".\"TBL_NAME\", " + DBS + ".\"NAME\", "
+ + DBS + ".\"CTLG_NAME\" FROM " + TBLS + " INNER JOIN " + DBS + " ON "
+ + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"";
+ String queryText1 = " WHERE " + TBLS + ".\"TBL_TYPE\" IN ("
+ + makeParams(STATS_TABLE_TYPES.length) + ")";
+
+ List<FullTableName> result = new ArrayList<>();
+
+ String queryText = queryText0 + " INNER JOIN " + TAB_COL_STATS
+ + " ON " + TBLS + ".\"TBL_ID\" = " + TAB_COL_STATS + ".\"TBL_ID\"" + queryText1;
+ getStatsTableListResult(queryText, result);
+
+ queryText = queryText0 + " INNER JOIN " + PARTITIONS + " ON " + TBLS + ".\"TBL_ID\" = "
+ + PARTITIONS + ".\"TBL_ID\"" + " INNER JOIN " + PART_COL_STATS + " ON " + PARTITIONS
+ + ".\"PART_ID\" = " + PART_COL_STATS + ".\"PART_ID\"" + queryText1;
+ getStatsTableListResult(queryText, result);
+
+ return result;
+ }
+
+ public Map<String, List<String>> getColAndPartNamesWithStats(
+ String catName, String dbName, String tableName) throws MetaException {
+ // Could we also join with ACID tables to only get partitions with outdated stats?
+ String queryText = "SELECT DISTINCT " + PARTITIONS + ".\"PART_NAME\", " + PART_COL_STATS
+ + ".\"COLUMN_NAME\" FROM " + TBLS + " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = "
+ + DBS + ".\"DB_ID\" INNER JOIN " + PARTITIONS + " ON " + TBLS + ".\"TBL_ID\" = "
+ + PARTITIONS + ".\"TBL_ID\" INNER JOIN " + PART_COL_STATS + " ON " + PARTITIONS
+ + ".\"PART_ID\" = " + PART_COL_STATS + ".\"PART_ID\" WHERE " + DBS + ".\"NAME\" = ? AND "
+ + DBS + ".\"CTLG_NAME\" = ? AND " + TBLS + ".\"TBL_NAME\" = ? ORDER BY "
+ + PARTITIONS + ".\"PART_NAME\"";
+
+ LOG.debug("Running {}", queryText);
+ Query<?> query = pm.newQuery("javax.jdo.query.SQL", queryText);
+ try {
+ List<Object[]> sqlResult = ensureList(executeWithArray(
+ query, new Object[] { dbName, catName, tableName }, queryText));
+ Map<String, List<String>> result = new HashMap<>();
+ String lastPartName = null;
+ List<String> cols = null;
+ for (Object[] line : sqlResult) {
+ String col = extractSqlString(line[1]);
+ String part = extractSqlString(line[0]);
+ if (!part.equals(lastPartName)) {
+ if (lastPartName != null) {
+ result.put(lastPartName, cols);
+ }
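+ // New partition: flush the previous run and start a fresh column list, sized
+ // like the last one (partitions of a table usually have stats for the same columns).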
+ cols = cols == null ? new ArrayList<>() : new ArrayList<>(cols.size());
+ lastPartName = part;
+ }
+ cols.add(col);
+ }
+ if (lastPartName != null) {
+ result.put(lastPartName, cols);
+ }
+ return result;
+ } finally {
+ query.closeAll();
+ }
+ }
+
+ public List<FullTableName> getAllTableNamesForStats() throws MetaException {
+ String queryText = "SELECT " + TBLS + ".\"TBL_NAME\", " + DBS + ".\"NAME\", "
+ + DBS + ".\"CTLG_NAME\" FROM " + TBLS + " INNER JOIN " + DBS + " ON " + TBLS
+ + ".\"DB_ID\" = " + DBS + ".\"DB_ID\""
+ + " WHERE " + TBLS + ".\"TBL_TYPE\" IN (" + makeParams(STATS_TABLE_TYPES.length) + ")";
+ List<FullTableName> result = new ArrayList<>();
+ getStatsTableListResult(queryText, result);
+ return result;
+ }
+
+ private void getStatsTableListResult(
+ String queryText, List<FullTableName> result) throws MetaException {
+ LOG.debug("Running {}", queryText);
+ Query<?> query = pm.newQuery("javax.jdo.query.SQL", queryText);
+ try {
+ List<Object[]> sqlResult = ensureList(executeWithArray(query, STATS_TABLE_TYPES, queryText));
+ for (Object[] line : sqlResult) {
+ result.add(new FullTableName(
+ extractSqlString(line[2]), extractSqlString(line[1]), extractSqlString(line[0])));
+ }
+ } finally {
+ query.closeAll();
+ }
+ }
}
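The single-pass grouping in getColAndPartNamesWithStats above depends on the ORDER BY PART_NAME clause: rows for the same partition arrive contiguously, so a "last seen partition" marker is enough to build the map without an intermediate multimap. A self-contained sketch of the same pass over made-up data (java.util imports assumed):

// Rows as (partName, colName), pre-sorted by partName as the SQL guarantees.
String[][] rows = { {"i=0", "s"}, {"i=0", "t"}, {"i=1", "s"} };
Map<String, List<String>> byPart = new HashMap<>();
String lastPart = null;
List<String> cols = null;
for (String[] row : rows) {
  if (!row[0].equals(lastPart)) {
    if (lastPart != null) byPart.put(lastPart, cols); // flush the finished run
    cols = new ArrayList<>();
    lastPart = row[0];
  }
  cols.add(row[1]);
}
if (lastPart != null) byPart.put(lastPart, cols); // flush the final run
// byPart is now {i=0=[s, t], i=1=[s]}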
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index e475be8..78b82b4 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -56,6 +56,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
+
import javax.jdo.JDOCanRetryException;
import javax.jdo.JDODataStoreException;
import javax.jdo.JDOException;
@@ -71,6 +72,7 @@ import javax.jdo.identity.IntIdentity;
import javax.sql.DataSource;
import com.google.common.base.Strings;
+
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
@@ -211,6 +213,7 @@ import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
import org.apache.hadoop.hive.metastore.utils.ObjectPair;
import org.apache.thrift.TException;
import org.datanucleus.AbstractNucleusContext;
@@ -1546,6 +1549,96 @@ public class ObjectStore implements RawStore, Configurable {
}
}
+ @Override
+ public List<FullTableName> getTableNamesWithStats() throws MetaException, NoSuchObjectException {
+ return new GetListHelper<FullTableName>(null, null, null, true, false) {
+ @Override
+ protected List<FullTableName> getSqlResult(
+ GetHelper<List<FullTableName>> ctx) throws MetaException {
+ return directSql.getTableNamesWithStats();
+ }
+
+ @Override
+ protected List<FullTableName> getJdoResult(
+ GetHelper<List<FullTableName>> ctx) throws MetaException {
+ throw new UnsupportedOperationException("getTableNamesWithStats is not supported via JDO"); // TODO: implement?
+ }
+ }.run(false);
+ }
+
+ @Override
+ public Map<String, List<String>> getPartitionColsWithStats(String catName, String dbName, String tableName)
+ throws MetaException, NoSuchObjectException {
+ return new GetHelper<Map<String, List<String>>>(catName, dbName, null, true, false) {
+ @Override
+ protected Map<String, List<String>> getSqlResult(
+ GetHelper<Map<String, List<String>>> ctx) throws MetaException {
+ try {
+ return directSql.getColAndPartNamesWithStats(catName, dbName, tableName);
+ } catch (Throwable ex) {
+ LOG.error("DirectSQL failed", ex);
+ throw new MetaException(ex.getMessage());
+ }
+ }
+
+ @Override
+ protected Map<String, List<String>> getJdoResult(
+ GetHelper<Map<String, List<String>>> ctx) throws MetaException {
+ throw new UnsupportedOperationException("getPartitionColsWithStats is not supported via JDO"); // TODO: implement?
+ }
+
+ @Override
+ protected String describeResult() {
+ return results.size() + " partitions";
+ }
+ }.run(false);
+ }
+
+ @Override
+ public List<FullTableName> getAllTableNamesForStats() throws MetaException, NoSuchObjectException {
+ return new GetListHelper<FullTableName>(null, null, null, true, false) {
+ @Override
+ protected List<FullTableName> getSqlResult(
+ GetHelper<List<FullTableName>> ctx) throws MetaException {
+ return directSql.getAllTableNamesForStats();
+ }
+
+ @Override
+ protected List<FullTableName> getJdoResult(
+ GetHelper<List<FullTableName>> ctx) throws MetaException {
+ boolean commited = false;
+ Query query = null;
+ List<FullTableName> result = new ArrayList<>();
+ openTransaction();
+ try {
+ String paramStr = "", whereStr = "";
+ for (int i = 0; i < MetaStoreDirectSql.STATS_TABLE_TYPES.length; ++i) {
+ if (i != 0) {
+ paramStr += ", ";
+ whereStr += "||";
+ }
+ paramStr += "java.lang.String tt" + i;
+ whereStr += " tableType == tt" + i;
+ }
+ query = pm.newQuery(MTable.class, whereStr);
+ query.declareParameters(paramStr);
+ @SuppressWarnings("unchecked")
+ Collection<MTable> tbls = (Collection<MTable>) query.executeWithArray(
+ MetaStoreDirectSql.STATS_TABLE_TYPES);
+ pm.retrieveAll(tbls);
+ for (MTable tbl : tbls) {
+ result.add(new FullTableName(
+ tbl.getDatabase().getCatalogName(), tbl.getDatabase().getName(), tbl.getTableName()));
+ }
+ commited = commitTransaction();
+ } finally {
+ rollbackAndCleanup(commited, query);
+ }
+ return result;
+ }
+ }.run(false);
+ }
+
protected List<String> getTablesInternal(String catName, String dbName, String pattern,
TableType tableType, boolean allowSql, boolean allowJdo)
throws MetaException, NoSuchObjectException {
@@ -3447,9 +3540,9 @@ public class ObjectStore implements RawStore, Configurable {
boolean allowSql, boolean allowJdo) throws MetaException {
assert allowSql || allowJdo;
this.allowJdo = allowJdo;
- this.catName = normalizeIdentifier(catalogName);
- this.dbName = normalizeIdentifier(dbName);
- if (tblName != null){
+ this.catName = (catalogName != null) ? normalizeIdentifier(catalogName) : null;
+ this.dbName = (dbName != null) ? normalizeIdentifier(dbName) : null;
+ if (tblName != null) {
this.tblName = normalizeIdentifier(tblName);
} else {
// tblName can be null in cases of Helper being used at a higher
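The three new overrides above all follow ObjectStore's GetHelper contract: getSqlResult is the direct-SQL fast path, getJdoResult the fallback when direct SQL is disabled or fails (getTableNamesWithStats and getPartitionColsWithStats refuse on the JDO path, while getAllTableNamesForStats supplies a JDOQL query). A hedged sketch of the contract's shape, with hypothetical names (T and doX are placeholders, not part of the patch):

return new GetHelper<T>(catName, dbName, null, true /*allowSql*/, false /*allowJdo*/) {
  @Override
  protected T getSqlResult(GetHelper<T> ctx) throws MetaException {
    return directSql.doX(); // hand-written SQL fast path
  }
  @Override
  protected T getJdoResult(GetHelper<T> ctx) throws MetaException {
    throw new UnsupportedOperationException("doX is not supported via JDO");
  }
  @Override
  protected String describeResult() {
    return "doX result"; // used for logging
  }
}.run(false);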
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
index 283798c..f350aa9 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
import org.apache.thrift.TException;
public interface RawStore extends Configurable {
@@ -1639,4 +1640,11 @@ public interface RawStore extends Configurable {
/** Removes outdated statistics. */
int deleteRuntimeStats(int maxRetainSecs) throws MetaException;
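+ /** Returns the names of tables (and materialized views) that currently have column statistics, at the table or the partition level. */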
+ List<FullTableName> getTableNamesWithStats() throws MetaException, NoSuchObjectException;
+
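+ /** Returns the names of all tables whose type makes them eligible for stats (managed tables and materialized views), whether or not any stats exist yet. */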
+ List<FullTableName> getAllTableNamesForStats() throws MetaException, NoSuchObjectException;
+
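+ /** For each partition of the given table that has column statistics, returns the list of columns covered. */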
+ Map<String, List<String>> getPartitionColsWithStats(String catName, String dbName,
+ String tableName) throws MetaException, NoSuchObjectException;
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 9da8d72..d9356b8 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -116,6 +116,7 @@ import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.FullTableName;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -2492,4 +2493,20 @@ public class CachedStore implements RawStore, Configurable {
public int deleteRuntimeStats(int maxRetainSecs) throws MetaException {
return rawStore.deleteRuntimeStats(maxRetainSecs);
}
+
+ @Override
+ public List<FullTableName> getTableNamesWithStats() throws MetaException, NoSuchObjectException {
+ return rawStore.getTableNamesWithStats();
+ }
+
+ @Override
+ public List<FullTableName> getAllTableNamesForStats() throws MetaException, NoSuchObjectException {
+ return rawStore.getAllTableNamesForStats();
+ }
+
+ @Override
+ public Map<String, List<String>> getPartitionColsWithStats(String catName,
+ String dbName, String tableName) throws MetaException, NoSuchObjectException {
+ return rawStore.getPartitionColsWithStats(catName, dbName, tableName);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/EnumValidator.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/EnumValidator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/EnumValidator.java
new file mode 100644
index 0000000..8695471
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/EnumValidator.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.conf;
+
+import java.util.Arrays;
+
+public class EnumValidator extends StringSetValidator {
+ public EnumValidator(Object[] enumValues) {
+ super(false, Arrays.stream(enumValues).map(Object::toString).toArray(String[]::new));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index f3ed8cd..ed0edec 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -83,6 +83,10 @@ public class MetastoreConf {
@VisibleForTesting
static final String TEST_ENV_WORKAROUND = "metastore.testing.env.workaround.dont.ever.set.this.";
+ public enum StatsUpdateMode {
+ NONE, EXISTING, ALL
+ }
+
private static class TimeValue {
final long val;
final TimeUnit unit;
@@ -727,6 +731,18 @@ public class MetastoreConf {
"The Java class (implementing the StatsAggregator interface) that is used by default if hive.stats.dbclass is custom type."),
STATS_DEFAULT_PUBLISHER("metastore.stats.default.publisher", "hive.stats.default.publisher", "",
"The Java class (implementing the StatsPublisher interface) that is used by default if hive.stats.dbclass is custom type."),
+ STATS_AUTO_UPDATE("metastore.stats.auto.analyze", "hive.metastore.stats.auto.analyze", "none",
+ new EnumValidator(StatsUpdateMode.values()),
+ "Whether to update stats in the background; none - no, all - for all tables, existing - only existing, out of date, stats."),
+ STATS_AUTO_UPDATE_NOOP_WAIT("metastore.stats.auto.analyze.noop.wait",
+ "hive.metastore.stats.auto.analyze.noop.wait", 5L, TimeUnit.MINUTES,
+ new TimeValidator(TimeUnit.MINUTES),
+ "How long to sleep if there were no stats needing update during an update iteration.\n" +
+ "This is a setting to throttle table/partition checks when nothing is being changed; not\n" +
+ "the analyze queries themselves."),
+ STATS_AUTO_UPDATE_WORKER_COUNT("metastore.stats.auto.analyze.worker.count",
+ "hive.metastore.stats.auto.analyze.worker.count", 1,
+ "Number of parallel analyze commands to run for background stats update."),
STORAGE_SCHEMA_READER_IMPL("metastore.storage.schema.reader.impl", "metastore.storage.schema.reader.impl",
DefaultStorageSchemaReader.class.getName(),
"The class to use to read schemas from storage. It must implement " +
http://git-wip-us.apache.org/repos/asf/hive/blob/bdac6edf/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
index 9b36d09..6ade490 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
@@ -24,6 +24,7 @@ import com.google.common.base.Predicates;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.commons.collections.ListUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -64,7 +65,9 @@ import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.util.MachineList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
+
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
@@ -718,12 +721,12 @@ public class MetaStoreUtils {
params.put(StatsSetupConst.NUM_FILES, Integer.toString(numFiles));
params.put(StatsSetupConst.TOTAL_SIZE, Long.toString(tableSize));
}
-
+
public static void clearQuickStats(Map<String, String> params) {
params.remove(StatsSetupConst.NUM_FILES);
params.remove(StatsSetupConst.TOTAL_SIZE);
}
-
+
public static boolean areSameColumns(List<FieldSchema> oldCols, List<FieldSchema> newCols) {
return ListUtils.isEqualList(oldCols, newCols);
@@ -1800,4 +1803,35 @@ public class MetaStoreUtils {
if (catName == null || "".equals(catName)) catName = Warehouse.DEFAULT_CATALOG_NAME;
return catName;
}
+
+
+ public static class FullTableName {
+ public final String catalog, db, table;
+
+ public FullTableName(String catalog, String db, String table) {
+ assert catalog != null && db != null && table != null : catalog + ", " + db + ", " + table;
+ this.catalog = catalog;
+ this.db = db;
+ this.table = table;
+ }
+
+ @Override
+ public String toString() {
+ return catalog + MetaStoreUtils.CATALOG_DB_SEPARATOR + db + "." + table;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ return prime * (prime * (prime + catalog.hashCode()) + db.hashCode()) + table.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null || getClass() != obj.getClass()) return false;
+ FullTableName other = (FullTableName) obj;
+ return catalog.equals(other.catalog) && db.equals(other.db) && table.equals(other.table);
+ }
+ }
}
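Since FullTableName implements equals/hashCode over all three parts, it can be used directly as a set or map key when deduplicating tables across catalogs; a small usage sketch:

Set<FullTableName> seen = new HashSet<>();
seen.add(new FullTableName("hive", "db1", "t1"));
// A second instance with the same parts is equal, so the set does not grow.
boolean added = seen.add(new FullTableName("hive", "db1", "t1"));
assert !added && seen.size() == 1;
// toString() renders as catalog + separator + db + "." + table, where the
// separator is MetaStoreUtils.CATALOG_DB_SEPARATOR.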