You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2018/02/21 16:59:11 UTC
hive git commit: HIVE-18259: Automatic cleanup of invalidation cache
for materialized views (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 102731f6e -> 0b03b819f
HIVE-18259: Automatic cleanup of invalidation cache for materialized views (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0b03b819
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0b03b819
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0b03b819
Branch: refs/heads/master
Commit: 0b03b819f953905d28778c66bf10ea4f1fe47976
Parents: 102731f
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Mon Jan 22 18:42:48 2018 -0800
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Wed Feb 21 08:58:59 2018 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 13 +-
.../apache/hadoop/hive/ql/metadata/Hive.java | 5 +-
.../MaterializationsCacheCleanerTask.java | 63 ++++
.../MaterializationsInvalidationCache.java | 146 +++++++-
.../hive/metastore/conf/MetastoreConf.java | 19 +-
...stMetaStoreMaterializationsCacheCleaner.java | 331 +++++++++++++++++++
6 files changed, 566 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0b03b819/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 38f6430..169ddcb 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1161,7 +1161,7 @@ public class HiveConf extends Configuration {
// materialized views
HIVE_MATERIALIZED_VIEW_ENABLE_AUTO_REWRITING("hive.materializedview.rewriting", false,
"Whether to try to rewrite queries using the materialized views enabled for rewriting"),
- HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW("hive.materializedview.rewriting.time.window", 0,
+ HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW("hive.materializedview.rewriting.time.window", "0s", new TimeValidator(TimeUnit.SECONDS),
"Time window, specified in seconds, after which outdated materialized views become invalid for automatic query rewriting.\n" +
"For instance, if a materialized view is created and afterwards one of its source tables is changed at " +
"moment in time t0, the materialized view will not be considered for rewriting anymore after t0 plus " +
@@ -1172,6 +1172,17 @@ public class HiveConf extends Configuration {
"Default file format for CREATE MATERIALIZED VIEW statement"),
HIVE_MATERIALIZED_VIEW_SERDE("hive.materializedview.serde",
"org.apache.hadoop.hive.ql.io.orc.OrcSerde", "Default SerDe used for materialized views"),
+ HIVE_MATERIALIZATIONS_INVALIDATION_CACHE_IMPL("hive.metastore.materializations.invalidation.impl", "DEFAULT",
+ new StringSet("DEFAULT", "DISABLE"),
+ "The implementation that we should use for the materializations invalidation cache. \n" +
+ " DEFAULT: Default implementation for invalidation cache\n" +
+ " DISABLE: Disable invalidation cache (debugging purposes)"),
+ HIVE_MATERIALIZATIONS_INVALIDATION_CACHE_CLEAN_FREQUENCY("hive.metastore.materializations.invalidation.clean.frequency",
+ "3600s", new TimeValidator(TimeUnit.SECONDS), "Frequency at which timer task runs to remove unnecessary transactions information from " +
+ "materializations invalidation cache."),
+ HIVE_MATERIALIZATIONS_INVALIDATION_CACHE_EXPIRY_DURATION("hive.metastore.materializations.invalidation.max.duration",
+ "86400s", new TimeValidator(TimeUnit.SECONDS), "Maximum duration for query producing a materialization. After this time, transactions " +
+ "information that is not relevant for materializations can be removed from invalidation cache."),
// hive.mapjoin.bucket.cache.size has been replaced by hive.smbjoin.cache.row,
// need to remove by hive .13. Also, do not change default (see SMB operator)
http://git-wip-us.apache.org/repos/asf/hive/blob/0b03b819/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 7b7e140..9c3b54f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Set;
import java.util.stream.Collectors;
@@ -1293,7 +1294,9 @@ public class Hive {
* @throws HiveException
*/
public List<RelOptMaterialization> getValidMaterializedViews() throws HiveException {
- final long diff = conf.getIntVar(HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW) * 1000;
+ final long diff =
+ HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW,
+ TimeUnit.MILLISECONDS);
final long minTime = System.currentTimeMillis() - diff;
try {
// Final result
http://git-wip-us.apache.org/repos/asf/hive/blob/0b03b819/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java
new file mode 100644
index 0000000..cc168a9
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Task responsible for removing transaction entries that are no longer useful from
+ * the materializations invalidation cache.
+ */
+public class MaterializationsCacheCleanerTask implements MetastoreTaskThread {
+ private static final Logger LOG = LoggerFactory.getLogger(MaterializationsCacheCleanerTask.class);
+
+ private Configuration conf;
+
+ @Override
+ public long runFrequency(TimeUnit unit) {
+ return MetastoreConf.getTimeVar(conf,
+ MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_CLEAN_FREQUENCY, unit);
+ }
+
+ @Override
+ public void setConf(Configuration configuration) {
+ conf = configuration;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void run() {
+ long removedCnt = MaterializationsInvalidationCache.get().cleanup(System.currentTimeMillis() -
+ MetastoreConf.getTimeVar(conf,
+ MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_EXPIRY_DURATION, TimeUnit.MILLISECONDS));
+ if (removedCnt > 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Number of transaction entries deleted from materializations cache: " + removedCnt);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/0b03b819/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
index 92653ae..6e54eb9 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
@@ -17,8 +17,11 @@
*/
package org.apache.hadoop.hive.metastore;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -28,10 +31,13 @@ import java.util.concurrent.Executors;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import org.apache.hadoop.hive.metastore.api.BasicTxnInfo;
import org.apache.hadoop.hive.metastore.api.Materialization;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +60,9 @@ public final class MaterializationsInvalidationCache {
/* Singleton */
private static final MaterializationsInvalidationCache SINGLETON = new MaterializationsInvalidationCache();
+ /* If this boolean is true, this class has no functionality. Only for debugging purposes. */
+ private boolean disable;
+
/* Key is the database name. Each value is a map from the unique view qualified name to
* the materialization invalidation info. This invalidation object contains information
* such as the tables used by the materialized view or the invalidation time, i.e., first
@@ -62,10 +71,10 @@ public final class MaterializationsInvalidationCache {
new ConcurrentHashMap<String, ConcurrentMap<String, MaterializationInvalidationInfo>>();
/*
- * Key is a qualified table name. The value is a (sorted) tree set (supporting concurrent
- * modifications) that will keep the modifications for a given table in the order that they
- * happen. This is useful to quickly check the invalidation time for a given materialized
- * view.
+ * Key is a qualified table name. The value is a (sorted) tree map (supporting concurrent
+ * modifications) that will keep the modifications for a given table in the order of their
+ * transaction id. This is useful to quickly check the invalidation time for a given
+ * materialization.
*/
private final ConcurrentMap<String, ConcurrentSkipListMap<Long, Long>> tableModifications =
new ConcurrentHashMap<String, ConcurrentSkipListMap<Long, Long>>();
@@ -100,6 +109,14 @@ public final class MaterializationsInvalidationCache {
this.store = store;
this.txnStore = txnStore;
+ // This will only be true for debugging purposes
+ this.disable = MetastoreConf.getVar(store.getConf(),
+ MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_IMPL).equals("DISABLE");
+ if (disable) {
+ // Nothing to do
+ return;
+ }
+
if (!initialized) {
this.initialized = true;
ExecutorService pool = Executors.newCachedThreadPool();
@@ -162,6 +179,10 @@ public final class MaterializationsInvalidationCache {
*/
private void addMaterializedView(String dbName, String tableName, Set<String> tablesUsed,
String validTxnList, OpType opType) {
+ if (disable) {
+ // Nothing to do
+ return;
+ }
// We are going to create the map for each view in the given database
ConcurrentMap<String, MaterializationInvalidationInfo> cq =
new ConcurrentHashMap<String, MaterializationInvalidationInfo>();
@@ -225,6 +246,10 @@ public final class MaterializationsInvalidationCache {
*/
public void notifyTableModification(String dbName, String tableName,
long txnId, long newModificationTime) {
+ if (disable) {
+ // Nothing to do
+ return;
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Notification for table {} in database {} received -> id: {}, time: {}",
tableName, dbName, txnId, newModificationTime);
@@ -246,6 +271,10 @@ public final class MaterializationsInvalidationCache {
* @param tableName
*/
public void dropMaterializedView(String dbName, String tableName) {
+ if (disable) {
+ // Nothing to do
+ return;
+ }
materializations.get(dbName).remove(tableName);
}
@@ -308,12 +337,12 @@ public final class MaterializationsInvalidationCache {
ValidTxnList txnList = new ValidReadTxnList(txnListString);
long firstModificationTimeAfterCreation = 0L;
for (String qNameTableUsed : materialization.getTablesUsed()) {
- final Long tn = tableModifications.get(qNameTableUsed)
- .higherKey(txnList.getHighWatermark());
+ final Entry<Long, Long> tn = tableModifications.get(qNameTableUsed)
+ .higherEntry(txnList.getHighWatermark());
if (tn != null) {
if (firstModificationTimeAfterCreation == 0L ||
- tn < firstModificationTimeAfterCreation) {
- firstModificationTimeAfterCreation = tn;
+ tn.getValue() < firstModificationTimeAfterCreation) {
+ firstModificationTimeAfterCreation = tn.getValue();
}
}
// Min open txn might be null if there were no open transactions
@@ -346,4 +375,105 @@ public final class MaterializationsInvalidationCache {
ALTER
}
+ /**
+ * Removes transaction events that are not relevant anymore.
+ * @param minTime events generated before this time (ms) can be deleted from the cache
+ * @return number of events that were deleted from the cache
+ */
+ public long cleanup(long minTime) {
+ // To be removed, a transaction entry should meet two conditions:
+ // 1) Current time - time of transaction > config parameter, and
+ // 2) Transaction should not be associated with invalidation of a MV
+ if (disable || !initialized) {
+ // Bail out
+ return 0L;
+ }
+ // We execute the cleanup in two steps
+ // First we gather all the transactions that need to be kept
+ final Multimap<String, Long> keepTxnInfos = HashMultimap.create();
+ for (Map.Entry<String, ConcurrentMap<String, MaterializationInvalidationInfo>> e : materializations.entrySet()) {
+ for (MaterializationInvalidationInfo m : e.getValue().values()) {
+ ValidTxnList txnList = new ValidReadTxnList(m.getValidTxnList());
+ boolean canBeDeleted = false;
+ String currentTableForInvalidatingTxn = null;
+ long currentInvalidatingTxnId = 0L;
+ long currentInvalidatingTxnTime = 0L;
+ for (String qNameTableUsed : m.getTablesUsed()) {
+ final Entry<Long, Long> tn = tableModifications.get(qNameTableUsed)
+ .higherEntry(txnList.getHighWatermark());
+ if (tn != null) {
+ if (currentInvalidatingTxnTime == 0L ||
+ tn.getValue() < currentInvalidatingTxnTime) {
+ // This transaction 1) is the first one examined for this materialization, or
+ // 2) it is the invalidating transaction. Hence we add it to the transactions to keep.
+ // 1.- We remove the previous invalidating transaction from the transactions
+ // to be kept (if needed).
+ if (canBeDeleted && currentInvalidatingTxnTime < minTime) {
+ keepTxnInfos.remove(currentTableForInvalidatingTxn, currentInvalidatingTxnId);
+ }
+ // 2.- We add this transaction to the transactions that should be kept.
+ canBeDeleted = !keepTxnInfos.get(qNameTableUsed).contains(tn.getKey());
+ keepTxnInfos.put(qNameTableUsed, tn.getKey());
+ // 3.- We record this transaction as the current invalidating transaction.
+ currentTableForInvalidatingTxn = qNameTableUsed;
+ currentInvalidatingTxnId = tn.getKey();
+ currentInvalidatingTxnTime = tn.getValue();
+ }
+ }
+ if (txnList.getMinOpenTxn() != null) {
+ // Invalid transaction list is sorted
+ int pos = 0;
+ for (Entry<Long, Long> t : tableModifications.get(qNameTableUsed)
+ .subMap(txnList.getMinOpenTxn(), txnList.getHighWatermark()).entrySet()) {
+ while (pos < txnList.getInvalidTransactions().length &&
+ txnList.getInvalidTransactions()[pos] != t.getKey()) {
+ pos++;
+ }
+ if (pos >= txnList.getInvalidTransactions().length) {
+ break;
+ }
+ if (currentInvalidatingTxnTime == 0L ||
+ t.getValue() < currentInvalidatingTxnTime) {
+ // This transaction 1) is the first one examined for this materialization, or
+ // 2) it is the invalidating transaction. Hence we add it to the transactions to keep.
+ // 1.- We remove the previous invalidating transaction from the transactions
+ // to be kept (if needed).
+ if (canBeDeleted && currentInvalidatingTxnTime < minTime) {
+ keepTxnInfos.remove(currentTableForInvalidatingTxn, currentInvalidatingTxnId);
+ }
+ // 2.- We add this transaction to the transactions that should be kept.
+ canBeDeleted = !keepTxnInfos.get(qNameTableUsed).contains(t.getKey());
+ keepTxnInfos.put(qNameTableUsed, t.getKey());
+ // 3.- We record this transaction as the current invalidating transaction.
+ currentTableForInvalidatingTxn = qNameTableUsed;
+ currentInvalidatingTxnId = t.getKey();
+ currentInvalidatingTxnTime = t.getValue();
+ }
+ }
+ }
+ }
+ }
+ }
+ // Second, we remove the transactions
+ long removed = 0L;
+ for (Entry<String, ConcurrentSkipListMap<Long, Long>> e : tableModifications.entrySet()) {
+ Collection<Long> c = keepTxnInfos.get(e.getKey());
+ for (Iterator<Entry<Long, Long>> it = e.getValue().entrySet().iterator(); it.hasNext();) {
+ Entry<Long, Long> v = it.next();
+ // We need to check again the time because some of the transactions might not be explored
+ // above, e.g., transactions above the highest transaction mark for all the materialized
+ // views.
+ if (v.getValue() < minTime && (c.isEmpty() || !c.contains(v.getKey()))) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Transaction removed from cache for table {} -> id: {}, time: {}",
+ e.getKey(), v.getKey(), v.getValue());
+ }
+ it.remove();
+ removed++;
+ }
+ }
+ }
+ return removed;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0b03b819/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 699a649..9f82256 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.DefaultStorageSchemaReader;
import org.apache.hadoop.hive.metastore.HiveAlterHandler;
+import org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.events.EventCleanerTask;
import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
@@ -553,6 +554,20 @@ public class MetastoreConf {
"javax.jdo.PersistenceManagerFactoryClass",
"org.datanucleus.api.jdo.JDOPersistenceManagerFactory",
"class implementing the jdo persistence"),
+ MATERIALIZATIONS_INVALIDATION_CACHE_IMPL("metastore.materializations.invalidation.impl",
+ "hive.metastore.materializations.invalidation.impl", "DEFAULT",
+ new Validator.StringSet("DEFAULT", "DISABLE"),
+ "The implementation that we should use for the materializations invalidation cache. \n" +
+ " DEFAULT: Default implementation for invalidation cache\n" +
+ " DISABLE: Disable invalidation cache (debugging purposes)"),
+ MATERIALIZATIONS_INVALIDATION_CACHE_CLEAN_FREQUENCY("metastore.materializations.invalidation.clean.frequency",
+ "hive.metastore.materializations.invalidation.clean.frequency",
+ 3600, TimeUnit.SECONDS, "Frequency at which timer task runs to remove unnecessary transaction entries from " +
+ "materializations invalidation cache."),
+ MATERIALIZATIONS_INVALIDATION_CACHE_EXPIRY_DURATION("metastore.materializations.invalidation.max.duration",
+ "hive.metastore.materializations.invalidation.max.duration",
+ 86400, TimeUnit.SECONDS, "Maximum duration for query producing a materialization. After this time, transaction " +
+ "entries that are not relevant for materializations can be removed from invalidation cache."),
// Parameters for exporting metadata on table drop (requires the use of the)
// org.apache.hadoop.hive.ql.parse.MetaDataExportListener preevent listener
METADATA_EXPORT_LOCATION("metastore.metadata.export.location", "hive.metadata.export.location",
@@ -708,7 +723,9 @@ public class MetastoreConf {
+ "The only supported special character right now is '/'. This flag applies only to quoted table names.\n"
+ "The default value is true."),
TASK_THREADS_ALWAYS("metastore.task.threads.always", "metastore.task.threads.always",
- EventCleanerTask.class.getName() + "," + "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask",
+ EventCleanerTask.class.getName() + "," +
+ "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," +
+ MaterializationsCacheCleanerTask.class.getName(),
"Comma separated list of tasks that will be started in separate threads. These will " +
"always be started, regardless of whether the metastore is running in embedded mode " +
"or in server mode. They must implement " + MetastoreTaskThread.class.getName()),
http://git-wip-us.apache.org/repos/asf/hive/blob/0b03b819/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java
new file mode 100644
index 0000000..8f8bff6
--- /dev/null
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.BasicTxnInfo;
+import org.apache.hadoop.hive.metastore.api.CreationMetadata;
+import org.apache.hadoop.hive.metastore.api.Materialization;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.junit.Assert;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link org.apache.hadoop.hive.metastore.MaterializationsInvalidationCache}.
+ * The tests focus on arrival of notifications (possibly out of order) and the logic
+ * to clean up the materializations cache. Tests need to be executed in a certain order
+ * to avoid interactions among them, as the invalidation cache is a singleton.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class TestMetaStoreMaterializationsCacheCleaner {
+
+ private static final String DB_NAME = "hive3252";
+ private static final String TBL_NAME_1 = "tmptbl1";
+ private static final String TBL_NAME_2 = "tmptbl2";
+ private static final String TBL_NAME_3 = "tmptbl3";
+ private static final String MV_NAME_1 = "mv1";
+ private static final String MV_NAME_2 = "mv2";
+
+
+ @Test
+ public void testCleanerScenario1() throws Exception {
+ // create mock raw store
+ final RawStore rawStore = mock(RawStore.class);
+ when(rawStore.getAllDatabases()).thenReturn(ImmutableList.of());
+ Configuration conf = new Configuration();
+ conf.set("metastore.materializations.invalidation.impl", "DISABLE");
+ when(rawStore.getConf()).thenReturn(conf);
+ // create mock txn store
+ final TxnStore txnStore = mock(TxnStore.class);
+ // initialize invalidation cache (set conf to disable)
+ MaterializationsInvalidationCache.get().init(rawStore, txnStore);
+
+ // This is a dummy test, invalidation cache is not supposed to
+ // record any information.
+ MaterializationsInvalidationCache.get().notifyTableModification(
+ DB_NAME, TBL_NAME_1, 1, 1);
+ int id = 2;
+ BasicTxnInfo txn2 = createTxnInfo(DB_NAME, TBL_NAME_1, id);
+ MaterializationsInvalidationCache.get().notifyTableModification(
+ DB_NAME, TBL_NAME_1, id, id);
+ // Create tbl2 (nothing to do)
+ id = 3;
+ BasicTxnInfo txn3 = createTxnInfo(DB_NAME, TBL_NAME_1, id);
+ MaterializationsInvalidationCache.get().notifyTableModification(
+ DB_NAME, TBL_NAME_2, id, id);
+ // Cleanup (current = 4, duration = 4) -> Does nothing
+ long removed = MaterializationsInvalidationCache.get().cleanup(0L);
+ Assert.assertEquals(0L, removed);
+ // Create mv1
+ Table mv1 = mock(Table.class);
+ when(mv1.getDbName()).thenReturn(DB_NAME);
+ when(mv1.getTableName()).thenReturn(MV_NAME_1);
+ CreationMetadata mockCM1 = new CreationMetadata(
+ DB_NAME, MV_NAME_1,
+ ImmutableSet.of(
+ DB_NAME + "." + TBL_NAME_1,
+ DB_NAME + "." + TBL_NAME_2));
+ // Create txn list (highWatermark=3;minOpenTxn=Long.MAX_VALUE)
+ mockCM1.setValidTxnList("3:" + Long.MAX_VALUE + "::");
+ when(mv1.getCreationMetadata()).thenReturn(mockCM1);
+ MaterializationsInvalidationCache.get().createMaterializedView(mockCM1.getDbName(), mockCM1.getTblName(),
+ mockCM1.getTablesUsed(), mockCM1.getValidTxnList());
+ Map<String, Materialization> invalidationInfos =
+ MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+ DB_NAME, ImmutableList.of(MV_NAME_1));
+ Assert.assertTrue(invalidationInfos.isEmpty());
+ id = 10;
+ BasicTxnInfo txn10 = createTxnInfo(DB_NAME, TBL_NAME_2, id);
+ MaterializationsInvalidationCache.get().notifyTableModification(
+ DB_NAME, TBL_NAME_2, id, id);
+ id = 9;
+ BasicTxnInfo txn9 = createTxnInfo(DB_NAME, TBL_NAME_1, id);
+ MaterializationsInvalidationCache.get().notifyTableModification(
+ DB_NAME, TBL_NAME_1, id, id);
+ // Cleanup (current = 12, duration = 4) -> Removes nothing (cache is disabled)
+ removed = MaterializationsInvalidationCache.get().cleanup(8L);
+ Assert.assertEquals(0L, removed);
+ invalidationInfos =
+ MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+ DB_NAME, ImmutableList.of(MV_NAME_1));
+ Assert.assertTrue(invalidationInfos.isEmpty());
+ // Create mv2
+ Table mv2 = mock(Table.class);
+ when(mv2.getDbName()).thenReturn(DB_NAME);
+ when(mv2.getTableName()).thenReturn(MV_NAME_2);
+ CreationMetadata mockCM2 = new CreationMetadata(
+ DB_NAME, MV_NAME_2,
+ ImmutableSet.of(
+ DB_NAME + "." + TBL_NAME_1,
+ DB_NAME + "." + TBL_NAME_2));
+ // Create txn list (highWatermark=10;minOpenTxn=Long.MAX_VALUE)
+ mockCM2.setValidTxnList("10:" + Long.MAX_VALUE + "::");
+ when(mv2.getCreationMetadata()).thenReturn(mockCM2);
+ MaterializationsInvalidationCache.get().createMaterializedView(mockCM2.getDbName(), mockCM2.getTblName(),
+ mockCM2.getTablesUsed(), mockCM2.getValidTxnList());
+ when(mv2.getCreationMetadata()).thenReturn(mockCM2);
+ invalidationInfos =
+ MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+ DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+ Assert.assertTrue(invalidationInfos.isEmpty());
+ // Create tbl3 (nothing to do)
+ MaterializationsInvalidationCache.get().notifyTableModification(
+ DB_NAME, TBL_NAME_3, 11, 11);
+ MaterializationsInvalidationCache.get().notifyTableModification(
+ DB_NAME, TBL_NAME_3, 18, 18);
+ MaterializationsInvalidationCache.get().notifyTableModification(
+ DB_NAME, TBL_NAME_1, 14, 14);
+ MaterializationsInvalidationCache.get().notifyTableModification(
+ DB_NAME, TBL_NAME_1, 17, 17);
+ MaterializationsInvalidationCache.get().notifyTableModification(
+ DB_NAME, TBL_NAME_2, 16, 16);
+ // Cleanup (current = 20, duration = 4) -> Removes nothing (cache is disabled)
+ removed = MaterializationsInvalidationCache.get().cleanup(16L);
+ Assert.assertEquals(0L, removed);
+ invalidationInfos =
+ MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+ DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+ Assert.assertTrue(invalidationInfos.isEmpty());
+ MaterializationsInvalidationCache.get().notifyTableModification(
+ DB_NAME, TBL_NAME_1, 12, 12);
+ MaterializationsInvalidationCache.get().notifyTableModification(
+ DB_NAME, TBL_NAME_2, 15, 15);
+ MaterializationsInvalidationCache.get().notifyTableModification(
+ DB_NAME, TBL_NAME_2, 7, 7);
+ invalidationInfos =
+ MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+ DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+ Assert.assertTrue(invalidationInfos.isEmpty());
+ // Cleanup (current = 24, duration = 4) -> Removes nothing (cache is disabled)
+ removed = MaterializationsInvalidationCache.get().cleanup(20L);
+ Assert.assertEquals(0L, removed);
+ invalidationInfos =
+ MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+ DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+ Assert.assertTrue(invalidationInfos.isEmpty());
+ // Cleanup (current = 28, duration = 4) -> Removes nothing (cache is disabled)
+ removed = MaterializationsInvalidationCache.get().cleanup(24L);
+ Assert.assertEquals(0L, removed);
+ }
+
+ /**
+  * Runs the second cleaner scenario against the invalidation cache with
+  * {@code metastore.materializations.invalidation.impl} set to {@code DEFAULT}.
+  *
+  * <p>Table modifications are notified to the cache (several of them out of
+  * order), two materialized views are registered, and {@code cleanup} is invoked
+  * at several points. After each step the test checks the value returned by
+  * {@code cleanup} and the invalidation time reported for each materialized view.
+  */
+ @Test
+ public void testCleanerScenario2() throws Exception {
+   // create mock raw store
+   final RawStore rawStore = mock(RawStore.class);
+   when(rawStore.getAllDatabases()).thenReturn(ImmutableList.of());
+   Configuration conf = new Configuration();
+   conf.set("metastore.materializations.invalidation.impl", "DEFAULT");
+   when(rawStore.getConf()).thenReturn(conf);
+   // create mock txn store
+   final TxnStore txnStore = mock(TxnStore.class);
+   // initialize invalidation cache (set conf to default)
+   MaterializationsInvalidationCache.get().init(rawStore, txnStore);
+
+   // Scenario consists of the following steps:
+   // Create tbl1
+   // (t = 1) Insert row in tbl1
+   // (t = 2) Insert row in tbl1
+   // Create tbl2
+   // (t = 3) Insert row in tbl2
+   // Cleanup (current = 4, duration = 4) -> Does nothing
+   // Create mv1
+   // (t = 10) Insert row in tbl2
+   // (t = 9) Insert row in tbl1 (out of order)
+   // Cleanup (current = 12, duration = 4) -> Removes txn1, txn2, txn3
+   // Create mv2
+   // Create tbl3
+   // (t = 11) Insert row in tbl3
+   // (t = 18) Insert row in tbl3
+   // (t = 14) Insert row in tbl1
+   // (t = 17) Insert row in tbl1
+   // (t = 16) Insert row in tbl2
+   // Cleanup (current = 20, duration = 4) -> Removes txn10, txn11
+   // (t = 12) Insert row in tbl1
+   // (t = 15) Insert row in tbl2
+   // (t = 7) Insert row in tbl2
+   // Cleanup (current = 24, duration = 4) -> Removes txn9, txn14, txn15, txn16, txn17, txn18
+   // Cleanup (current = 28, duration = 4) -> Removes nothing
+
+   // Create tbl1 (nothing to do)
+   // (t = 1) Insert row in tbl1
+   MaterializationsInvalidationCache.get().notifyTableModification(
+       DB_NAME, TBL_NAME_1, 1, 1);
+   // (t = 2) Insert row in tbl1
+   MaterializationsInvalidationCache.get().notifyTableModification(
+       DB_NAME, TBL_NAME_1, 2, 2);
+   // Create tbl2 (nothing to do)
+   // (t = 3) Insert row in tbl2
+   MaterializationsInvalidationCache.get().notifyTableModification(
+       DB_NAME, TBL_NAME_2, 3, 3);
+   // Cleanup (current = 4, duration = 4) -> Does nothing
+   long removed = MaterializationsInvalidationCache.get().cleanup(0L);
+   Assert.assertEquals(0L, removed);
+   // Create mv1 (uses tbl1 and tbl2)
+   CreationMetadata mv1Metadata = new CreationMetadata(
+       DB_NAME, MV_NAME_1,
+       ImmutableSet.of(
+           DB_NAME + "." + TBL_NAME_1,
+           DB_NAME + "." + TBL_NAME_2));
+   // Create txn list (highWatermark=3;minOpenTxn=Long.MAX_VALUE)
+   mv1Metadata.setValidTxnList("3:" + Long.MAX_VALUE + "::");
+   MaterializationsInvalidationCache.get().createMaterializedView(mv1Metadata.getDbName(), mv1Metadata.getTblName(),
+       mv1Metadata.getTablesUsed(), mv1Metadata.getValidTxnList());
+   Map<String, Materialization> invalidationInfos =
+       MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+           DB_NAME, ImmutableList.of(MV_NAME_1));
+   Assert.assertEquals(0L, invalidationInfos.get(MV_NAME_1).getInvalidationTime());
+   // (t = 10) Insert row in tbl2
+   MaterializationsInvalidationCache.get().notifyTableModification(
+       DB_NAME, TBL_NAME_2, 10, 10);
+   // (t = 9) Insert row in tbl1 (out of order)
+   MaterializationsInvalidationCache.get().notifyTableModification(
+       DB_NAME, TBL_NAME_1, 9, 9);
+   // Cleanup (current = 12, duration = 4) -> Removes txn1, txn2, txn3
+   removed = MaterializationsInvalidationCache.get().cleanup(8L);
+   Assert.assertEquals(3L, removed);
+   invalidationInfos =
+       MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+           DB_NAME, ImmutableList.of(MV_NAME_1));
+   Assert.assertEquals(9L, invalidationInfos.get(MV_NAME_1).getInvalidationTime());
+   // Create mv2 (uses tbl1 and tbl2)
+   CreationMetadata mv2Metadata = new CreationMetadata(
+       DB_NAME, MV_NAME_2,
+       ImmutableSet.of(
+           DB_NAME + "." + TBL_NAME_1,
+           DB_NAME + "." + TBL_NAME_2));
+   // Create txn list (highWatermark=10;minOpenTxn=Long.MAX_VALUE)
+   mv2Metadata.setValidTxnList("10:" + Long.MAX_VALUE + "::");
+   MaterializationsInvalidationCache.get().createMaterializedView(mv2Metadata.getDbName(), mv2Metadata.getTblName(),
+       mv2Metadata.getTablesUsed(), mv2Metadata.getValidTxnList());
+   invalidationInfos =
+       MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+           DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+   Assert.assertEquals(9L, invalidationInfos.get(MV_NAME_1).getInvalidationTime());
+   Assert.assertEquals(0L, invalidationInfos.get(MV_NAME_2).getInvalidationTime());
+   // Create tbl3 (nothing to do)
+   // (t = 11) Insert row in tbl3
+   MaterializationsInvalidationCache.get().notifyTableModification(
+       DB_NAME, TBL_NAME_3, 11, 11);
+   // (t = 18) Insert row in tbl3
+   MaterializationsInvalidationCache.get().notifyTableModification(
+       DB_NAME, TBL_NAME_3, 18, 18);
+   // (t = 14) Insert row in tbl1
+   MaterializationsInvalidationCache.get().notifyTableModification(
+       DB_NAME, TBL_NAME_1, 14, 14);
+   // (t = 17) Insert row in tbl1
+   MaterializationsInvalidationCache.get().notifyTableModification(
+       DB_NAME, TBL_NAME_1, 17, 17);
+   // (t = 16) Insert row in tbl2
+   MaterializationsInvalidationCache.get().notifyTableModification(
+       DB_NAME, TBL_NAME_2, 16, 16);
+   // Cleanup (current = 20, duration = 4) -> Removes txn10, txn11
+   removed = MaterializationsInvalidationCache.get().cleanup(16L);
+   Assert.assertEquals(2L, removed);
+   invalidationInfos =
+       MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+           DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+   Assert.assertEquals(9L, invalidationInfos.get(MV_NAME_1).getInvalidationTime());
+   Assert.assertEquals(14L, invalidationInfos.get(MV_NAME_2).getInvalidationTime());
+   // (t = 12) Insert row in tbl1
+   MaterializationsInvalidationCache.get().notifyTableModification(
+       DB_NAME, TBL_NAME_1, 12, 12);
+   // (t = 15) Insert row in tbl2
+   MaterializationsInvalidationCache.get().notifyTableModification(
+       DB_NAME, TBL_NAME_2, 15, 15);
+   // (t = 7) Insert row in tbl2
+   MaterializationsInvalidationCache.get().notifyTableModification(
+       DB_NAME, TBL_NAME_2, 7, 7);
+   // The late notifications move both invalidation times backwards
+   invalidationInfos =
+       MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+           DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+   Assert.assertEquals(7L, invalidationInfos.get(MV_NAME_1).getInvalidationTime());
+   Assert.assertEquals(12L, invalidationInfos.get(MV_NAME_2).getInvalidationTime());
+   // Cleanup (current = 24, duration = 4) -> Removes txn9, txn14, txn15, txn16, txn17, txn18
+   removed = MaterializationsInvalidationCache.get().cleanup(20L);
+   Assert.assertEquals(6L, removed);
+   invalidationInfos =
+       MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+           DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+   Assert.assertEquals(7L, invalidationInfos.get(MV_NAME_1).getInvalidationTime());
+   Assert.assertEquals(12L, invalidationInfos.get(MV_NAME_2).getInvalidationTime());
+   // Cleanup (current = 28, duration = 4) -> Removes nothing
+   removed = MaterializationsInvalidationCache.get().cleanup(24L);
+   Assert.assertEquals(0L, removed);
+ }
+
+ private static BasicTxnInfo createTxnInfo(String dbName, String tableName, int i) {
+ BasicTxnInfo r = new BasicTxnInfo();
+ r.setDbname(dbName);
+ r.setTablename(tableName);
+ r.setTxnid(i);
+ r.setTime(i);
+ return r;
+ }
+}