You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2017/09/11 18:39:07 UTC
incubator-tephra git commit: TEPHRA-244 Remove regions of deleted
tables when computing prune upper bound
Repository: incubator-tephra
Updated Branches:
refs/heads/master 827a70c68 -> b3370b662
TEPHRA-244 Remove regions of deleted tables when computing prune upper bound
This closes #55
Signed-off-by: poorna <po...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/b3370b66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/b3370b66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/b3370b66
Branch: refs/heads/master
Commit: b3370b662c2df0fbe8581944754fd2376ad70dc8
Parents: 827a70c
Author: poorna <po...@cask.co>
Authored: Sun Sep 10 21:38:08 2017 -0700
Committer: poorna <po...@apache.org>
Committed: Mon Sep 11 11:38:21 2017 -0700
----------------------------------------------------------------------
.../txprune/HBaseTransactionPruningPlugin.java | 35 +++++++++++
.../hbase/txprune/InvalidListPruneTest.java | 65 ++++++++++++++++++++
.../src/test/resources/logback-test.xml | 39 ++++++++++++
.../txprune/HBaseTransactionPruningPlugin.java | 35 +++++++++++
.../hbase/txprune/InvalidListPruneTest.java | 65 ++++++++++++++++++++
.../src/test/resources/logback-test.xml | 39 ++++++++++++
.../txprune/HBaseTransactionPruningPlugin.java | 35 +++++++++++
.../hbase/txprune/InvalidListPruneTest.java | 65 ++++++++++++++++++++
.../src/test/resources/logback-test.xml | 39 ++++++++++++
.../txprune/HBaseTransactionPruningPlugin.java | 35 +++++++++++
.../hbase/txprune/InvalidListPruneTest.java | 65 ++++++++++++++++++++
.../src/test/resources/logback-test.xml | 39 ++++++++++++
.../txprune/HBaseTransactionPruningPlugin.java | 35 +++++++++++
.../hbase/txprune/InvalidListPruneTest.java | 65 ++++++++++++++++++++
.../src/test/resources/logback-test.xml | 39 ++++++++++++
.../txprune/HBaseTransactionPruningPlugin.java | 35 +++++++++++
.../hbase/txprune/InvalidListPruneTest.java | 65 ++++++++++++++++++++
.../src/test/resources/logback-test.xml | 39 ++++++++++++
18 files changed, 834 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 021f1b2..44b4bac 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -20,6 +20,7 @@
package org.apache.tephra.hbase.txprune;
import com.google.common.base.Function;
+import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -43,8 +44,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -293,6 +296,10 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
* @throws IOException when not able to talk to HBase
*/
private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
+ // Get the tables for the current time from the latest regions set
+ final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions());
+ LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables);
+
do {
LOG.debug("Computing prune upper bound for {}", timeRegions);
SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
@@ -309,6 +316,15 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
continue;
}
+ // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds
+ // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been
+ // compacted. This ensures that transient tables do not block pruning progress.
+ transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Transactional regions after removing the regions of non-existing tables = {}",
+ Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+ }
+
// Get the prune upper bounds for all the transactional regions
Map<byte[], Long> pruneUpperBoundRegions =
dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
@@ -342,6 +358,25 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
return -1;
}
+ private SortedSet<byte[]> filterDeletedTableRegions(final Set<TableName> existingTables,
+ SortedSet<byte[]> transactionalRegions) {
+ return Sets.filter(transactionalRegions,
+ new Predicate<byte[]>() {
+ @Override
+ public boolean apply(byte[] region) {
+ return existingTables.contains(HRegionInfo.getTable(region));
+ }
+ });
+ }
+
+ private Set<TableName> getTableNamesForRegions(Set<byte[]> regions) {
+ Set<TableName> tableNames = new HashSet<>(regions.size());
+ for (byte[] region : regions) {
+ tableNames.add(HRegionInfo.getTable(region));
+ }
+ return tableNames;
+ }
+
private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
SortedSet<byte[]> transactionalRegions,
Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 91bbc1a..cea9b5c 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -391,6 +391,71 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
}
}
+ @Test
+ public void testPruneTransientTable() throws Exception {
+ // Make sure that transient tables do not block the progress of pruning
+
+ // Create a temp table
+ TableName txTempTable = TableName.valueOf("tempTable");
+ createTable(txTempTable.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ TableName txDataTable2 = null;
+
+ TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+ transactionPruningPlugin.initialize(conf);
+
+ try {
+ long now1 = System.currentTimeMillis();
+ long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long noPruneUpperBound = -1;
+ long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+ ImmutableSet.of(expectedPruneUpperBound1),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+
+ // fetch prune upper bound, there should be no prune upper bound since nothing has been compacted yet.
+ // This run is only to store the initial set of regions
+ long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+ // Now delete the transient table
+ hBaseAdmin.disableTable(txTempTable);
+ hBaseAdmin.deleteTable(txTempTable);
+ txTempTable = null;
+
+ // Compact the data table now
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // Create a new table that will not be compacted
+ txDataTable2 = TableName.valueOf("invalidListPruneTestTable2");
+ createTable(txDataTable2.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ // fetch prune upper bound, there should be a prune upper bound even though txTempTable does not exist anymore,
+ // and txDataTable2 has not been compacted/flushed yet
+ long now2 = System.currentTimeMillis();
+ long inactiveTxTimeNow2 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+ } finally {
+ transactionPruningPlugin.destroy();
+ if (txDataTable2 != null) {
+ hBaseAdmin.disableTable(txDataTable2);
+ hBaseAdmin.deleteTable(txDataTable2);
+ }
+ if (txTempTable != null) {
+ hBaseAdmin.disableTable(txTempTable);
+ hBaseAdmin.deleteTable(txTempTable);
+ }
+ }
+ }
+
private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
HRegionLocation regionLocation = connection.getRegionLocation(dataTable, row, true);
return regionLocation.getRegionInfo().getRegionName();
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-0.96/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/resources/logback-test.xml b/tephra-hbase-compat-0.96/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..36f0a37
--- /dev/null
+++ b/tephra-hbase-compat-0.96/src/test/resources/logback-test.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.apache.hadoop" level="WARN" />
+ <!-- BlockStateChange is used by org.apache.hadoop.hdfs.server.blockmanagement.BlockManager -->
+ <logger name="BlockStateChange" level="WARN" />
+ <logger name="org.apache.zookeeper" level="WARN" />
+ <logger name="org.mortbay.log" level="WARN" />
+ <logger name="org.apache.tephra.hbase.txprune" level="DEBUG" />
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 021f1b2..44b4bac 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -20,6 +20,7 @@
package org.apache.tephra.hbase.txprune;
import com.google.common.base.Function;
+import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -43,8 +44,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -293,6 +296,10 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
* @throws IOException when not able to talk to HBase
*/
private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
+ // Get the tables for the current time from the latest regions set
+ final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions());
+ LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables);
+
do {
LOG.debug("Computing prune upper bound for {}", timeRegions);
SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
@@ -309,6 +316,15 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
continue;
}
+ // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds
+ // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been
+ // compacted. This ensures that transient tables do not block pruning progress.
+ transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Transactional regions after removing the regions of non-existing tables = {}",
+ Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+ }
+
// Get the prune upper bounds for all the transactional regions
Map<byte[], Long> pruneUpperBoundRegions =
dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
@@ -342,6 +358,25 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
return -1;
}
+ private SortedSet<byte[]> filterDeletedTableRegions(final Set<TableName> existingTables,
+ SortedSet<byte[]> transactionalRegions) {
+ return Sets.filter(transactionalRegions,
+ new Predicate<byte[]>() {
+ @Override
+ public boolean apply(byte[] region) {
+ return existingTables.contains(HRegionInfo.getTable(region));
+ }
+ });
+ }
+
+ private Set<TableName> getTableNamesForRegions(Set<byte[]> regions) {
+ Set<TableName> tableNames = new HashSet<>(regions.size());
+ for (byte[] region : regions) {
+ tableNames.add(HRegionInfo.getTable(region));
+ }
+ return tableNames;
+ }
+
private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
SortedSet<byte[]> transactionalRegions,
Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 91bbc1a..cea9b5c 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -391,6 +391,71 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
}
}
+ @Test
+ public void testPruneTransientTable() throws Exception {
+ // Make sure that transient tables do not block the progress of pruning
+
+ // Create a temp table
+ TableName txTempTable = TableName.valueOf("tempTable");
+ createTable(txTempTable.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ TableName txDataTable2 = null;
+
+ TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+ transactionPruningPlugin.initialize(conf);
+
+ try {
+ long now1 = System.currentTimeMillis();
+ long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long noPruneUpperBound = -1;
+ long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+ ImmutableSet.of(expectedPruneUpperBound1),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+
+ // fetch prune upper bound, there should be no prune upper bound since nothing has been compacted yet.
+ // This run is only to store the initial set of regions
+ long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+ // Now delete the transient table
+ hBaseAdmin.disableTable(txTempTable);
+ hBaseAdmin.deleteTable(txTempTable);
+ txTempTable = null;
+
+ // Compact the data table now
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // Create a new table that will not be compacted
+ txDataTable2 = TableName.valueOf("invalidListPruneTestTable2");
+ createTable(txDataTable2.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ // fetch prune upper bound, there should be a prune upper bound even though txTempTable does not exist anymore,
+ // and txDataTable2 has not been compacted/flushed yet
+ long now2 = System.currentTimeMillis();
+ long inactiveTxTimeNow2 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+ } finally {
+ transactionPruningPlugin.destroy();
+ if (txDataTable2 != null) {
+ hBaseAdmin.disableTable(txDataTable2);
+ hBaseAdmin.deleteTable(txDataTable2);
+ }
+ if (txTempTable != null) {
+ hBaseAdmin.disableTable(txTempTable);
+ hBaseAdmin.deleteTable(txTempTable);
+ }
+ }
+ }
+
private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
HRegionLocation regionLocation = connection.getRegionLocation(dataTable, row, true);
return regionLocation.getRegionInfo().getRegionName();
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-0.98/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/test/resources/logback-test.xml b/tephra-hbase-compat-0.98/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..36f0a37
--- /dev/null
+++ b/tephra-hbase-compat-0.98/src/test/resources/logback-test.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.apache.hadoop" level="WARN" />
+ <!-- BlockStateChange is used by org.apache.hadoop.hdfs.server.blockmanagement.BlockManager -->
+ <logger name="BlockStateChange" level="WARN" />
+ <logger name="org.apache.zookeeper" level="WARN" />
+ <logger name="org.mortbay.log" level="WARN" />
+ <logger name="org.apache.tephra.hbase.txprune" level="DEBUG" />
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 84c480a..42c9f84 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -20,6 +20,7 @@
package org.apache.tephra.hbase.txprune;
import com.google.common.base.Function;
+import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -43,8 +44,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -286,6 +289,10 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
* @throws IOException when not able to talk to HBase
*/
private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
+ // Get the tables for the current time from the latest regions set
+ final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions());
+ LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables);
+
do {
LOG.debug("Computing prune upper bound for {}", timeRegions);
SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
@@ -302,6 +309,15 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
continue;
}
+ // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds
+ // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been
+ // compacted. This ensures that transient tables do not block pruning progress.
+ transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Transactional regions after removing the regions of non-existing tables = {}",
+ Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+ }
+
// Get the prune upper bounds for all the transactional regions
Map<byte[], Long> pruneUpperBoundRegions =
dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
@@ -335,6 +351,25 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
return -1;
}
+ private SortedSet<byte[]> filterDeletedTableRegions(final Set<TableName> existingTables,
+ SortedSet<byte[]> transactionalRegions) {
+ return Sets.filter(transactionalRegions,
+ new Predicate<byte[]>() {
+ @Override
+ public boolean apply(byte[] region) {
+ return existingTables.contains(HRegionInfo.getTable(region));
+ }
+ });
+ }
+
+ private Set<TableName> getTableNamesForRegions(Set<byte[]> regions) {
+ Set<TableName> tableNames = new HashSet<>(regions.size());
+ for (byte[] region : regions) {
+ tableNames.add(HRegionInfo.getTable(region));
+ }
+ return tableNames;
+ }
+
private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
SortedSet<byte[]> transactionalRegions,
Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index f2c1abc..55348b0 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -386,6 +386,71 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
}
}
+ @Test
+ public void testPruneTransientTable() throws Exception {
+ // Make sure that transient tables do not block the progress of pruning
+
+ // Create a temp table
+ TableName txTempTable = TableName.valueOf("tempTable");
+ createTable(txTempTable.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ TableName txDataTable2 = null;
+
+ TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+ transactionPruningPlugin.initialize(conf);
+
+ try {
+ long now1 = System.currentTimeMillis();
+ long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long noPruneUpperBound = -1;
+ long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+ ImmutableSet.of(expectedPruneUpperBound1),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+
+ // fetch prune upper bound, there should be no prune upper bound since nothing has been compacted yet.
+ // This run is only to store the initial set of regions
+ long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+ // Now delete the transient table
+ hBaseAdmin.disableTable(txTempTable);
+ hBaseAdmin.deleteTable(txTempTable);
+ txTempTable = null;
+
+ // Compact the data table now
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // Create a new table that will not be compacted
+ txDataTable2 = TableName.valueOf("invalidListPruneTestTable2");
+ createTable(txDataTable2.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ // fetch prune upper bound, there should be a prune upper bound even though txTempTable does not exist anymore,
+ // and txDataTable2 has not been compacted/flushed yet
+ long now2 = System.currentTimeMillis();
+ long inactiveTxTimeNow2 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+ } finally {
+ transactionPruningPlugin.destroy();
+ if (txDataTable2 != null) {
+ hBaseAdmin.disableTable(txDataTable2);
+ hBaseAdmin.deleteTable(txDataTable2);
+ }
+ if (txTempTable != null) {
+ hBaseAdmin.disableTable(txTempTable);
+ hBaseAdmin.deleteTable(txTempTable);
+ }
+ }
+ }
+
private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
HRegionLocation regionLocation =
testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row);
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.0-cdh/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/resources/logback-test.xml b/tephra-hbase-compat-1.0-cdh/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..36f0a37
--- /dev/null
+++ b/tephra-hbase-compat-1.0-cdh/src/test/resources/logback-test.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.apache.hadoop" level="WARN" />
+ <!-- BlockStateChange is used by org.apache.hadoop.hdfs.server.blockmanagement.BlockManager -->
+ <logger name="BlockStateChange" level="WARN" />
+ <logger name="org.apache.zookeeper" level="WARN" />
+ <logger name="org.mortbay.log" level="WARN" />
+ <logger name="org.apache.tephra.hbase.txprune" level="DEBUG" />
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 8142601..1a895b2 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -20,6 +20,7 @@
package org.apache.tephra.hbase.txprune;
import com.google.common.base.Function;
+import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -43,8 +44,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -287,6 +290,10 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
* @throws IOException when not able to talk to HBase
*/
private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
+ // Get the tables for the current time from the latest regions set
+ final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions());
+ LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables);
+
do {
LOG.debug("Computing prune upper bound for {}", timeRegions);
SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
@@ -303,6 +310,15 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
continue;
}
+ // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds
+ // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been
+ // compacted. This ensures that transient tables do not block pruning progress.
+ transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Transactional regions after removing the regions of non-existing tables = {}",
+ Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+ }
+
// Get the prune upper bounds for all the transactional regions
Map<byte[], Long> pruneUpperBoundRegions =
dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
@@ -336,6 +352,25 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
return -1;
}
+ private SortedSet<byte[]> filterDeletedTableRegions(final Set<TableName> existingTables,
+ SortedSet<byte[]> transactionalRegions) {
+ return Sets.filter(transactionalRegions,
+ new Predicate<byte[]>() {
+ @Override
+ public boolean apply(byte[] region) {
+ return existingTables.contains(HRegionInfo.getTable(region));
+ }
+ });
+ }
+
+ private Set<TableName> getTableNamesForRegions(Set<byte[]> regions) {
+ Set<TableName> tableNames = new HashSet<>(regions.size());
+ for (byte[] region : regions) {
+ tableNames.add(HRegionInfo.getTable(region));
+ }
+ return tableNames;
+ }
+
private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
SortedSet<byte[]> transactionalRegions,
Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index ac5e923..55348b0 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -384,7 +384,72 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
hBaseAdmin.disableTable(txEmptyTable);
hBaseAdmin.deleteTable(txEmptyTable);
}
+ }
+
+ @Test
+ public void testPruneTransientTable() throws Exception {
+ // Make sure that transient tables do not block the progress of pruning
+
+ // Create a temp table
+ TableName txTempTable = TableName.valueOf("tempTable");
+ createTable(txTempTable.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ TableName txDataTable2 = null;
+
+ TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+ transactionPruningPlugin.initialize(conf);
+
+ try {
+ long now1 = System.currentTimeMillis();
+ long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long noPruneUpperBound = -1;
+ long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+ ImmutableSet.of(expectedPruneUpperBound1),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+
+ // fetch prune upper bound, there should be no prune upper bound since nothing has been compacted yet.
+ // This run is only to store the initial set of regions
+ long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+ // Now delete the transient table
+ hBaseAdmin.disableTable(txTempTable);
+ hBaseAdmin.deleteTable(txTempTable);
+ txTempTable = null;
+
+ // Compact the data table now
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // Create a new table that will not be compacted
+ txDataTable2 = TableName.valueOf("invalidListPruneTestTable2");
+ createTable(txDataTable2.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ // fetch prune upper bound, there should be a prune upper bound even though txTempTable does not exist anymore,
+ // and txDataTable2 has not been compacted/flushed yet
+ long now2 = System.currentTimeMillis();
+ long inactiveTxTimeNow2 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+ } finally {
+ transactionPruningPlugin.destroy();
+ if (txDataTable2 != null) {
+ hBaseAdmin.disableTable(txDataTable2);
+ hBaseAdmin.deleteTable(txDataTable2);
+ }
+ if (txTempTable != null) {
+ hBaseAdmin.disableTable(txTempTable);
+ hBaseAdmin.deleteTable(txTempTable);
+ }
}
+ }
private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
HRegionLocation regionLocation =
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.0/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/resources/logback-test.xml b/tephra-hbase-compat-1.0/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..36f0a37
--- /dev/null
+++ b/tephra-hbase-compat-1.0/src/test/resources/logback-test.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.apache.hadoop" level="WARN" />
+ <!-- BlockStateChange is used by org.apache.hadoop.hdfs.server.blockmanagement.BlockManager -->
+ <logger name="BlockStateChange" level="WARN" />
+ <logger name="org.apache.zookeeper" level="WARN" />
+ <logger name="org.mortbay.log" level="WARN" />
+ <logger name="org.apache.tephra.hbase.txprune" level="DEBUG" />
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 84c480a..42c9f84 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -20,6 +20,7 @@
package org.apache.tephra.hbase.txprune;
import com.google.common.base.Function;
+import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -43,8 +44,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -286,6 +289,10 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
* @throws IOException when not able to talk to HBase
*/
private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
+ // Get the tables for the current time from the latest regions set
+ final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions());
+ LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables);
+
do {
LOG.debug("Computing prune upper bound for {}", timeRegions);
SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
@@ -302,6 +309,15 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
continue;
}
+ // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds
+ // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been
+ // compacted. This ensures that transient tables do not block pruning progress.
+ transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Transactional regions after removing the regions of non-existing tables = {}",
+ Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+ }
+
// Get the prune upper bounds for all the transactional regions
Map<byte[], Long> pruneUpperBoundRegions =
dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
@@ -335,6 +351,25 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
return -1;
}
+ private SortedSet<byte[]> filterDeletedTableRegions(final Set<TableName> existingTables,
+ SortedSet<byte[]> transactionalRegions) {
+ return Sets.filter(transactionalRegions,
+ new Predicate<byte[]>() {
+ @Override
+ public boolean apply(byte[] region) {
+ return existingTables.contains(HRegionInfo.getTable(region));
+ }
+ });
+ }
+
+ private Set<TableName> getTableNamesForRegions(Set<byte[]> regions) {
+ Set<TableName> tableNames = new HashSet<>(regions.size());
+ for (byte[] region : regions) {
+ tableNames.add(HRegionInfo.getTable(region));
+ }
+ return tableNames;
+ }
+
private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
SortedSet<byte[]> transactionalRegions,
Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index ac5e923..55348b0 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -384,7 +384,72 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
hBaseAdmin.disableTable(txEmptyTable);
hBaseAdmin.deleteTable(txEmptyTable);
}
+ }
+
+ @Test
+ public void testPruneTransientTable() throws Exception {
+ // Make sure that transient tables do not block the progress of pruning
+
+ // Create a temp table
+ TableName txTempTable = TableName.valueOf("tempTable");
+ createTable(txTempTable.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ TableName txDataTable2 = null;
+
+ TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+ transactionPruningPlugin.initialize(conf);
+
+ try {
+ long now1 = System.currentTimeMillis();
+ long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long noPruneUpperBound = -1;
+ long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+ ImmutableSet.of(expectedPruneUpperBound1),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+
+ // fetch prune upper bound, there should be no prune upper bound since nothing has been compacted yet.
+ // This run is only to store the initial set of regions
+ long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+ // Now delete the transient table
+ hBaseAdmin.disableTable(txTempTable);
+ hBaseAdmin.deleteTable(txTempTable);
+ txTempTable = null;
+
+ // Compact the data table now
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // Create a new table that will not be compacted
+ txDataTable2 = TableName.valueOf("invalidListPruneTestTable2");
+ createTable(txDataTable2.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ // fetch prune upper bound, there should be a prune upper bound even though txTempTable does not exist anymore,
+ // and txDataTable2 has not been compacted/flushed yet
+ long now2 = System.currentTimeMillis();
+ long inactiveTxTimeNow2 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+ } finally {
+ transactionPruningPlugin.destroy();
+ if (txDataTable2 != null) {
+ hBaseAdmin.disableTable(txDataTable2);
+ hBaseAdmin.deleteTable(txDataTable2);
+ }
+ if (txTempTable != null) {
+ hBaseAdmin.disableTable(txTempTable);
+ hBaseAdmin.deleteTable(txTempTable);
+ }
}
+ }
private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
HRegionLocation regionLocation =
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.1-base/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/resources/logback-test.xml b/tephra-hbase-compat-1.1-base/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..36f0a37
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/test/resources/logback-test.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.apache.hadoop" level="WARN" />
+ <!-- BlockStateChange is used by org.apache.hadoop.hdfs.server.blockmanagement.BlockManager -->
+ <logger name="BlockStateChange" level="WARN" />
+ <logger name="org.apache.zookeeper" level="WARN" />
+ <logger name="org.mortbay.log" level="WARN" />
+ <logger name="org.apache.tephra.hbase.txprune" level="DEBUG" />
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 84c480a..42c9f84 100644
--- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -20,6 +20,7 @@
package org.apache.tephra.hbase.txprune;
import com.google.common.base.Function;
+import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -43,8 +44,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -286,6 +289,10 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
* @throws IOException when not able to talk to HBase
*/
private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
+ // Get the tables for the current time from the latest regions set
+ final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions());
+ LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables);
+
do {
LOG.debug("Computing prune upper bound for {}", timeRegions);
SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
@@ -302,6 +309,15 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
continue;
}
+ // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds
+ // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been
+ // compacted. This ensures that transient tables do not block pruning progress.
+ transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Transactional regions after removing the regions of non-existing tables = {}",
+ Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+ }
+
// Get the prune upper bounds for all the transactional regions
Map<byte[], Long> pruneUpperBoundRegions =
dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
@@ -335,6 +351,25 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
return -1;
}
+ private SortedSet<byte[]> filterDeletedTableRegions(final Set<TableName> existingTables,
+ SortedSet<byte[]> transactionalRegions) {
+ return Sets.filter(transactionalRegions,
+ new Predicate<byte[]>() {
+ @Override
+ public boolean apply(byte[] region) {
+ return existingTables.contains(HRegionInfo.getTable(region));
+ }
+ });
+ }
+
+ private Set<TableName> getTableNamesForRegions(Set<byte[]> regions) {
+ Set<TableName> tableNames = new HashSet<>(regions.size());
+ for (byte[] region : regions) {
+ tableNames.add(HRegionInfo.getTable(region));
+ }
+ return tableNames;
+ }
+
private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
SortedSet<byte[]> transactionalRegions,
Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index ac5e923..55348b0 100644
--- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -384,7 +384,72 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
hBaseAdmin.disableTable(txEmptyTable);
hBaseAdmin.deleteTable(txEmptyTable);
}
+ }
+
+ @Test
+ public void testPruneTransientTable() throws Exception {
+ // Make sure that transient tables do not block the progress of pruning
+
+ // Create a temp table
+ TableName txTempTable = TableName.valueOf("tempTable");
+ createTable(txTempTable.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ TableName txDataTable2 = null;
+
+ TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+ transactionPruningPlugin.initialize(conf);
+
+ try {
+ long now1 = System.currentTimeMillis();
+ long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long noPruneUpperBound = -1;
+ long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+ ImmutableSet.of(expectedPruneUpperBound1),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+
+ // fetch prune upper bound, there should be no prune upper bound since nothing has been compacted yet.
+ // This run is only to store the initial set of regions
+ long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+ // Now delete the transient table
+ hBaseAdmin.disableTable(txTempTable);
+ hBaseAdmin.deleteTable(txTempTable);
+ txTempTable = null;
+
+ // Compact the data table now
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // Create a new table that will not be compacted
+ txDataTable2 = TableName.valueOf("invalidListPruneTestTable2");
+ createTable(txDataTable2.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ // fetch prune upper bound, there should be a prune upper bound even though txTempTable does not exist anymore,
+ // and txDataTable2 has not been compacted/flushed yet
+ long now2 = System.currentTimeMillis();
+ long inactiveTxTimeNow2 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+ } finally {
+ transactionPruningPlugin.destroy();
+ if (txDataTable2 != null) {
+ hBaseAdmin.disableTable(txDataTable2);
+ hBaseAdmin.deleteTable(txDataTable2);
+ }
+ if (txTempTable != null) {
+ hBaseAdmin.disableTable(txTempTable);
+ hBaseAdmin.deleteTable(txTempTable);
+ }
}
+ }
private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
HRegionLocation regionLocation =
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.3/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/test/resources/logback-test.xml b/tephra-hbase-compat-1.3/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..36f0a37
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/test/resources/logback-test.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.apache.hadoop" level="WARN" />
+ <!-- BlockStateChange is used by org.apache.hadoop.hdfs.server.blockmanagement.BlockManager -->
+ <logger name="BlockStateChange" level="WARN" />
+ <logger name="org.apache.zookeeper" level="WARN" />
+ <logger name="org.mortbay.log" level="WARN" />
+ <logger name="org.apache.tephra.hbase.txprune" level="DEBUG" />
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>