You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2019/01/02 06:44:31 UTC
incubator-omid git commit: [OMID-124] Change hbase commitTable.Client
to not do async deletes to ct
Repository: incubator-omid
Updated Branches:
refs/heads/1.0.1 48f52f1db -> c010f7509
[OMID-124] Change hbase commitTable.Client to not do async deletes to ct
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/c010f750
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/c010f750
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/c010f750
Branch: refs/heads/1.0.1
Commit: c010f7509ec0e44ddefd5bf132a502c716973dea
Parents: 48f52f1
Author: Yonatan Gottesman <yo...@gmail.com>
Authored: Wed Jan 2 08:44:10 2019 +0200
Committer: Yonatan Gottesman <yo...@gmail.com>
Committed: Wed Jan 2 08:44:10 2019 +0200
----------------------------------------------------------------------
.../apache/omid/benchmarks/tso/RawTxRunner.java | 2 +-
.../apache/omid/committable/CommitTable.java | 2 +-
.../omid/committable/InMemoryCommitTable.java | 2 +-
.../omid/committable/NullCommitTable.java | 2 +-
.../omid/committable/NullCommitTableTest.java | 2 +-
.../transaction/HBaseSyncPostCommitter.java | 2 +-
.../omid/transaction/SnapshotFilterImpl.java | 2 +-
.../apache/omid/transaction/OmidTestBase.java | 1 +
.../omid/transaction/TestShadowCells.java | 4 +-
.../committable/hbase/HBaseCommitTable.java | 138 +++----------------
.../committable/hbase/TestHBaseCommitTable.java | 36 +----
.../apache/omid/transaction/OmidCompactor.java | 7 +-
.../omid/transaction/OmidSnapshotFilter.java | 47 +------
.../transaction/AbstractTransactionManager.java | 2 +-
...stTSOClientRequestAndResponseBehaviours.java | 2 +-
15 files changed, 45 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
----------------------------------------------------------------------
diff --git a/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java b/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
index f18fb56..511ba05 100644
--- a/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
+++ b/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
@@ -281,7 +281,7 @@ class RawTxRunner implements Runnable {
try {
commitFuture.get();
- commitTableClient.completeTransaction(txId).get();
+ commitTableClient.deleteCommitEntry(txId).get();
commitTimer.update(System.nanoTime() - commitRequestTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
----------------------------------------------------------------------
diff --git a/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
index f3c15f5..8f578c6 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
@@ -66,7 +66,7 @@ public interface CommitTable {
ListenableFuture<Long> readLowWatermark();
- ListenableFuture<Void> completeTransaction(long startTimestamp);
+ ListenableFuture<Void> deleteCommitEntry(long startTimestamp);
/**
* Atomically tries to invalidate a non-committed transaction launched by a previous TSO server.
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
----------------------------------------------------------------------
diff --git a/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
index 6f9f384..14a4f76 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
@@ -103,7 +103,7 @@ public class InMemoryCommitTable implements CommitTable {
}
@Override
- public ListenableFuture<Void> completeTransaction(long startTimestamp) {
+ public ListenableFuture<Void> deleteCommitEntry(long startTimestamp) {
SettableFuture<Void> f = SettableFuture.create();
table.remove(startTimestamp);
f.set(null);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
----------------------------------------------------------------------
diff --git a/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
index c27a238..176d4d9 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
@@ -78,7 +78,7 @@ public class NullCommitTable implements CommitTable {
}
@Override
- public ListenableFuture<Void> completeTransaction(long startTimestamp) {
+ public ListenableFuture<Void> deleteCommitEntry(long startTimestamp) {
SettableFuture<Void> f = SettableFuture.create();
f.set(null);
return f;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
----------------------------------------------------------------------
diff --git a/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java b/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
index cb0fbf9..0d539d8 100644
--- a/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
+++ b/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
@@ -57,7 +57,7 @@ public class NullCommitTableTest {
// expected
}
- assertNull(commitTableClient.completeTransaction(TEST_ST).get());
+ assertNull(commitTableClient.deleteCommitEntry(TEST_ST).get());
// Test writer
commitTableWriter.updateLowWatermark(TEST_LWM);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
index d5f9c4d..a7ad55f 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
@@ -113,7 +113,7 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
commitTableUpdateTimer.start();
try {
- commitTableClient.completeTransaction(tx.getStartTimestamp()).get();
+ commitTableClient.deleteCommitEntry(tx.getStartTimestamp()).get();
updateSCFuture.set(null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
index 5c88e92..d095858 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
@@ -212,7 +212,7 @@ public class SnapshotFilterImpl implements SnapshotFilter {
commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
if (commitTimeStamp.isPresent()) {
// Remove false invalidation from commit table
- commitTableClient.completeTransaction(cellStartTimestamp);
+ commitTableClient.deleteCommitEntry(cellStartTimestamp);
return commitTimeStamp.get();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java b/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
index cb09e3c..0e7969d 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
@@ -89,6 +89,7 @@ public abstract class OmidTestBase {
TSOServerConfig tsoConfig = new TSOServerConfig();
tsoConfig.setPort(1234);
tsoConfig.setConflictMapSize(1000);
+ tsoConfig.setWaitStrategy("LOW_CPU");
Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
LOG.info("Starting TSO");
TSOServer tso = injector.getInstance(TSOServer.class);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
index b5e186f..23ea809 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
@@ -293,8 +293,8 @@ public class TestShadowCells extends OmidTestBase {
"Cell should be there");
assertFalse(hasShadowCell(row, family, qualifier, tx.getStartTimestamp(), new TTableCellGetterAdapter(table)),
"Shadow cell should not be there");
- // 2) and thus, completeTransaction() was never called on the commit table...
- verify(commitTableClient, times(0)).completeTransaction(anyLong());
+ // 2) and thus, deleteCommitEntry() was never called on the commit table...
+ verify(commitTableClient, times(0)).deleteCommitEntry(anyLong());
// 3) ...and commit value still in commit table
assertTrue(commitTableClient.getCommitTimestamp(tx.getStartTimestamp()).get().isPresent());
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
----------------------------------------------------------------------
diff --git a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
index 6320e4d..b83f1a3 100644
--- a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
+++ b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
@@ -21,16 +21,10 @@ import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TA
import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.INVALID_TX_QUALIFIER;
import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.LOW_WATERMARK_QUALIFIER;
import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.LOW_WATERMARK_ROW;
-
import java.io.IOException;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+
import javax.inject.Inject;
@@ -53,7 +47,6 @@ import com.google.common.base.Optional;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
@@ -169,27 +162,12 @@ public class HBaseCommitTable implements CommitTable {
}
- class HBaseClient implements Client, Runnable {
+ class HBaseClient implements Client{
final Table table;
- final Table deleteTable;
- final ExecutorService deleteBatchExecutor;
- final BlockingQueue<DeleteRequest> deleteQueue;
- boolean isClosed = false; // @GuardedBy("this")
- final static int DELETE_BATCH_SIZE = 1024;
HBaseClient() throws IOException {
- // TODO: create TTable here instead
table = hbaseConnection.getTable(TableName.valueOf(tableName));
- // FIXME: why is this using autoFlush of false? Why would every Delete
- // need to be send through a separate RPC?
- deleteTable = hbaseConnection.getTable(TableName.valueOf(tableName));
- deleteQueue = new ArrayBlockingQueue<>(DELETE_BATCH_SIZE);
-
- deleteBatchExecutor = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setNameFormat("omid-completor-%d").build());
- deleteBatchExecutor.submit(this);
-
}
@Override
@@ -245,33 +223,31 @@ public class HBaseCommitTable implements CommitTable {
return f;
}
+ // This function is only used to delete a CT entry and should be renamed
@Override
- public ListenableFuture<Void> completeTransaction(long startTimestamp) {
+ public ListenableFuture<Void> deleteCommitEntry(long startTimestamp) {
+ byte[] key;
try {
- synchronized (this) {
-
- if (isClosed) {
- SettableFuture<Void> f = SettableFuture.create();
- f.setException(new IOException("Not accepting requests anymore"));
- return f;
- }
-
- DeleteRequest req = new DeleteRequest(
- new Delete(startTimestampToKey(startTimestamp), startTimestamp));
- deleteQueue.put(req);
- return req;
- }
- } catch (IOException ioe) {
- LOG.warn("Error generating timestamp for transaction completion", ioe);
+ key = startTimestampToKey(startTimestamp);
+ } catch (IOException e) {
+ LOG.warn("Error generating timestamp for transaction completion", e);
SettableFuture<Void> f = SettableFuture.create();
- f.setException(ioe);
+ f.setException(e);
return f;
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
+ }
+
+ Delete delete = new Delete(key, startTimestamp);
+
+ try {
+ table.delete(delete);
+ } catch (IOException e) {
SettableFuture<Void> f = SettableFuture.create();
- f.setException(ie);
- return f;
+ LOG.warn("Error contacting hbase", e);
+ f.setException(e);
}
+ SettableFuture<Void> f = SettableFuture.create();
+ f.set(null);
+ return f;
}
@Override
@@ -297,79 +273,7 @@ public class HBaseCommitTable implements CommitTable {
}
@Override
- @SuppressWarnings("InfiniteLoopStatement")
- public void run() {
- List<DeleteRequest> reqbatch = new ArrayList<>();
- try {
- while (true) {
- DeleteRequest r = deleteQueue.poll();
- if (r == null && reqbatch.size() == 0) {
- r = deleteQueue.take();
- }
-
- if (r != null) {
- reqbatch.add(r);
- }
-
- if (r == null || reqbatch.size() == DELETE_BATCH_SIZE) {
- List<Delete> deletes = new ArrayList<>();
- for (DeleteRequest dr : reqbatch) {
- deletes.add(dr.getDelete());
- }
- try {
- deleteTable.delete(deletes);
- for (DeleteRequest dr : reqbatch) {
- dr.complete();
- }
- } catch (IOException ioe) {
- LOG.warn("Error contacting hbase", ioe);
- for (DeleteRequest dr : reqbatch) {
- dr.error(ioe);
- }
- } finally {
- reqbatch.clear();
- }
- }
- }
- } catch (InterruptedException ie) {
- // Drain the queue and place the exception in the future
- // for those who placed requests
- LOG.warn("Draining delete queue");
- DeleteRequest queuedRequest = deleteQueue.poll();
- while (queuedRequest != null) {
- reqbatch.add(queuedRequest);
- queuedRequest = deleteQueue.poll();
- }
- for (DeleteRequest dr : reqbatch) {
- dr.error(new IOException("HBase CommitTable is going to be closed"));
- }
- reqbatch.clear();
- Thread.currentThread().interrupt();
- } catch (Throwable t) {
- LOG.error("Transaction completion thread threw exception", t);
- }
- }
-
- @Override
public synchronized void close() throws IOException {
- isClosed = true;
- deleteBatchExecutor.shutdownNow(); // may need to interrupt take
- try {
- if (!deleteBatchExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
- LOG.warn("Delete executor did not shutdown");
- }
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
-
- LOG.warn("Re-Draining delete queue just in case");
- DeleteRequest queuedRequest = deleteQueue.poll();
- while (queuedRequest != null) {
- queuedRequest.error(new IOException("HBase CommitTable is going to be closed"));
- queuedRequest = deleteQueue.poll();
- }
-
- deleteTable.close();
table.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
----------------------------------------------------------------------
diff --git a/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java b/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
index b67e2a8..a29def2 100644
--- a/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
+++ b/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
@@ -21,7 +21,6 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
@@ -41,10 +40,8 @@ import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.Client;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
import org.apache.omid.committable.CommitTable.Writer;
-import org.apache.omid.committable.hbase.HBaseCommitTable.HBaseClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
@@ -165,7 +162,7 @@ public class TestHBaseCommitTable {
// Test the successful deletion of the 1000 txs
Future<Void> f;
for (long i = 0; i < 1000; i++) {
- f = client.completeTransaction(i);
+ f = client.deleteCommitEntry(i);
f.get();
}
assertEquals(rowCount(TABLE_NAME, commitTableFamily), 0, "Rows should be 0!");
@@ -262,37 +259,6 @@ public class TestHBaseCommitTable {
}
- @Test(timeOut = 30_000)
- public void testClosingClientEmptyQueuesProperly() throws Throwable {
- HBaseCommitTableConfig config = new HBaseCommitTableConfig();
- config.setTableName(TEST_TABLE);
- HBaseCommitTable commitTable = new HBaseCommitTable(connection, config);
-
- Writer writer = commitTable.getWriter();
- HBaseCommitTable.HBaseClient client = (HBaseClient) commitTable.getClient();
-
- for (int i = 0; i < 1000; i++) {
- writer.addCommittedTransaction(i, i + 1);
- }
- writer.flush();
-
- // Completing first transaction should be fine
- client.completeTransaction(0).get();
- assertEquals(rowCount(TABLE_NAME, commitTableFamily), 999, "Rows should be 999!");
-
- // When closing, removing a transaction should throw an EE with an IOException
- client.close();
- try {
- client.completeTransaction(1).get();
- Assert.fail();
- } catch (ExecutionException e) {
- // Expected
- }
- assertEquals(client.deleteQueue.size(), 0, "Delete queue size should be 0!");
- assertEquals(rowCount(TABLE_NAME, commitTableFamily), 999, "Rows should be 999!");
-
- }
-
private static long rowCount(TableName tableName, byte[] family) throws Throwable {
Scan scan = new Scan();
scan.addFamily(family);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
index 0f39737..f8ed6b7 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
@@ -73,6 +73,7 @@ public class OmidCompactor extends BaseRegionObserver {
// If retained, the deleted cell will appear after a minor compaction, but
// will be deleted anyways after a major one
private boolean retainNonTransactionallyDeletedCells;
+ private CommitTable commitTable;
public OmidCompactor() {
this(false);
@@ -86,12 +87,15 @@ public class OmidCompactor extends BaseRegionObserver {
@Override
public void start(CoprocessorEnvironment env) throws IOException {
LOG.info("Starting compactor coprocessor");
- this.env = (RegionCoprocessorEnvironment) env;
commitTableConf = new HBaseCommitTableConfig();
String commitTableName = env.getConfiguration().get(COMMIT_TABLE_NAME_KEY);
if (commitTableName != null) {
commitTableConf.setTableName(commitTableName);
}
+ commitTable = new HBaseCommitTable(RegionConnectionFactory
+ .getConnection(RegionConnectionFactory.ConnectionType.COMPACTION_CONNECTION,
+ (RegionCoprocessorEnvironment) env)
+ , commitTableConf);
retainNonTransactionallyDeletedCells =
env.getConfiguration().getBoolean(HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY,
HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT);
@@ -152,7 +156,6 @@ public class OmidCompactor extends BaseRegionObserver {
private CommitTable.Client initAndGetCommitTableClient() throws IOException {
LOG.info("Trying to get the commit table client");
- CommitTable commitTable = new HBaseCommitTable(RegionConnectionFactory.getConnection(RegionConnectionFactory.ConnectionType.COMPACTION_CONNECTION, env), commitTableConf);
CommitTable.Client commitTableClient = commitTable.getClient();
LOG.info("Commit table client obtained {}", commitTableClient.getClass().getCanonicalName());
return commitTableClient;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
index 115a467..eb5d50f 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
@@ -23,8 +23,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.filter.Filter;
-
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -68,9 +66,10 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
private Queue<SnapshotFilterImpl> snapshotFilterQueue = new ConcurrentLinkedQueue<>();
private Map<Object, SnapshotFilterImpl> snapshotFilterMap = new ConcurrentHashMap<>();
private CommitTable.Client inMemoryCommitTable = null;
+ private CommitTable commitTable;
public OmidSnapshotFilter(CommitTable.Client commitTableClient) {
- LOG.info("Compactor coprocessor initialized with constructor for testing");
+ LOG.info("Compactor coprocessor initialized");
this.inMemoryCommitTable = commitTableClient;
}
@@ -79,7 +78,7 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
}
@Override
- public void start(CoprocessorEnvironment env) {
+ public void start(CoprocessorEnvironment env) throws IOException {
LOG.info("Starting snapshot filter coprocessor");
this.env = (RegionCoprocessorEnvironment)env;
commitTableConf = new HBaseCommitTableConfig();
@@ -88,16 +87,13 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
commitTableConf.setTableName(commitTableName);
}
LOG.info("Snapshot filter started");
+ commitTable = new HBaseCommitTable(RegionConnectionFactory
+ .getConnection(RegionConnectionFactory.ConnectionType.READ_CONNECTION, (RegionCoprocessorEnvironment) env),
+ commitTableConf);
}
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
- LOG.info("Stopping snapshot filter coprocessor");
- if (snapshotFilterQueue != null) {
- for (SnapshotFilterImpl snapshotFilter: snapshotFilterQueue) {
- snapshotFilter.closeCommitTableClient();
- }
- }
LOG.info("Snapshot filter stopped");
}
@@ -165,37 +161,9 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
Filter newFilter = TransactionFilters.getVisibilityFilter(scan.getFilter(),
snapshotFilter, hbaseTransaction);
scan.setFilter(newFilter);
- snapshotFilterMap.put(scan, snapshotFilter);
return;
}
- // Don't add an @Override tag since this method doesn't exist in both hbase-1 and hbase-2
- public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
- Scan scan,
- RegionScanner s) {
- byte[] byteTransaction = scan.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE);
-
- if (byteTransaction == null) {
- return s;
- }
-
- SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(scan);
- assert(snapshotFilter != null);
- snapshotFilterMap.remove(scan);
- snapshotFilterMap.put(s, snapshotFilter);
- return s;
- }
-
- // Don't add an @Override tag since this method doesn't exist in both hbase-1 and hbase-2
- public void preScannerClose(ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s) {
- SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(s);
- if (snapshotFilter != null) {
- snapshotFilterQueue.add(snapshotFilter);
- }
- }
-
-
-
private HBaseTransaction getHBaseTransaction(byte[] byteTransaction, boolean isLowLatency)
throws InvalidProtocolBufferException {
TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(byteTransaction);
@@ -210,13 +178,10 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
}
private CommitTable.Client initAndGetCommitTableClient() throws IOException {
- LOG.info("Trying to get the commit table client");
if (inMemoryCommitTable != null) {
return inMemoryCommitTable;
}
- CommitTable commitTable = new HBaseCommitTable(RegionConnectionFactory.getConnection(RegionConnectionFactory.ConnectionType.READ_CONNECTION, env), commitTableConf);
CommitTable.Client commitTableClient = commitTable.getClient();
- LOG.info("Commit table client obtained {}", commitTableClient.getClass().getCanonicalName());
return commitTableClient;
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
index 99abdb6..5075a7f 100644
--- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
+++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
@@ -365,7 +365,7 @@ public abstract class AbstractTransactionManager implements TransactionManager {
if (!committed) {
// Transaction has been invalidated by other client
rollback(tx);
- commitTableClient.completeTransaction(tx.getStartTimestamp());
+ commitTableClient.deleteCommitEntry(tx.getStartTimestamp());
rolledbackTxsCounter.inc();
throw new RollbackException("Transaction " + tx.getTransactionId() + " got invalidated");
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
index 080c23e..48d3a40 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
@@ -390,7 +390,7 @@ public class TestTSOClientRequestAndResponseBehaviours {
clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
// Simulate remove entry from the commit table before exercise retry
- commitTable.getClient().completeTransaction(tx1ST);
+ commitTable.getClient().deleteCommitEntry(tx1ST);
TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
assertTrue(response.getCommitResponse().getAborted(), "Transaction should abort");