You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by go...@apache.org on 2017/02/01 19:34:16 UTC
[2/2] incubator-tephra git commit: TEPHRA-212 Perform writes to prune
state asynchronously Pass OperationWithAttributes to ensureValidTxLifetime
Add creation of prune State Table
TEPHRA-212 Perform writes to prune state asynchronously Pass OperationWithAttributes to ensureValidTxLifetime Add creation of prune State Table
This closes #29 from GitHub.
Signed-off-by: Gokul Gunasekaran <go...@cask.co>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/0016b203
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/0016b203
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/0016b203
Branch: refs/heads/master
Commit: 0016b2034d6f0e990fedf7c7961c3b4085480160
Parents: 87cb21a
Author: Gokul Gunasekaran <go...@cask.co>
Authored: Thu Jan 26 13:10:10 2017 -0800
Committer: Gokul Gunasekaran <go...@cask.co>
Committed: Wed Feb 1 11:34:00 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/tephra/TxConstants.java | 7 ++
.../hbase/coprocessor/TransactionProcessor.java | 25 +++--
.../tephra/hbase/txprune/CompactionState.java | 34 ++++--
.../tephra/hbase/txprune/DataJanitorState.java | 2 +-
.../txprune/HBaseTransactionPruningPlugin.java | 35 ++++++
.../hbase/txprune/PruneUpperBoundWriter.java | 112 +++++++++++++++++++
.../hbase/txprune/InvalidListPruneTest.java | 42 +++++++
.../hbase/coprocessor/TransactionProcessor.java | 25 +++--
.../tephra/hbase/txprune/CompactionState.java | 34 ++++--
.../tephra/hbase/txprune/DataJanitorState.java | 2 +-
.../txprune/HBaseTransactionPruningPlugin.java | 35 ++++++
.../hbase/txprune/PruneUpperBoundWriter.java | 112 +++++++++++++++++++
.../hbase/txprune/InvalidListPruneTest.java | 41 +++++++
.../hbase/coprocessor/TransactionProcessor.java | 25 +++--
.../tephra/hbase/txprune/CompactionState.java | 34 ++++--
.../tephra/hbase/txprune/DataJanitorState.java | 2 +-
.../txprune/HBaseTransactionPruningPlugin.java | 35 ++++++
.../hbase/txprune/PruneUpperBoundWriter.java | 111 ++++++++++++++++++
.../hbase/txprune/InvalidListPruneTest.java | 41 +++++++
.../hbase/coprocessor/TransactionProcessor.java | 25 +++--
.../tephra/hbase/txprune/CompactionState.java | 34 ++++--
.../tephra/hbase/txprune/DataJanitorState.java | 2 +-
.../txprune/HBaseTransactionPruningPlugin.java | 35 ++++++
.../hbase/txprune/PruneUpperBoundWriter.java | 112 +++++++++++++++++++
.../hbase/txprune/InvalidListPruneTest.java | 41 +++++++
.../hbase/coprocessor/TransactionProcessor.java | 25 +++--
.../tephra/hbase/txprune/CompactionState.java | 34 ++++--
.../tephra/hbase/txprune/DataJanitorState.java | 2 +-
.../txprune/HBaseTransactionPruningPlugin.java | 37 +++++-
.../hbase/txprune/PruneUpperBoundWriter.java | 111 ++++++++++++++++++
.../hbase/txprune/InvalidListPruneTest.java | 41 +++++++
31 files changed, 1162 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index 512e93c..1988abf 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -369,6 +369,12 @@ public class TxConstants {
* Interval in seconds to schedule prune run.
*/
public static final String PRUNE_INTERVAL = "data.tx.prune.interval";
+
+ /**
+ * Interval in seconds to schedule flush of prune table entries to store.
+ */
+ public static final String PRUNE_FLUSH_INTERVAL = "data.tx.prune.flush.interval";
+
/**
* Comma separated list of invalid transaction pruning plugins to load
*/
@@ -381,6 +387,7 @@ public class TxConstants {
public static final boolean DEFAULT_PRUNE_ENABLE = false;
public static final String DEFAULT_PRUNE_STATE_TABLE = "data_tx_janitor_state";
public static final long DEFAULT_PRUNE_INTERVAL = TimeUnit.HOURS.toSeconds(6);
+ public static final long DEFAULT_PRUNE_FLUSH_INTERVAL = TimeUnit.MINUTES.toSeconds(1);
public static final String DEFAULT_PLUGIN = "data.tx.prune.plugin.default";
public static final String DEFAULT_PLUGIN_CLASS =
"org.apache.tephra.hbase.txprune.HBaseTransactionPruningPlugin";
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index c42dd64..3d1c7f1 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -170,7 +170,9 @@ public class TransactionProcessor extends BaseRegionObserver {
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
- // nothing to do
+ if (compactionState != null) {
+ compactionState.stop();
+ }
}
@Override
@@ -191,7 +193,7 @@ public class TransactionProcessor extends BaseRegionObserver {
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
throws IOException {
Transaction tx = getFromOperation(put);
- ensureValidTxLifetime(e.getEnvironment(), tx);
+ ensureValidTxLifetime(e.getEnvironment(), put, tx);
}
@Override
@@ -208,7 +210,7 @@ public class TransactionProcessor extends BaseRegionObserver {
}
Transaction tx = getFromOperation(delete);
- ensureValidTxLifetime(e.getEnvironment(), tx);
+ ensureValidTxLifetime(e.getEnvironment(), delete, tx);
// Other deletes are client-initiated and need to be translated into our own tombstones
// TODO: this should delegate to the DeleteStrategy implementation.
@@ -322,11 +324,16 @@ public class TransactionProcessor extends BaseRegionObserver {
if (conf != null) {
pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
- String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
- compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable));
- LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
- pruneTable);
+ if (Boolean.TRUE.equals(pruneEnable)) {
+ String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+ long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+ conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+ compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+ LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+ + pruneTable);
+ }
}
}
@@ -390,11 +397,13 @@ public class TransactionProcessor extends BaseRegionObserver {
* Make sure that the transaction is within the max valid transaction lifetime.
*
* @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+ * @param op {@link OperationWithAttributes} HBase operation to access its attributes if required
* @param tx {@link Transaction} supplied by the
* @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction
* IOException thrown if the value of max lifetime of transaction is unavailable
*/
protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
+ @SuppressWarnings("unused") OperationWithAttributes op,
@Nullable Transaction tx) throws IOException {
if (tx == null) {
return;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 733f636..f4f1d43 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -38,22 +38,27 @@ import javax.annotation.Nullable;
public class CompactionState {
private static final Log LOG = LogFactory.getLog(CompactionState.class);
+ private final TableName stateTable;
private final byte[] regionName;
private final String regionNameAsString;
- private final TableName stateTable;
private final DataJanitorState dataJanitorState;
+ private final long pruneFlushInterval;
private volatile long pruneUpperBound = -1;
- public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
+ private PruneUpperBoundWriter pruneUpperBoundWriter;
+
+ public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
+ this.stateTable = stateTable;
this.regionName = env.getRegion().getRegionName();
this.regionNameAsString = env.getRegion().getRegionNameAsString();
- this.stateTable = stateTable;
this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
@Override
public HTableInterface get() throws IOException {
return env.getTable(stateTable);
}
});
+ this.pruneFlushInterval = pruneFlushInterval;
+ this.pruneUpperBoundWriter = createPruneUpperBoundWriter();
}
/**
@@ -75,18 +80,29 @@ public class CompactionState {
}
/**
+ * Stops the current {@link PruneUpperBoundWriter}.
+ */
+ public void stop() {
+ if (pruneUpperBoundWriter != null) {
+ pruneUpperBoundWriter.stop();
+ }
+ }
+
+ /**
* Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
* This method is called after the compaction has successfully completed.
*/
public void persist() {
if (pruneUpperBound != -1) {
- try {
- dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound);
- LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
- } catch (IOException e) {
- LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s",
- stateTable, regionNameAsString), e);
+ if (!pruneUpperBoundWriter.isAlive()) {
+ pruneUpperBoundWriter = createPruneUpperBoundWriter();
}
+ pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound);
+ LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
}
}
+
+ private PruneUpperBoundWriter createPruneUpperBoundWriter() {
+ return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index bde843b..5817fe2 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -46,8 +46,8 @@ import javax.annotation.Nullable;
@SuppressWarnings("WeakerAccess")
public class DataJanitorState {
public static final byte[] FAMILY = {'f'};
+ public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
- private static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
private static final byte[] REGION_TIME_COL = {'r'};
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 44dadc3..80da8d8 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -24,8 +24,10 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
@@ -126,6 +128,7 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
LOG.info("Initializing plugin with state table {}:{}", stateTable.getNamespaceAsString(),
stateTable.getNameAsString());
+ createPruneTable(stateTable);
this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
@Override
public HTableInterface get() throws IOException {
@@ -218,6 +221,38 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
}
}
+ /**
+ * Create the prune state table given the {@link TableName} if the table doesn't exist already.
+ *
+ * @param stateTable prune state table name
+ */
+ protected void createPruneTable(TableName stateTable) throws IOException {
+ try {
+ if (hBaseAdmin.tableExists(stateTable)) {
+ LOG.debug("Not creating pruneStateTable {}:{} since it already exists.",
+ stateTable.getNamespaceAsString(), stateTable.getNameAsString());
+ return;
+ }
+
+ HTableDescriptor htd = new HTableDescriptor(stateTable);
+ htd.addFamily(new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1));
+ hBaseAdmin.createTable(htd);
+ LOG.info("Created pruneTable {}:{}", stateTable.getNamespaceAsString(), stateTable.getNameAsString());
+ } catch (TableExistsException ex) {
+ // Expected if the prune state table is being created at the same time by another client
+ LOG.debug("Not creating pruneStateTable {}:{} since it already exists.",
+ stateTable.getNamespaceAsString(), stateTable.getNameAsString(), ex);
+ }
+ }
+
+ /**
+ * Returns whether the table is a transactional table. By default, a table is identified as a transactional
+ * table if it has the coprocessor {@link TransactionProcessor} attached to it. Should be overridden if the users
+ * attach a different coprocessor.
+ *
+ * @param tableDescriptor {@link HTableDescriptor} of the table
+ * @return true if the table is transactional
+ */
protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
new file mode 100644
index 0000000..c981e15
--- /dev/null
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Thread that will write the prune upper bound
+ */
+public class PruneUpperBoundWriter {
+ private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+
+ private final TableName pruneStateTable;
+ private final DataJanitorState dataJanitorState;
+ private final byte[] regionName;
+ private final String regionNameAsString;
+ private final long pruneFlushInterval;
+ private final AtomicLong pruneUpperBound;
+ private final AtomicBoolean shouldFlush;
+
+ private Thread flushThread;
+ private long lastChecked;
+
+ public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
+ byte[] regionName, long pruneFlushInterval) {
+ this.pruneStateTable = pruneStateTable;
+ this.dataJanitorState = dataJanitorState;
+ this.regionName = regionName;
+ this.regionNameAsString = regionNameAsString;
+ this.pruneFlushInterval = pruneFlushInterval;
+ this.pruneUpperBound = new AtomicLong();
+ this.shouldFlush = new AtomicBoolean(false);
+ startFlushThread();
+ }
+
+ public boolean isAlive() {
+ return flushThread.isAlive();
+ }
+
+ public void persistPruneEntry(long pruneUpperBound) {
+ this.pruneUpperBound.set(pruneUpperBound);
+ this.shouldFlush.set(true);
+ }
+
+ public void stop() {
+ if (flushThread != null) {
+ flushThread.interrupt();
+ }
+ }
+
+ private void startFlushThread() {
+ flushThread = new Thread("tephra-prune-upper-bound-writer") {
+ @Override
+ public void run() {
+ while (!isInterrupted()) {
+ long now = System.currentTimeMillis();
+ if (now > (lastChecked + pruneFlushInterval)) {
+ if (shouldFlush.compareAndSet(true, false)) {
+ // should flush data
+ try {
+ dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
+ } catch (IOException ex) {
+ LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
+ pruneStateTable.getNamespaceAsString() + ":" + pruneStateTable.getNameAsString() +
+ " after compacting region.", ex);
+ // Retry again
+ shouldFlush.set(true);
+ }
+ }
+ lastChecked = now;
+ }
+
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException ex) {
+ interrupt();
+ break;
+ }
+ }
+
+ LOG.info("PruneUpperBound Writer thread terminated.");
+ }
+ };
+
+ flushThread.setDaemon(true);
+ flushThread.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 197a6e2..fbd4d7d 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -61,6 +61,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -82,6 +83,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
// Setup the configuration to start HBase cluster with the invalid list pruning enabled
conf = HBaseConfiguration.create();
conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
+ // Flush prune data to table quickly, so that tests don't have to wait long to see updates
+ conf.setLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, 0L);
AbstractHBaseTableTest.startMiniCluster();
TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
@@ -141,6 +144,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
}
}
+ private void truncatePruneStateTable() throws Exception {
+ if (hBaseAdmin.tableExists(pruneStateTable)) {
+ if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
+ hBaseAdmin.disableTable(pruneStateTable);
+ }
+ HTableDescriptor htd = hBaseAdmin.getTableDescriptor(pruneStateTable);
+ hBaseAdmin.deleteTable(pruneStateTable);
+ hBaseAdmin.createTable(htd);
+ }
+ }
+
@Test
public void testRecordCompactionState() throws Exception {
DataJanitorState dataJanitorState =
@@ -151,6 +165,13 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
}
});
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ // Truncate prune state table to clear any data that might have been written by the previous test
+ // This is required because during the shutdown of the previous test, compaction might have kicked in and the
+ // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
+ truncatePruneStateTable();
+
// No prune upper bound initially
Assert.assertEquals(-1,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -161,17 +182,23 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
// Run minor compaction
testUtil.compact(txDataTable1, false);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
// No prune upper bound after minor compaction too
Assert.assertEquals(-1,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
// Run major compaction, and verify prune upper bound
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
Assert.assertEquals(50,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
// Run major compaction again with same snapshot, prune upper bound should not change
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
Assert.assertEquals(50,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -185,6 +212,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
// Run major compaction again, now prune upper bound should change
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
Assert.assertEquals(104,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
}
@@ -202,6 +231,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
// Run major compaction, and verify it completes
long now = System.currentTimeMillis();
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
lastMajorCompactionTime >= now);
@@ -215,6 +246,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
// Run major compaction, and verify it completes
long now = System.currentTimeMillis();
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
lastMajorCompactionTime >= now);
@@ -230,6 +263,13 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
}
});
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ // Truncate prune state table to clear any data that might have been written by the previous test
+ // This is required because during the shutdown of the previous test, compaction might have kicked in and the
+ // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
+ truncatePruneStateTable();
+
TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
transactionPruningPlugin.initialize(conf);
try {
@@ -276,6 +316,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
.add(getRegionName(txDataTable1, Bytes.toBytes(0)))
.build());
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 19eb09e..728adfa 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -170,7 +170,9 @@ public class TransactionProcessor extends BaseRegionObserver {
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
- // nothing to do
+ if (compactionState != null) {
+ compactionState.stop();
+ }
}
@Override
@@ -191,7 +193,7 @@ public class TransactionProcessor extends BaseRegionObserver {
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
throws IOException {
Transaction tx = getFromOperation(put);
- ensureValidTxLifetime(e.getEnvironment(), tx);
+ ensureValidTxLifetime(e.getEnvironment(), put, tx);
}
@Override
@@ -208,7 +210,7 @@ public class TransactionProcessor extends BaseRegionObserver {
}
Transaction tx = getFromOperation(delete);
- ensureValidTxLifetime(e.getEnvironment(), tx);
+ ensureValidTxLifetime(e.getEnvironment(), delete, tx);
// Other deletes are client-initiated and need to be translated into our own tombstones
// TODO: this should delegate to the DeleteStrategy implementation.
@@ -322,11 +324,16 @@ public class TransactionProcessor extends BaseRegionObserver {
if (conf != null) {
pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
- String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
- compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable));
- LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
- pruneTable);
+ if (Boolean.TRUE.equals(pruneEnable)) {
+ String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+ long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+ conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+ compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+ LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+ + pruneTable);
+ }
}
}
@@ -390,11 +397,13 @@ public class TransactionProcessor extends BaseRegionObserver {
* Make sure that the transaction is within the max valid transaction lifetime.
*
* @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+ * @param op {@link OperationWithAttributes} HBase operation to access its attributes if required
* @param tx {@link Transaction} supplied by the
* @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction
* IOException thrown if the value of max lifetime of transaction is unavailable
*/
protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
+ @SuppressWarnings("unused") OperationWithAttributes op,
@Nullable Transaction tx) throws IOException {
if (tx == null) {
return;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 38a79d6..a1a59ad 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -38,22 +38,27 @@ import javax.annotation.Nullable;
public class CompactionState {
private static final Log LOG = LogFactory.getLog(CompactionState.class);
+ private final TableName stateTable;
private final byte[] regionName;
private final String regionNameAsString;
- private final TableName stateTable;
private final DataJanitorState dataJanitorState;
+ private final long pruneFlushInterval;
private volatile long pruneUpperBound = -1;
- public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
+ private PruneUpperBoundWriter pruneUpperBoundWriter;
+
+ public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
+ this.stateTable = stateTable;
this.regionName = env.getRegionInfo().getRegionName();
this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
- this.stateTable = stateTable;
this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
@Override
public HTableInterface get() throws IOException {
return env.getTable(stateTable);
}
});
+ this.pruneFlushInterval = pruneFlushInterval;
+ this.pruneUpperBoundWriter = createPruneUpperBoundWriter();
}
/**
@@ -75,18 +80,29 @@ public class CompactionState {
}
/**
+ * Stops the current {@link PruneUpperBoundWriter}, if there is one, by calling its {@code stop()} method.
+ *
+ * NOTE(review): {@link #persist()} creates a new writer whenever the current one is no longer alive, so a
+ * persist that happens after this call starts a new flush thread. Confirm that this is intended.
+ */
+ public void stop() {
+ if (pruneUpperBoundWriter != null) {
+ pruneUpperBoundWriter.stop();
+ }
+ }
+
+ /**
* Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
* This method is called after the compaction has successfully completed.
*/
public void persist() {
if (pruneUpperBound != -1) {
- try {
- dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound);
- LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
- } catch (IOException e) {
- LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s",
- stateTable, regionNameAsString), e);
+ if (!pruneUpperBoundWriter.isAlive()) {
+ pruneUpperBoundWriter = createPruneUpperBoundWriter();
}
+ pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound);
+ LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
}
}
+
+ /**
+ * Creates a {@link PruneUpperBoundWriter} for this region and prune state table, using the configured
+ * flush interval. The writer starts its flush thread in its constructor.
+ *
+ * @return the newly created writer
+ */
+ private PruneUpperBoundWriter createPruneUpperBoundWriter() {
+ return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index bde843b..5817fe2 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -46,8 +46,8 @@ import javax.annotation.Nullable;
@SuppressWarnings("WeakerAccess")
public class DataJanitorState {
public static final byte[] FAMILY = {'f'};
+ public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
- private static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
private static final byte[] REGION_TIME_COL = {'r'};
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 44dadc3..80da8d8 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -24,8 +24,10 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
@@ -126,6 +128,7 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
LOG.info("Initializing plugin with state table {}:{}", stateTable.getNamespaceAsString(),
stateTable.getNameAsString());
+ createPruneTable(stateTable);
this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
@Override
public HTableInterface get() throws IOException {
@@ -218,6 +221,38 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
}
}
+ /**
+ * Create the prune state table given the {@link TableName} if the table doesn't exist already.
+ * A new table gets the single column family {@link DataJanitorState#FAMILY}, limited to one version.
+ *
+ * @param stateTable prune state table name
+ * @throws IOException if checking for the table or creating it fails; a {@link TableExistsException} is logged
+ * at debug level and not propagated
+ */
+ protected void createPruneTable(TableName stateTable) throws IOException {
+ try {
+ if (hBaseAdmin.tableExists(stateTable)) {
+ LOG.debug("Not creating pruneStateTable {}:{} since it already exists.",
+ stateTable.getNamespaceAsString(), stateTable.getNameAsString());
+ return;
+ }
+
+ HTableDescriptor htd = new HTableDescriptor(stateTable);
+ htd.addFamily(new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1));
+ hBaseAdmin.createTable(htd);
+ LOG.info("Created pruneTable {}:{}", stateTable.getNamespaceAsString(), stateTable.getNameAsString());
+ } catch (TableExistsException ex) {
+ // Expected if the prune state table is being created at the same time by another client
+ LOG.debug("Not creating pruneStateTable {}:{} since it already exists.",
+ stateTable.getNamespaceAsString(), stateTable.getNameAsString(), ex);
+ }
+ }
+
+ /**
+ * Returns whether the table is a transactional table. By default, a table is identified as a transactional
+ * table if it has the coprocessor {@link TransactionProcessor} attached to it. Should be overridden if the users
+ * attach a different coprocessor.
+ *
+ * @param tableDescriptor {@link HTableDescriptor} of the table
+ * @return true if the table is transactional
+ */
protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
new file mode 100644
index 0000000..c981e15
--- /dev/null
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Writes the prune upper bound of a single region to the prune state table from a background thread.
+ *
+ * <p>Constructing an instance starts a daemon thread. About once a second the thread checks whether more than
+ * {@code pruneFlushInterval} milliseconds have passed since its last check and, if a value is waiting to be
+ * written, saves the most recent value handed to {@link #persistPruneEntry(long)} through {@link DataJanitorState}.
+ * Only the latest value is kept, so a value that is replaced before the next flush is never written.
+ */
+public class PruneUpperBoundWriter {
+ // Log under this class, not TransactionProcessor, so that messages are attributed to the writer
+ private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
+
+ private final TableName pruneStateTable;
+ private final DataJanitorState dataJanitorState;
+ private final byte[] regionName;
+ private final String regionNameAsString;
+ private final long pruneFlushInterval;
+ private final AtomicLong pruneUpperBound;
+ private final AtomicBoolean shouldFlush;
+
+ private Thread flushThread;
+ // Time of the last interval check in milliseconds; read and written only by the flush thread
+ private long lastChecked;
+
+ /**
+ * Creates the writer and starts its flush thread.
+ *
+ * @param dataJanitorState used to save the prune upper bound
+ * @param pruneStateTable name of the prune state table, used in log messages
+ * @param regionNameAsString printable name of the region, used in log messages
+ * @param regionName name of the region whose prune upper bound is saved
+ * @param pruneFlushInterval minimum time between two flush checks, in milliseconds
+ */
+ public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
+ byte[] regionName, long pruneFlushInterval) {
+ this.pruneStateTable = pruneStateTable;
+ this.dataJanitorState = dataJanitorState;
+ this.regionName = regionName;
+ this.regionNameAsString = regionNameAsString;
+ this.pruneFlushInterval = pruneFlushInterval;
+ this.pruneUpperBound = new AtomicLong();
+ this.shouldFlush = new AtomicBoolean(false);
+ startFlushThread();
+ }
+
+ /**
+ * @return true if the flush thread is still running
+ */
+ public boolean isAlive() {
+ return flushThread.isAlive();
+ }
+
+ /**
+ * Records a prune upper bound to be written by the flush thread. This method does not write to the table itself.
+ *
+ * @param pruneUpperBound the value to save for the region
+ */
+ public void persistPruneEntry(long pruneUpperBound) {
+ // Store the value before raising the flag, so the flush thread never sees the flag without the value
+ this.pruneUpperBound.set(pruneUpperBound);
+ this.shouldFlush.set(true);
+ }
+
+ /**
+ * Interrupts the flush thread, which makes it leave its loop. A value that has been recorded but not yet
+ * flushed is not written.
+ */
+ public void stop() {
+ if (flushThread != null) {
+ flushThread.interrupt();
+ }
+ }
+
+ /**
+ * Creates and starts the daemon thread that periodically saves the recorded prune upper bound.
+ */
+ private void startFlushThread() {
+ flushThread = new Thread("tephra-prune-upper-bound-writer") {
+ @Override
+ public void run() {
+ while (!isInterrupted()) {
+ long now = System.currentTimeMillis();
+ if (now > (lastChecked + pruneFlushInterval)) {
+ // Clear the flag before writing, so a value recorded during the write is picked up by a later flush
+ if (shouldFlush.compareAndSet(true, false)) {
+ // should flush data
+ try {
+ dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
+ } catch (IOException ex) {
+ LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
+ pruneStateTable.getNamespaceAsString() + ":" + pruneStateTable.getNameAsString() +
+ " after compacting region.", ex);
+ // Retry again
+ shouldFlush.set(true);
+ }
+ }
+ lastChecked = now;
+ }
+
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException ex) {
+ // Restore the interrupt status cleared by sleep() before leaving the loop
+ interrupt();
+ break;
+ }
+ }
+
+ LOG.info("PruneUpperBound Writer thread terminated.");
+ }
+ };
+
+ flushThread.setDaemon(true);
+ flushThread.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 197a6e2..37f732c 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -61,6 +61,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -82,6 +83,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
// Setup the configuration to start HBase cluster with the invalid list pruning enabled
conf = HBaseConfiguration.create();
conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
+ // Flush prune data to table quickly, so that tests don't have to wait long to see updates
+ conf.setLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, 0L);
AbstractHBaseTableTest.startMiniCluster();
TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
@@ -141,6 +144,15 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
}
}
+ /**
+ * Truncates the prune state table if it exists, disabling it first when it is currently enabled.
+ * Does nothing when the table does not exist.
+ */
+ private void truncatePruneStateTable() throws Exception {
+ if (hBaseAdmin.tableExists(pruneStateTable)) {
+ if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
+ hBaseAdmin.disableTable(pruneStateTable);
+ }
+ hBaseAdmin.truncateTable(pruneStateTable, true);
+ }
+ }
+
@Test
public void testRecordCompactionState() throws Exception {
DataJanitorState dataJanitorState =
@@ -151,6 +163,13 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
}
});
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ // Truncate prune state table to clear any data that might have been written by the previous test
+ // This is required because during the shutdown of the previous test, compaction might have kicked in and the
+ // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
+ truncatePruneStateTable();
+
// No prune upper bound initially
Assert.assertEquals(-1,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -161,17 +180,23 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
// Run minor compaction
testUtil.compact(txDataTable1, false);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
// No prune upper bound after minor compaction too
Assert.assertEquals(-1,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
// Run major compaction, and verify prune upper bound
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
Assert.assertEquals(50,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
// Run major compaction again with same snapshot, prune upper bound should not change
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
Assert.assertEquals(50,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -185,6 +210,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
// Run major compaction again, now prune upper bound should change
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
Assert.assertEquals(104,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
}
@@ -202,6 +229,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
// Run major compaction, and verify it completes
long now = System.currentTimeMillis();
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
lastMajorCompactionTime >= now);
@@ -215,6 +244,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
// Run major compaction, and verify it completes
long now = System.currentTimeMillis();
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
lastMajorCompactionTime >= now);
@@ -232,6 +263,14 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
transactionPruningPlugin.initialize(conf);
+
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ // Truncate prune state table to clear any data that might have been written by the previous test
+ // This is required because during the shutdown of the previous test, compaction might have kicked in and the
+ // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
+ truncatePruneStateTable();
+
try {
// Run without a transaction snapshot first
long now1 = 200;
@@ -276,6 +315,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
.add(getRegionName(txDataTable1, Bytes.toBytes(0)))
.build());
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 19eb09e..728adfa 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -170,7 +170,9 @@ public class TransactionProcessor extends BaseRegionObserver {
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
- // nothing to do
+ if (compactionState != null) {
+ compactionState.stop();
+ }
}
@Override
@@ -191,7 +193,7 @@ public class TransactionProcessor extends BaseRegionObserver {
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
throws IOException {
Transaction tx = getFromOperation(put);
- ensureValidTxLifetime(e.getEnvironment(), tx);
+ ensureValidTxLifetime(e.getEnvironment(), put, tx);
}
@Override
@@ -208,7 +210,7 @@ public class TransactionProcessor extends BaseRegionObserver {
}
Transaction tx = getFromOperation(delete);
- ensureValidTxLifetime(e.getEnvironment(), tx);
+ ensureValidTxLifetime(e.getEnvironment(), delete, tx);
// Other deletes are client-initiated and need to be translated into our own tombstones
// TODO: this should delegate to the DeleteStrategy implementation.
@@ -322,11 +324,16 @@ public class TransactionProcessor extends BaseRegionObserver {
if (conf != null) {
pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
- String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
- compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable));
- LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
- pruneTable);
+ if (Boolean.TRUE.equals(pruneEnable)) {
+ String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+ long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+ conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+ compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+ LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+ + pruneTable);
+ }
}
}
@@ -390,11 +397,13 @@ public class TransactionProcessor extends BaseRegionObserver {
* Make sure that the transaction is within the max valid transaction lifetime.
*
* @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+ * @param op {@link OperationWithAttributes} HBase operation to access its attributes if required
* @param tx the {@link Transaction} associated with the operation; may be {@code null}
* @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction
* @throws IOException thrown if the value of max lifetime of transaction is unavailable
*/
protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
+ @SuppressWarnings("unused") OperationWithAttributes op,
@Nullable Transaction tx) throws IOException {
if (tx == null) {
return;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 850f508..58596be 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -38,22 +38,27 @@ import javax.annotation.Nullable;
public class CompactionState {
private static final Log LOG = LogFactory.getLog(CompactionState.class);
+ private final TableName stateTable;
private final byte[] regionName;
private final String regionNameAsString;
- private final TableName stateTable;
private final DataJanitorState dataJanitorState;
+ private final long pruneFlushInterval;
private volatile long pruneUpperBound = -1;
- public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
+ private PruneUpperBoundWriter pruneUpperBoundWriter;
+
+ public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
+ this.stateTable = stateTable;
this.regionName = env.getRegionInfo().getRegionName();
this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
- this.stateTable = stateTable;
this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
@Override
public Table get() throws IOException {
return env.getTable(stateTable);
}
});
+ this.pruneFlushInterval = pruneFlushInterval;
+ this.pruneUpperBoundWriter = createPruneUpperBoundWriter();
}
/**
@@ -75,18 +80,29 @@ public class CompactionState {
}
/**
+ * Stops the current {@link PruneUpperBoundWriter}, if there is one, by calling its {@code stop()} method.
+ *
+ * NOTE(review): {@link #persist()} creates a new writer whenever the current one is no longer alive, so a
+ * persist that happens after this call starts a new flush thread. Confirm that this is intended.
+ */
+ public void stop() {
+ if (pruneUpperBoundWriter != null) {
+ pruneUpperBoundWriter.stop();
+ }
+ }
+
+ /**
* Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
* This method is called after the compaction has successfully completed.
*/
public void persist() {
if (pruneUpperBound != -1) {
- try {
- dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound);
- LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
- } catch (IOException e) {
- LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s",
- stateTable, regionNameAsString), e);
+ if (!pruneUpperBoundWriter.isAlive()) {
+ pruneUpperBoundWriter = createPruneUpperBoundWriter();
}
+ pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound);
+ LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
}
}
+
+ /**
+ * Creates a {@link PruneUpperBoundWriter} for this region and prune state table, using the configured
+ * flush interval. The writer starts its flush thread in its constructor.
+ *
+ * @return the newly created writer
+ */
+ private PruneUpperBoundWriter createPruneUpperBoundWriter() {
+ return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index c6d03c4..51dc181 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -46,8 +46,8 @@ import javax.annotation.Nullable;
@SuppressWarnings("WeakerAccess")
public class DataJanitorState {
public static final byte[] FAMILY = {'f'};
+ public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
- private static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
private static final byte[] REGION_TIME_COL = {'r'};
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 83e3948..95216b9 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -24,8 +24,10 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
@@ -123,6 +125,7 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
final TableName stateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
+ createPruneTable(stateTable);
this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
@Override
public Table get() throws IOException {
@@ -209,6 +212,38 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
}
}
+ /**
+ * Create the prune state table given the {@link TableName} if the table doesn't exist already.
+ * A new table gets the single column family {@link DataJanitorState#FAMILY}, limited to one version.
+ * The {@link Admin} obtained from the connection is closed when this method returns.
+ *
+ * @param stateTable prune state table name
+ * @throws IOException if checking for the table or creating it fails; a {@link TableExistsException} is logged
+ * at debug level and not propagated
+ */
+ protected void createPruneTable(TableName stateTable) throws IOException {
+ try (Admin admin = this.connection.getAdmin()) {
+ if (admin.tableExists(stateTable)) {
+ LOG.debug("Not creating pruneStateTable {} since it already exists.",
+ stateTable.getNameWithNamespaceInclAsString());
+ return;
+ }
+
+ HTableDescriptor htd = new HTableDescriptor(stateTable);
+ htd.addFamily(new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1));
+ admin.createTable(htd);
+ LOG.info("Created pruneTable {}", stateTable.getNameWithNamespaceInclAsString());
+ } catch (TableExistsException ex) {
+ // Expected if the prune state table is being created at the same time by another client
+ LOG.debug("Not creating pruneStateTable {} since it already exists.",
+ stateTable.getNameWithNamespaceInclAsString(), ex);
+ }
+ }
+
+ /**
+ * Returns whether the table is a transactional table. By default, a table is identified as a transactional
+ * table if it has the coprocessor {@link TransactionProcessor} attached to it. Should be overridden if the users
+ * attach a different coprocessor.
+ *
+ * @param tableDescriptor {@link HTableDescriptor} of the table
+ * @return true if the table is transactional
+ */
protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
new file mode 100644
index 0000000..7bceaff
--- /dev/null
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Writes the prune upper bound of a single region to the prune state table from a background thread.
+ *
+ * <p>Constructing an instance starts a daemon thread. About once a second the thread checks whether more than
+ * {@code pruneFlushInterval} milliseconds have passed since its last check and, if a value is waiting to be
+ * written, saves the most recent value handed to {@link #persistPruneEntry(long)} through {@link DataJanitorState}.
+ * Only the latest value is kept, so a value that is replaced before the next flush is never written.
+ */
+public class PruneUpperBoundWriter {
+ // Log under this class, not TransactionProcessor, so that messages are attributed to the writer
+ private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
+
+ private final TableName pruneStateTable;
+ private final DataJanitorState dataJanitorState;
+ private final byte[] regionName;
+ private final String regionNameAsString;
+ private final long pruneFlushInterval;
+ private final AtomicLong pruneUpperBound;
+ private final AtomicBoolean shouldFlush;
+
+ private Thread flushThread;
+ // Time of the last interval check in milliseconds; read and written only by the flush thread
+ private long lastChecked;
+
+ /**
+ * Creates the writer and starts its flush thread.
+ *
+ * @param dataJanitorState used to save the prune upper bound
+ * @param pruneStateTable name of the prune state table, used in log messages
+ * @param regionNameAsString printable name of the region, used in log messages
+ * @param regionName name of the region whose prune upper bound is saved
+ * @param pruneFlushInterval minimum time between two flush checks, in milliseconds
+ */
+ public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
+ byte[] regionName, long pruneFlushInterval) {
+ this.pruneStateTable = pruneStateTable;
+ this.dataJanitorState = dataJanitorState;
+ this.regionName = regionName;
+ this.regionNameAsString = regionNameAsString;
+ this.pruneFlushInterval = pruneFlushInterval;
+ this.pruneUpperBound = new AtomicLong();
+ this.shouldFlush = new AtomicBoolean(false);
+ startFlushThread();
+ }
+
+ /**
+ * @return true if the flush thread is still running
+ */
+ public boolean isAlive() {
+ return flushThread.isAlive();
+ }
+
+ /**
+ * Records a prune upper bound to be written by the flush thread. This method does not write to the table itself.
+ *
+ * @param pruneUpperBound the value to save for the region
+ */
+ public void persistPruneEntry(long pruneUpperBound) {
+ // Store the value before raising the flag, so the flush thread never sees the flag without the value
+ this.pruneUpperBound.set(pruneUpperBound);
+ this.shouldFlush.set(true);
+ }
+
+ /**
+ * Interrupts the flush thread, which makes it leave its loop. A value that has been recorded but not yet
+ * flushed is not written.
+ */
+ public void stop() {
+ if (flushThread != null) {
+ flushThread.interrupt();
+ }
+ }
+
+ /**
+ * Creates and starts the daemon thread that periodically saves the recorded prune upper bound.
+ */
+ private void startFlushThread() {
+ flushThread = new Thread("tephra-prune-upper-bound-writer") {
+ @Override
+ public void run() {
+ while (!isInterrupted()) {
+ long now = System.currentTimeMillis();
+ if (now > (lastChecked + pruneFlushInterval)) {
+ // Clear the flag before writing, so a value recorded during the write is picked up by a later flush
+ if (shouldFlush.compareAndSet(true, false)) {
+ // should flush data
+ try {
+ dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
+ } catch (IOException ex) {
+ LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
+ pruneStateTable.getNameWithNamespaceInclAsString() + " after compacting region.", ex);
+ // Retry again
+ shouldFlush.set(true);
+ }
+ }
+ lastChecked = now;
+ }
+
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException ex) {
+ // Restore the interrupt status cleared by sleep() before leaving the loop
+ interrupt();
+ break;
+ }
+ }
+
+ LOG.info("PruneUpperBound Writer thread terminated.");
+ }
+ };
+
+ flushThread.setDaemon(true);
+ flushThread.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 310c710..a431ee3 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -59,6 +59,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -78,6 +79,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
// Setup the configuration to start HBase cluster with the invalid list pruning enabled
conf = HBaseConfiguration.create();
conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
+ // Flush prune data to table quickly, so that tests don't have to wait long to see updates
+ conf.setLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, 0L);
AbstractHBaseTableTest.startMiniCluster();
TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
@@ -135,6 +138,15 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
}
}
+ /**
+  * Truncates the prune state table if it exists, disabling it first when it is enabled.
+  * Does nothing when the table does not exist.
+  */
+ private void truncatePruneStateTable() throws Exception {
+   if (!hBaseAdmin.tableExists(pruneStateTable)) {
+     return;
+   }
+   if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
+     hBaseAdmin.disableTable(pruneStateTable);
+   }
+   hBaseAdmin.truncateTable(pruneStateTable, true);
+ }
+
@Test
public void testRecordCompactionState() throws Exception {
DataJanitorState dataJanitorState =
@@ -145,6 +157,13 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
}
});
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ // Truncate prune state table to clear any data that might have been written by the previous test
+ // This is required because during the shutdown of the previous test, compaction might have kicked in and the
+ // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
+ truncatePruneStateTable();
+
// No prune upper bound initially
Assert.assertEquals(-1,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -155,17 +174,23 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
// Run minor compaction
testUtil.compact(txDataTable1, false);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
// No prune upper bound after minor compaction too
Assert.assertEquals(-1,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
// Run major compaction, and verify prune upper bound
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
Assert.assertEquals(50,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
// Run major compaction again with same snapshot, prune upper bound should not change
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
Assert.assertEquals(50,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -179,6 +204,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
// Run major compaction again, now prune upper bound should change
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
Assert.assertEquals(104,
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
}
@@ -196,6 +223,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
// Run major compaction, and verify it completes
long now = System.currentTimeMillis();
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
lastMajorCompactionTime >= now);
@@ -209,6 +238,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
// Run major compaction, and verify it completes
long now = System.currentTimeMillis();
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
lastMajorCompactionTime >= now);
@@ -226,6 +257,14 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
transactionPruningPlugin.initialize(conf);
+
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ // Truncate prune state table to clear any data that might have been written by the previous test
+ // This is required because during the shutdown of the previous test, compaction might have kicked in and the
+ // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
+ truncatePruneStateTable();
+
try {
// Run without a transaction snapshot first
long now1 = 200;
@@ -270,6 +309,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
.add(getRegionName(txDataTable1, Bytes.toBytes(0)))
.build());
testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 19eb09e..728adfa 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -170,7 +170,9 @@ public class TransactionProcessor extends BaseRegionObserver {
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
- // nothing to do
+ if (compactionState != null) {
+ compactionState.stop();
+ }
}
@Override
@@ -191,7 +193,7 @@ public class TransactionProcessor extends BaseRegionObserver {
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
throws IOException {
Transaction tx = getFromOperation(put);
- ensureValidTxLifetime(e.getEnvironment(), tx);
+ ensureValidTxLifetime(e.getEnvironment(), put, tx);
}
@Override
@@ -208,7 +210,7 @@ public class TransactionProcessor extends BaseRegionObserver {
}
Transaction tx = getFromOperation(delete);
- ensureValidTxLifetime(e.getEnvironment(), tx);
+ ensureValidTxLifetime(e.getEnvironment(), delete, tx);
// Other deletes are client-initiated and need to be translated into our own tombstones
// TODO: this should delegate to the DeleteStrategy implementation.
@@ -322,11 +324,16 @@ public class TransactionProcessor extends BaseRegionObserver {
if (conf != null) {
pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
- String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
- compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable));
- LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
- pruneTable);
+ if (Boolean.TRUE.equals(pruneEnable)) {
+ String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+ long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+ conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+ compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+ LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+ + pruneTable);
+ }
}
}
@@ -390,11 +397,13 @@ public class TransactionProcessor extends BaseRegionObserver {
* Make sure that the transaction is within the max valid transaction lifetime.
*
* @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+ * @param op {@link OperationWithAttributes} HBase operation to access its attributes if required
* @param tx {@link Transaction} supplied by the
* @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction
* IOException thrown if the value of max lifetime of transaction is unavailable
*/
protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
+ @SuppressWarnings("unused") OperationWithAttributes op,
@Nullable Transaction tx) throws IOException {
if (tx == null) {
return;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 850f508..58596be 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -38,22 +38,27 @@ import javax.annotation.Nullable;
public class CompactionState {
private static final Log LOG = LogFactory.getLog(CompactionState.class);
+ private final TableName stateTable;
private final byte[] regionName;
private final String regionNameAsString;
- private final TableName stateTable;
private final DataJanitorState dataJanitorState;
+ private final long pruneFlushInterval;
private volatile long pruneUpperBound = -1;
- public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
+ private PruneUpperBoundWriter pruneUpperBoundWriter;
+
+ public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
+ this.stateTable = stateTable;
this.regionName = env.getRegionInfo().getRegionName();
this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
- this.stateTable = stateTable;
this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
@Override
public Table get() throws IOException {
return env.getTable(stateTable);
}
});
+ this.pruneFlushInterval = pruneFlushInterval;
+ this.pruneUpperBoundWriter = createPruneUpperBoundWriter();
}
/**
@@ -75,18 +80,29 @@ public class CompactionState {
}
/**
+ * Stops the current {@link PruneUpperBoundWriter}.
+ */
+ public void stop() {
+   // Nothing to stop when no writer has been created
+   if (pruneUpperBoundWriter == null) {
+     return;
+   }
+   pruneUpperBoundWriter.stop();
+ }
+
+ /**
* Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
* This method is called after the compaction has successfully completed.
*/
public void persist() {
if (pruneUpperBound != -1) {
- try {
- dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound);
- LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
- } catch (IOException e) {
- LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s",
- stateTable, regionNameAsString), e);
+ if (!pruneUpperBoundWriter.isAlive()) {
+ pruneUpperBoundWriter = createPruneUpperBoundWriter();
}
+ pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound);
+ LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
}
}
+
+ private PruneUpperBoundWriter createPruneUpperBoundWriter() {
+ return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index c6d03c4..51dc181 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -46,8 +46,8 @@ import javax.annotation.Nullable;
@SuppressWarnings("WeakerAccess")
public class DataJanitorState {
public static final byte[] FAMILY = {'f'};
+ public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
- private static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
private static final byte[] REGION_TIME_COL = {'r'};
private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 2e3b8d0..a63cf75 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -24,8 +24,10 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
@@ -124,6 +126,7 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
LOG.info("Initializing plugin with state table {}:{}", stateTable.getNamespaceAsString(),
stateTable.getNameAsString());
+ createPruneTable(stateTable);
this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
@Override
public Table get() throws IOException {
@@ -210,6 +213,38 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
}
}
+ /**
+  * Creates the prune state table with the given {@link TableName}, unless it exists already.
+  * The table is created with the single column family {@link DataJanitorState#FAMILY}, keeping one version.
+  * A {@link TableExistsException} raised during creation is logged and otherwise ignored.
+  *
+  * @param stateTable prune state table name
+  * @throws IOException if checking for or creating the table fails for any other reason
+  */
+ protected void createPruneTable(TableName stateTable) throws IOException {
+   try (Admin hbaseAdmin = this.connection.getAdmin()) {
+     if (!hbaseAdmin.tableExists(stateTable)) {
+       HColumnDescriptor family = new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1);
+       HTableDescriptor descriptor = new HTableDescriptor(stateTable);
+       descriptor.addFamily(family);
+       hbaseAdmin.createTable(descriptor);
+       LOG.info("Created pruneTable {}:{}", stateTable.getNamespaceAsString(), stateTable.getNameAsString());
+     } else {
+       LOG.debug("Not creating pruneStateTable {}:{} since it already exists.",
+                 stateTable.getNamespaceAsString(), stateTable.getNameAsString());
+     }
+   } catch (TableExistsException ex) {
+     // Expected if the prune state table is being created at the same time by another client
+     LOG.debug("Not creating pruneStateTable {}:{} since it already exists.",
+               stateTable.getNamespaceAsString(), stateTable.getNameAsString(), ex);
+   }
+ }
+
+ /**
+ * Returns whether the table is a transactional table. By default, a table is identified as a transactional
+ * table if it has the coprocessor {@link TransactionProcessor} attached to it. Should be overridden if the users
+ * attach a different coprocessor.
+ *
+ * @param tableDescriptor {@link HTableDescriptor} of the table
+ * @return true if the table is transactional
+ */
protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
}