You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2019/03/03 06:31:53 UTC
[incubator-omid] 01/02: [OMID-134] - transactionManager is thread
safe. All HBase Tables and Connections are closed correctly. Create new Table
from connection only when needed
This is an automated email from the ASF dual-hosted git repository.
yonigo pushed a commit to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git
commit 3a22dd6f450c1ae0d75b42659aef921b4ade4ba2
Author: Yonatan Gottesman <yo...@gmail.com>
AuthorDate: Tue Feb 26 10:31:30 2019 +0200
[OMID-134] - transactionManager is thread safe.
All HBase Tables and Connections are closed correctly.
Create new Table from connection only when needed
---
.../apache/omid/benchmarks/tso/RawTxRunner.java | 3 +-
.../org/apache/omid/committable/CommitTable.java | 4 +-
.../omid/committable/InMemoryCommitTable.java | 8 ----
.../apache/omid/committable/NullCommitTable.java | 8 ----
.../omid/committable/NullCommitTableTest.java | 50 +++++++++++-----------
.../transaction/AttributeSetSnapshotFilter.java | 4 ++
.../omid/transaction/HBaseSyncPostCommitter.java | 23 ++++++----
.../omid/transaction/HBaseTransactionManager.java | 32 +++++++++-----
.../omid/transaction/HTableAccessWrapper.java | 5 +++
.../apache/omid/transaction/SnapshotFilter.java | 2 +-
.../omid/transaction/SnapshotFilterImpl.java | 9 ++--
.../java/org/apache/omid/transaction/TTable.java | 6 +++
.../omid/transaction/TableAccessWrapper.java | 2 +-
.../transaction/TestAsynchronousPostCommitter.java | 6 +--
.../org/apache/omid/transaction/TestFilters.java | 4 +-
.../transaction/TestHBaseTransactionClient.java | 8 ++--
.../apache/omid/transaction/TestShadowCells.java | 8 ++--
.../omid/committable/hbase/HBaseCommitTable.java | 45 ++++++++-----------
.../hbase/regionserver/RegionAccessWrapper.java | 4 ++
.../apache/omid/transaction/CompactorScanner.java | 6 +--
.../org/apache/omid/transaction/CompactorUtil.java | 20 ++++-----
.../org/apache/omid/transaction/OmidCompactor.java | 33 ++++----------
.../omid/transaction/OmidSnapshotFilter.java | 12 +++---
.../apache/omid/transaction/TestCompaction.java | 13 +++---
.../omid/transaction/TestCompactorScanner.java | 4 +-
.../omid/transaction/TestSnapshotFilter.java | 2 +-
.../omid/transaction/TestSnapshotFilterLL.java | 2 +-
.../timestamp/storage/HBaseTimestampStorage.java | 11 ++++-
.../transaction/AbstractTransactionManager.java | 7 +--
29 files changed, 170 insertions(+), 171 deletions(-)
diff --git a/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java b/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
index 511ba05..27ac437 100644
--- a/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
+++ b/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
@@ -175,12 +175,11 @@ class RawTxRunner implements Runnable {
if (!wasSuccess) {
callbackExec.shutdownNow();
}
- commitTableClient.close();
tsoClient.close().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// ignore
- } catch (ExecutionException | IOException e) {
+ } catch (ExecutionException e) {
// ignore
} finally {
LOG.info("TxRunner {} finished", txRunnerId);
diff --git a/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
index 8f578c6..9b20305 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
@@ -31,7 +31,7 @@ public interface CommitTable {
Client getClient() throws IOException;
- interface Writer extends Closeable {
+ interface Writer{
void addCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException;
@@ -53,7 +53,7 @@ public interface CommitTable {
boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException;
}
- interface Client extends Closeable {
+ interface Client {
/**
* Checks whether a transaction commit data is inside the commit table The function also checks whether the
diff --git a/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
index 14a4f76..9462c3a 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
@@ -72,10 +72,6 @@ public class InMemoryCommitTable implements CommitTable {
// required to make sure the entry was not invalidated.
return (table.putIfAbsent(startTimestamp, commitTimestamp) == null);
}
-
- @Override
- public void close() {
- }
}
public class Client implements CommitTable.Client {
@@ -138,10 +134,6 @@ public class InMemoryCommitTable implements CommitTable {
f.set(false);
return f;
}
-
- @Override
- public void close() {
- }
}
public int countElements() {
diff --git a/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
index 176d4d9..70e45da 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
@@ -59,11 +59,6 @@ public class NullCommitTable implements CommitTable {
public void flush() throws IOException {
// noop
}
-
- @Override
- public void close() {
- }
-
}
public static class Client implements CommitTable.Client {
@@ -89,8 +84,5 @@ public class NullCommitTable implements CommitTable {
throw new UnsupportedOperationException();
}
- @Override
- public void close() {
- }
}
}
diff --git a/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java b/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
index 0d539d8..efe5c29 100644
--- a/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
+++ b/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
@@ -35,36 +35,34 @@ public class NullCommitTableTest {
CommitTable commitTable = new NullCommitTable();
- try (CommitTable.Client commitTableClient = commitTable.getClient();
- CommitTable.Writer commitTableWriter = commitTable.getWriter()) {
+ CommitTable.Client commitTableClient = commitTable.getClient();
+ CommitTable.Writer commitTableWriter = commitTable.getWriter();
- // Test client
- try {
- commitTableClient.readLowWatermark().get();
- } catch (UnsupportedOperationException e) {
- // expected
- }
+ // Test client
+ try {
+ commitTableClient.readLowWatermark().get();
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
- try {
- commitTableClient.getCommitTimestamp(TEST_ST).get();
- } catch (UnsupportedOperationException e) {
- // expected
- }
+ try {
+ commitTableClient.getCommitTimestamp(TEST_ST).get();
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
- try {
- commitTableClient.tryInvalidateTransaction(TEST_ST).get();
- } catch (UnsupportedOperationException e) {
- // expected
- }
+ try {
+ commitTableClient.tryInvalidateTransaction(TEST_ST).get();
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
- assertNull(commitTableClient.deleteCommitEntry(TEST_ST).get());
+ assertNull(commitTableClient.deleteCommitEntry(TEST_ST).get());
- // Test writer
- commitTableWriter.updateLowWatermark(TEST_LWM);
- commitTableWriter.addCommittedTransaction(TEST_ST, TEST_CT);
- commitTableWriter.clearWriteBuffer();
- commitTableWriter.flush();
- }
+ // Test writer
+ commitTableWriter.updateLowWatermark(TEST_LWM);
+ commitTableWriter.addCommittedTransaction(TEST_ST, TEST_CT);
+ commitTableWriter.clearWriteBuffer();
+ commitTableWriter.flush();
}
-
}
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java b/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
index 6fdcd44..856d247 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
@@ -57,4 +57,8 @@ public class AttributeSetSnapshotFilter implements SnapshotFilter {
scan.setAttribute(CellUtils.LL_ATTRIBUTE, Bytes.toBytes(transaction.isLowLatency()));
return table.getScanner(scan);
}
+
+ public void close() throws IOException {
+ table.close();
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
index c9fa5e5..68dc8d3 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
@@ -51,29 +53,34 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
private final Timer commitTableUpdateTimer;
private final Timer shadowCellsUpdateTimer;
private static final int MAX_BATCH_SIZE=1000;
+ private final Connection connection;
-
- public HBaseSyncPostCommitter(MetricsRegistry metrics, CommitTable.Client commitTableClient) {
+ public HBaseSyncPostCommitter(MetricsRegistry metrics, CommitTable.Client commitTableClient,
+ Connection connection) {
this.metrics = metrics;
this.commitTableClient = commitTableClient;
this.commitTableUpdateTimer = metrics.timer(name("omid", "tm", "hbase", "commitTableUpdate", "latency"));
this.shadowCellsUpdateTimer = metrics.timer(name("omid", "tm", "hbase", "shadowCellsUpdate", "latency"));
+ this.connection = connection;
}
- private void flushMutations(Table table, List<Mutation> mutations) throws IOException, InterruptedException {
- table.batch(mutations, new Object[mutations.size()]);
+ private void flushMutations(TableName tableName, List<Mutation> mutations) throws IOException, InterruptedException {
+ try (Table table = connection.getTable(tableName)){
+ table.batch(mutations, new Object[mutations.size()]);
+ }
+
}
private void addShadowCell(HBaseCellId cell, HBaseTransaction tx, SettableFuture<Void> updateSCFuture,
- Map<Table,List<Mutation>> mutations) throws IOException, InterruptedException {
+ Map<TableName,List<Mutation>> mutations) throws IOException, InterruptedException {
Put put = new Put(cell.getRow());
put.addColumn(cell.getFamily(),
CellUtils.addShadowCellSuffixPrefix(cell.getQualifier(), 0, cell.getQualifier().length),
cell.getTimestamp(),
Bytes.toBytes(tx.getCommitTimestamp()));
- Table table = cell.getTable().getHTable();
+ TableName table = cell.getTable().getHTable().getName();
List<Mutation> tableMutations = mutations.get(table);
if (tableMutations == null) {
ArrayList<Mutation> newList = new ArrayList<>();
@@ -97,7 +104,7 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
shadowCellsUpdateTimer.start();
try {
- Map<Table,List<Mutation>> mutations = new HashMap<>();
+ Map<TableName,List<Mutation>> mutations = new HashMap<>();
// Add shadow cells
for (HBaseCellId cell : tx.getWriteSet()) {
addShadowCell(cell, tx, updateSCFuture, mutations);
@@ -107,7 +114,7 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
addShadowCell(cell, tx, updateSCFuture, mutations);
}
- for (Map.Entry<Table,List<Mutation>> entry: mutations.entrySet()) {
+ for (Map.Entry<TableName,List<Mutation>> entry: mutations.entrySet()) {
flushMutations(entry.getKey(), entry.getValue());
}
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
index c66b9b2..5620be3 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
@@ -46,6 +47,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class HBaseTransactionManager extends AbstractTransactionManager implements HBaseTransactionClient {
private static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionManager.class);
+ private final Connection connection;
private static class HBaseTransactionFactory implements TransactionFactory<HBaseCellId> {
@@ -111,9 +113,11 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
public HBaseTransactionManager build() throws IOException, InterruptedException {
- CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient()).get();
- CommitTable.Writer commitTableWriter = this.commitTableWriter.or(buildCommitTableWriter()).get();
- PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient)).get();
+ Connection connection = ConnectionFactory.createConnection(hbaseOmidClientConf.getHBaseConfiguration());
+
+ CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient(connection)).get();
+ CommitTable.Writer commitTableWriter = this.commitTableWriter.or(buildCommitTableWriter(connection)).get();
+ PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient, connection)).get();
TSOProtocol tsoClient = this.tsoClient.or(buildTSOClient()).get();
return new HBaseTransactionManager(hbaseOmidClientConf,
@@ -121,7 +125,8 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
tsoClient,
commitTableClient,
commitTableWriter,
- new HBaseTransactionFactory());
+ new HBaseTransactionFactory(),
+ connection);
}
private Optional<TSOProtocol> buildTSOClient() throws IOException, InterruptedException {
@@ -129,25 +134,25 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
}
- private Optional<CommitTable.Client> buildCommitTableClient() throws IOException {
+ private Optional<CommitTable.Client> buildCommitTableClient(Connection connection) throws IOException {
HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
- CommitTable commitTable = new HBaseCommitTable(ConnectionFactory.createConnection(hbaseOmidClientConf.getHBaseConfiguration()), commitTableConf);
+ CommitTable commitTable = new HBaseCommitTable(connection, commitTableConf);
return Optional.of(commitTable.getClient());
}
- private Optional<CommitTable.Writer> buildCommitTableWriter() throws IOException {
+ private Optional<CommitTable.Writer> buildCommitTableWriter(Connection connection) throws IOException {
HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
- CommitTable commitTable = new HBaseCommitTable(hbaseOmidClientConf.getHBaseConfiguration(), commitTableConf);
+ CommitTable commitTable = new HBaseCommitTable(connection, commitTableConf);
return Optional.of(commitTable.getWriter());
}
- private Optional<PostCommitActions> buildPostCommitter(CommitTable.Client commitTableClient ) {
+ private Optional<PostCommitActions> buildPostCommitter(CommitTable.Client commitTableClient, Connection connection) {
PostCommitActions postCommitter;
PostCommitActions syncPostCommitter = new HBaseSyncPostCommitter(hbaseOmidClientConf.getMetrics(),
- commitTableClient);
+ commitTableClient, connection);
switch(hbaseOmidClientConf.getPostCommitMode()) {
case ASYNC:
ListeningExecutorService postCommitExecutor =
@@ -175,7 +180,7 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
TSOProtocol tsoClient,
CommitTable.Client commitTableClient,
CommitTable.Writer commitTableWriter,
- HBaseTransactionFactory hBaseTransactionFactory) {
+ HBaseTransactionFactory hBaseTransactionFactory, Connection connection) {
super(hBaseOmidClientConfiguration.getMetrics(),
postCommitter,
@@ -183,11 +188,16 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
commitTableClient,
commitTableWriter,
hBaseTransactionFactory);
+ this.connection = connection;
}
// ----------------------------------------------------------------------------------------------------------------
// AbstractTransactionManager overwritten methods
// ----------------------------------------------------------------------------------------------------------------
+ @Override
+ public void closeResources() throws IOException {
+ connection.close();
+ }
@Override
public void preCommit(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java b/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java
index f48fa55..0114ca2 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java
@@ -60,4 +60,9 @@ public class HTableAccessWrapper implements TableAccessWrapper {
return readTable.getScanner(scan);
}
+ @Override
+ public void close() throws Exception {
+ writeTable.close();
+ readTable.close();
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
index 370ac01..9372868 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-public interface SnapshotFilter {
+public interface SnapshotFilter extends AutoCloseable{
Result get(Get get, HBaseTransaction transaction) throws IOException;
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
index d095858..569f1bd 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
@@ -63,10 +63,6 @@ public class SnapshotFilterImpl implements SnapshotFilter {
private TableAccessWrapper tableAccessWrapper;
- public void closeCommitTableClient() throws IOException {
- commitTableClient.close();
- }
-
private CommitTable.Client commitTableClient;
public TableAccessWrapper getTableAccessWrapper() {
@@ -598,6 +594,11 @@ public class SnapshotFilterImpl implements SnapshotFilter {
.asList();
}
+ @Override
+ public void close() throws Exception {
+ tableAccessWrapper.close();
+ }
+
public class TransactionalClientScanner implements ResultScanner {
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
index 869a013..e52c405 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
@@ -163,6 +163,12 @@ public class TTable implements Closeable {
@Override
public void close() throws IOException {
table.close();
+ try {
+ snapshotFilter.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close TTable resources.");
+ e.printStackTrace();
+ }
}
// ----------------------------------------------------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TableAccessWrapper.java b/hbase-client/src/main/java/org/apache/omid/transaction/TableAccessWrapper.java
index 8f7f6ac..534dc94 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/TableAccessWrapper.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/TableAccessWrapper.java
@@ -29,7 +29,7 @@ import java.util.List;
//This interface is used to wrap the HTableInterface and Region object when doing client and server side filtering accordingly.
-public interface TableAccessWrapper {
+public interface TableAccessWrapper extends AutoCloseable{
Result[] get(List<Get> get) throws IOException;
Result get(Get get) throws IOException;
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java
index 5979c80..e2c9933 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java
@@ -67,7 +67,7 @@ public class TestAsynchronousPostCommitter extends OmidTestBase {
CommitTable.Client commitTableClient = getCommitTable(context).getClient();
PostCommitActions syncPostCommitter =
- spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
ListeningExecutorService postCommitExecutor =
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
@@ -185,7 +185,7 @@ public class TestAsynchronousPostCommitter extends OmidTestBase {
CommitTable.Client commitTableClient = getCommitTable(context).getClient();
PostCommitActions syncPostCommitter =
- spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
ListeningExecutorService postCommitExecutor =
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
@@ -264,7 +264,7 @@ public class TestAsynchronousPostCommitter extends OmidTestBase {
CommitTable.Client commitTableClient = getCommitTable(context).getClient();
PostCommitActions syncPostCommitter =
- spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
ListeningExecutorService postCommitExecutor =
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
index 4678110..d375084 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
@@ -78,7 +78,7 @@ public class TestFilters extends OmidTestBase {
TTable table = new TTable(connection, TEST_TABLE);
PostCommitActions syncPostCommitter = spy(
- new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+ new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
AbstractTransactionManager tm = HBaseTransactionManager.builder(hbaseOmidClientConf)
.commitTableClient(commitTableClient)
.commitTableWriter(getCommitTable(context).getWriter())
@@ -127,7 +127,7 @@ public class TestFilters extends OmidTestBase {
hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
TTable table = new TTable(connection, TEST_TABLE);
PostCommitActions syncPostCommitter = spy(
- new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+ new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
AbstractTransactionManager tm = HBaseTransactionManager.builder(hbaseOmidClientConf)
.commitTableClient(commitTableClient)
.commitTableWriter(getCommitTable(context).getWriter())
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
index fb5efdf..45b5ce5 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
@@ -94,7 +94,7 @@ public class TestHBaseTransactionClient extends OmidTestBase {
@Test(timeOut = 30_000)
public void testCrashAfterCommit(ITestContext context) throws Exception {
PostCommitActions syncPostCommitter =
- spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
// The following line emulates a crash after commit that is observed in (*) below
doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
@@ -134,7 +134,7 @@ public class TestHBaseTransactionClient extends OmidTestBase {
final long NON_EXISTING_CELL_TS = 1000L;
PostCommitActions syncPostCommitter =
- spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
// The following line emulates a crash after commit that is observed in (*) below
doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
@@ -257,7 +257,7 @@ public class TestHBaseTransactionClient extends OmidTestBase {
public void testCellCommitTimestampIsLocatedInCommitTable(ITestContext context) throws Exception {
PostCommitActions syncPostCommitter =
- spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
// The following line emulates a crash after commit that is observed in (*) below
doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
@@ -374,7 +374,7 @@ public class TestHBaseTransactionClient extends OmidTestBase {
CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
PostCommitActions syncPostCommitter =
- spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, syncPostCommitter));
// The following line emulates a crash after commit that is observed in (*) below
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
index df274ae..8cf51a2 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
@@ -141,7 +141,7 @@ public class TestShadowCells extends OmidTestBase {
hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
PostCommitActions syncPostCommitter = spy(
- new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+ new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
.postCommitter(syncPostCommitter)
.commitTableClient(commitTableClient)
@@ -189,7 +189,7 @@ public class TestShadowCells extends OmidTestBase {
hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
PostCommitActions syncPostCommitter = spy(
- new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+ new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
.postCommitter(syncPostCommitter)
.commitTableWriter(getCommitTable(context).getWriter())
@@ -250,7 +250,7 @@ public class TestShadowCells extends OmidTestBase {
hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
PostCommitActions syncPostCommitter = spy(
- new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+ new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
.postCommitter(syncPostCommitter)
.commitTableClient(commitTableClient)
@@ -335,7 +335,7 @@ public class TestShadowCells extends OmidTestBase {
final AtomicBoolean readFailed = new AtomicBoolean(false);
PostCommitActions syncPostCommitter =
- spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
doAnswer(new Answer<ListenableFuture<Void>>() {
diff --git a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
index b83f1a3..2deb8ee 100644
--- a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
+++ b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
@@ -95,13 +95,13 @@ public class HBaseCommitTable implements CommitTable {
private class HBaseWriter implements Writer {
private static final long INITIAL_LWM_VALUE = -1L;
- final Table table;
+
// Our own buffer for operations
final List<Put> writeBuffer = new LinkedList<>();
volatile long lowWatermarkToStore = INITIAL_LWM_VALUE;
- HBaseWriter() throws IOException {
- table = hbaseConnection.getTable(TableName.valueOf(tableName));
+ HBaseWriter() {
+
}
@Override
@@ -120,7 +120,8 @@ public class HBaseCommitTable implements CommitTable {
@Override
public void flush() throws IOException {
- try {
+
+ try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
addLowWatermarkToStoreToWriteBuffer();
table.put(writeBuffer);
writeBuffer.clear();
@@ -137,18 +138,15 @@ public class HBaseCommitTable implements CommitTable {
@Override
public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
- assert (startTimestamp < commitTimestamp);
- byte[] transactionRow = startTimestampToKey(startTimestamp);
- Put put = new Put(transactionRow, startTimestamp);
- byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp);
- put.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);
- return table.checkAndPut(transactionRow, commitTableFamily, INVALID_TX_QUALIFIER, null, put);
- }
+ try (Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
+ assert (startTimestamp < commitTimestamp);
+ byte[] transactionRow = startTimestampToKey(startTimestamp);
+ Put put = new Put(transactionRow, startTimestamp);
+ byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp);
+ put.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);
+ return table.checkAndPut(transactionRow, commitTableFamily, INVALID_TX_QUALIFIER, null, put);
+ }
- @Override
- public void close() throws IOException {
- clearWriteBuffer();
- table.close();
}
private void addLowWatermarkToStoreToWriteBuffer() {
@@ -164,17 +162,15 @@ public class HBaseCommitTable implements CommitTable {
class HBaseClient implements Client{
- final Table table;
+ HBaseClient(){
- HBaseClient() throws IOException {
- table = hbaseConnection.getTable(TableName.valueOf(tableName));
}
@Override
public ListenableFuture<Optional<CommitTimestamp>> getCommitTimestamp(long startTimestamp) {
SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
- try {
+ try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
Get get = new Get(startTimestampToKey(startTimestamp));
get.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER);
get.addColumn(commitTableFamily, INVALID_TX_QUALIFIER);
@@ -206,7 +202,7 @@ public class HBaseCommitTable implements CommitTable {
@Override
public ListenableFuture<Long> readLowWatermark() {
SettableFuture<Long> f = SettableFuture.create();
- try {
+ try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
Get get = new Get(LOW_WATERMARK_ROW);
get.addColumn(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER);
Result result = table.get(get);
@@ -238,7 +234,7 @@ public class HBaseCommitTable implements CommitTable {
Delete delete = new Delete(key, startTimestamp);
- try {
+ try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
table.delete(delete);
} catch (IOException e) {
SettableFuture<Void> f = SettableFuture.create();
@@ -253,7 +249,7 @@ public class HBaseCommitTable implements CommitTable {
@Override
public ListenableFuture<Boolean> tryInvalidateTransaction(long startTimestamp) {
SettableFuture<Boolean> f = SettableFuture.create();
- try {
+ try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
byte[] row = startTimestampToKey(startTimestamp);
Put invalidationPut = new Put(row, startTimestamp);
invalidationPut.addColumn(commitTableFamily, INVALID_TX_QUALIFIER, Bytes.toBytes(1));
@@ -272,11 +268,6 @@ public class HBaseCommitTable implements CommitTable {
return f;
}
- @Override
- public synchronized void close() throws IOException {
- table.close();
- }
-
private boolean containsATimestamp(Result result) {
return (result != null && result.containsColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER));
}
diff --git a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java
index 4786eda..1006bc6 100644
--- a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java
+++ b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java
@@ -63,4 +63,8 @@ public class RegionAccessWrapper implements TableAccessWrapper {
return null;
}
+ @Override
+ public void close() throws Exception {
+
+ }
}
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
index cf93163..4ba385d 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
@@ -54,7 +54,7 @@ public class CompactorScanner implements InternalScanner {
private static final Logger LOG = LoggerFactory.getLogger(CompactorScanner.class);
private final InternalScanner internalScanner;
private final CommitTable.Client commitTableClient;
- private final Queue<CommitTable.Client> commitTableClientQueue;
+
private final boolean isMajorCompaction;
private final boolean retainNonTransactionallyDeletedCells;
private final long lowWatermark;
@@ -67,12 +67,10 @@ public class CompactorScanner implements InternalScanner {
public CompactorScanner(ObserverContext<RegionCoprocessorEnvironment> e,
InternalScanner internalScanner,
Client commitTableClient,
- Queue<CommitTable.Client> commitTableClientQueue,
boolean isMajorCompaction,
boolean preserveNonTransactionallyDeletedCells) throws IOException {
this.internalScanner = internalScanner;
this.commitTableClient = commitTableClient;
- this.commitTableClientQueue = commitTableClientQueue;
this.isMajorCompaction = isMajorCompaction;
this.retainNonTransactionallyDeletedCells = preserveNonTransactionallyDeletedCells;
this.lowWatermark = getLowWatermarkFromCommitTable();
@@ -84,6 +82,7 @@ public class CompactorScanner implements InternalScanner {
@Override
public boolean next(List<Cell> results) throws IOException {
+ //TODO YONIGO - why-1 we get exceptions
return next(results, -1);
}
@@ -182,7 +181,6 @@ public class CompactorScanner implements InternalScanner {
@Override
public void close() throws IOException {
internalScanner.close();
- commitTableClientQueue.add(commitTableClient);
}
// ----------------------------------------------------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java
index f95191c..41bf13c 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java
@@ -85,16 +85,16 @@ public class CompactorUtil {
HBaseLogin.loginIfNeeded(cmdline.loginFlags);
Configuration conf = HBaseConfiguration.create();
- Connection conn = ConnectionFactory.createConnection(conf);
-
- if (cmdline.enable) {
- enableOmidCompaction(conn, TableName.valueOf(cmdline.table),
- Bytes.toBytes(cmdline.columnFamily));
- } else if (cmdline.disable) {
- disableOmidCompaction(conn, TableName.valueOf(cmdline.table),
- Bytes.toBytes(cmdline.columnFamily));
- } else {
- System.err.println("Must specify enable or disable");
+ try (Connection conn = ConnectionFactory.createConnection(conf)) {
+ if (cmdline.enable) {
+ enableOmidCompaction(conn, TableName.valueOf(cmdline.table),
+ Bytes.toBytes(cmdline.columnFamily));
+ } else if (cmdline.disable) {
+ disableOmidCompaction(conn, TableName.valueOf(cmdline.table),
+ Bytes.toBytes(cmdline.columnFamily));
+ } else {
+ System.err.println("Must specify enable or disable");
+ }
}
}
}
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
index f8ed6b7..57f82b0 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
@@ -20,6 +20,7 @@ package org.apache.omid.transaction;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
@@ -64,8 +65,9 @@ public class OmidCompactor extends BaseRegionObserver {
private HBaseCommitTableConfig commitTableConf = null;
private RegionCoprocessorEnvironment env = null;
+
@VisibleForTesting
- Queue<CommitTable.Client> commitTableClientQueue = new ConcurrentLinkedQueue<>();
+ CommitTable.Client commitTableClient;
// When compacting, if a cell which has been marked by HBase as Delete or
// Delete Family (that is, non-transactionally deleted), we allow the user
@@ -73,7 +75,8 @@ public class OmidCompactor extends BaseRegionObserver {
// If retained, the deleted cell will appear after a minor compaction, but
// will be deleted anyways after a major one
private boolean retainNonTransactionallyDeletedCells;
- private CommitTable commitTable;
+
+ private Connection connection;
public OmidCompactor() {
this(false);
@@ -92,10 +95,10 @@ public class OmidCompactor extends BaseRegionObserver {
if (commitTableName != null) {
commitTableConf.setTableName(commitTableName);
}
- commitTable = new HBaseCommitTable(RegionConnectionFactory
- .getConnection(RegionConnectionFactory.ConnectionType.COMPACTION_CONNECTION,
- (RegionCoprocessorEnvironment) env)
- , commitTableConf);
+
+ connection = RegionConnectionFactory
+ .getConnection(RegionConnectionFactory.ConnectionType.COMPACTION_CONNECTION, (RegionCoprocessorEnvironment) env);
+ commitTableClient = new HBaseCommitTable(connection, commitTableConf).getClient();
retainNonTransactionallyDeletedCells =
env.getConfiguration().getBoolean(HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY,
HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT);
@@ -105,11 +108,6 @@ public class OmidCompactor extends BaseRegionObserver {
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
LOG.info("Stopping compactor coprocessor");
- if (commitTableClientQueue != null) {
- for (CommitTable.Client commitTableClient : commitTableClientQueue) {
- commitTableClient.close();
- }
- }
LOG.info("Compactor coprocessor stopped");
}
@@ -135,15 +133,10 @@ public class OmidCompactor extends BaseRegionObserver {
if (!omidCompactable) {
return scanner;
} else {
- CommitTable.Client commitTableClient = commitTableClientQueue.poll();
- if (commitTableClient == null) {
- commitTableClient = initAndGetCommitTableClient();
- }
boolean isMajorCompaction = request.isMajor();
return new CompactorScanner(env,
scanner,
commitTableClient,
- commitTableClientQueue,
isMajorCompaction,
retainNonTransactionallyDeletedCells);
}
@@ -153,12 +146,4 @@ public class OmidCompactor extends BaseRegionObserver {
throw new DoNotRetryIOException(e);
}
}
-
- private CommitTable.Client initAndGetCommitTableClient() throws IOException {
- LOG.info("Trying to get the commit table client");
- CommitTable.Client commitTableClient = commitTable.getClient();
- LOG.info("Commit table client obtained {}", commitTableClient.getClass().getCanonicalName());
- return commitTableClient;
- }
-
}
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
index eb5d50f..7ee742d 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
@@ -19,6 +19,7 @@ package org.apache.omid.transaction;
import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
@@ -66,7 +67,8 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
private Queue<SnapshotFilterImpl> snapshotFilterQueue = new ConcurrentLinkedQueue<>();
private Map<Object, SnapshotFilterImpl> snapshotFilterMap = new ConcurrentHashMap<>();
private CommitTable.Client inMemoryCommitTable = null;
- private CommitTable commitTable;
+ private CommitTable.Client commitTableClient;
+ private Connection connection;
public OmidSnapshotFilter(CommitTable.Client commitTableClient) {
LOG.info("Compactor coprocessor initialized");
@@ -86,14 +88,15 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
if (commitTableName != null) {
commitTableConf.setTableName(commitTableName);
}
+ connection = RegionConnectionFactory
+ .getConnection(RegionConnectionFactory.ConnectionType.READ_CONNECTION, (RegionCoprocessorEnvironment) env);
+ commitTableClient = new HBaseCommitTable(connection, commitTableConf).getClient();
LOG.info("Snapshot filter started");
- commitTable = new HBaseCommitTable(RegionConnectionFactory
- .getConnection(RegionConnectionFactory.ConnectionType.READ_CONNECTION, (RegionCoprocessorEnvironment) env),
- commitTableConf);
}
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
+ LOG.info("stopping Snapshot filter");
LOG.info("Snapshot filter stopped");
}
@@ -181,7 +184,6 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
if (inMemoryCommitTable != null) {
return inMemoryCommitTable;
}
- CommitTable.Client commitTableClient = commitTable.getClient();
return commitTableClient;
}
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
index a4bf65e..197ef3f 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
@@ -200,7 +200,7 @@ public class TestCompaction {
hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
CommitTable.Client commitTableClient = commitTable.getClient();
syncPostCommitter =
- spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient));
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
return HBaseTransactionManager.builder(hbaseOmidClientConf)
.postCommitter(syncPostCommitter)
.commitTableClient(commitTableClient)
@@ -241,7 +241,7 @@ public class TestCompaction {
SettableFuture<Long> f = SettableFuture.create();
f.set(fakeAssignedLowWatermark);
doReturn(f).when(commitTableClient).readLowWatermark();
- omidCompactor.commitTableClientQueue.add(commitTableClient);
+ omidCompactor.commitTableClient = commitTableClient;
LOG.info("Compacting table {}", TEST_TABLE);
admin.majorCompact(TableName.valueOf(TEST_TABLE));
@@ -292,7 +292,7 @@ public class TestCompaction {
SettableFuture<Long> f = SettableFuture.create();
f.set(Long.MAX_VALUE);
doReturn(f).when(commitTableClient).readLowWatermark();
- omidCompactor.commitTableClientQueue.add(commitTableClient);
+ omidCompactor.commitTableClient = commitTableClient;
LOG.info("Flushing table {}", TEST_TABLE);
admin.flush(TableName.valueOf(TEST_TABLE));
@@ -361,7 +361,7 @@ public class TestCompaction {
SettableFuture<Long> f = SettableFuture.create();
f.set(neverendingTxBelowLowWatermark.getStartTimestamp());
doReturn(f).when(commitTableClient).readLowWatermark();
- omidCompactor.commitTableClientQueue.add(commitTableClient);
+ omidCompactor.commitTableClient = commitTableClient;
LOG.info("Compacting table {}", TEST_TABLE);
admin.majorCompact(TableName.valueOf(TEST_TABLE));
@@ -422,8 +422,7 @@ public class TestCompaction {
SettableFuture<Long> f = SettableFuture.create();
f.setException(new IOException("Unable to read"));
doReturn(f).when(commitTableClient).readLowWatermark();
- omidCompactor.commitTableClientQueue.add(commitTableClient);
-
+ omidCompactor.commitTableClient = commitTableClient;
LOG.info("Compacting table {}", TEST_TABLE);
admin.majorCompact(TableName.valueOf(TEST_TABLE)); // Should trigger the error when accessing CommitTable funct.
@@ -1167,7 +1166,7 @@ public class TestCompaction {
SettableFuture<Long> f = SettableFuture.create();
f.set(lwm);
doReturn(f).when(commitTableClient).readLowWatermark();
- omidCompactor.commitTableClientQueue.add(commitTableClient);
+ omidCompactor.commitTableClient = commitTableClient;
}
private void compactEverything(String tableName) throws Exception {
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactorScanner.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactorScanner.java
index 8a217b3..eca9714 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactorScanner.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactorScanner.java
@@ -64,8 +64,7 @@ public class TestCompactorScanner {
ObserverContext<RegionCoprocessorEnvironment> ctx = mock(ObserverContext.class);
InternalScanner internalScanner = mock(InternalScanner.class);
CommitTable.Client ctClient = mock(CommitTable.Client.class);
- @SuppressWarnings("unchecked")
- Queue<Client> queue = mock(Queue.class);
+
RegionCoprocessorEnvironment rce = mock(RegionCoprocessorEnvironment.class);
HRegion hRegion = mock(HRegion.class);
HRegionInfo hRegionInfo = mock(HRegionInfo.class);
@@ -82,7 +81,6 @@ public class TestCompactorScanner {
try (CompactorScanner scanner = spy(new CompactorScanner(ctx,
internalScanner,
ctClient,
- queue,
false,
retainOption))) {
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
index 4c5cc50..2cfc77e 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
@@ -207,7 +207,7 @@ public class TestSnapshotFilter {
hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
CommitTable.Client commitTableClient = commitTable.getClient();
syncPostCommitter =
- spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient));
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
return HBaseTransactionManager.builder(hbaseOmidClientConf)
.postCommitter(syncPostCommitter)
.commitTableClient(commitTableClient)
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilterLL.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilterLL.java
index 1bb5691..3496bde 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilterLL.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilterLL.java
@@ -185,7 +185,7 @@ public class TestSnapshotFilterLL {
hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
CommitTable.Client commitTableClient = commitTable.getClient();
syncPostCommitter =
- spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient));
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
return HBaseTransactionManager.builder(hbaseOmidClientConf)
.postCommitter(syncPostCommitter)
.commitTableClient(commitTableClient)
diff --git a/timestamp-storage/src/main/java/org/apache/omid/timestamp/storage/HBaseTimestampStorage.java b/timestamp-storage/src/main/java/org/apache/omid/timestamp/storage/HBaseTimestampStorage.java
index a33c9dd..6ff23aa 100644
--- a/timestamp-storage/src/main/java/org/apache/omid/timestamp/storage/HBaseTimestampStorage.java
+++ b/timestamp-storage/src/main/java/org/apache/omid/timestamp/storage/HBaseTimestampStorage.java
@@ -51,11 +51,12 @@ public class HBaseTimestampStorage implements TimestampStorage {
private final Table table;
private final byte[] cfName;
+ private final Connection connection;
@Inject
public HBaseTimestampStorage(Configuration hbaseConfig, HBaseTimestampStorageConfig config) throws IOException {
- Connection conn = ConnectionFactory.createConnection(hbaseConfig);
- this.table = conn.getTable(TableName.valueOf(config.getTableName()));
+ connection = ConnectionFactory.createConnection(hbaseConfig);
+ this.table = connection.getTable(TableName.valueOf(config.getTableName()));
this.cfName = config.getFamilyName().getBytes(UTF_8);
}
@@ -91,4 +92,10 @@ public class HBaseTimestampStorage implements TimestampStorage {
}
+
+ public void close() throws IOException {
+ //TODO this is never called
+ table.close();
+ connection.close();
+ }
}
diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
index 5075a7f..0eca91a 100644
--- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
+++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
@@ -314,15 +314,16 @@ public abstract class AbstractTransactionManager implements TransactionManager {
*/
public void postRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
+
+ abstract void closeResources() throws IOException;
+
/**
* @see java.io.Closeable#close()
*/
@Override
public final void close() throws IOException {
-
tsoClient.close();
- commitTableClient.close();
-
+ closeResources();
}
// ----------------------------------------------------------------------------------------------------------------