You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2019/02/25 11:55:04 UTC
[incubator-omid] 01/02: [OMID-132] batch postcommit shadow cell
updates
This is an automated email from the ASF dual-hosted git repository.
yonigo pushed a commit to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git
commit b8c440e663e480656d0daa1710c9d33f45b82a85
Author: Yonatan Gottesman <yo...@gmail.com>
AuthorDate: Sun Feb 24 12:02:00 2019 +0200
[OMID-132] batch postcommit shadow cell updates
---
.../omid/transaction/HBaseSyncPostCommitter.java | 56 +++++++++++++++-------
.../apache/omid/transaction/TestShadowCells.java | 27 +++++++++++
.../omid/transaction/TestSnapshotFilter.java | 4 +-
.../java/org/apache/omid/tso/client/TSOClient.java | 11 +++--
4 files changed, 74 insertions(+), 24 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
index a7ad55f..c9fa5e5 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
@@ -20,9 +20,16 @@ package org.apache.omid.transaction;
import static org.apache.omid.metrics.MetricsUtils.name;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
@@ -43,6 +50,8 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
private final Timer commitTableUpdateTimer;
private final Timer shadowCellsUpdateTimer;
+ private static final int MAX_BATCH_SIZE=1000;
+
public HBaseSyncPostCommitter(MetricsRegistry metrics, CommitTable.Client commitTableClient) {
this.metrics = metrics;
@@ -52,18 +61,30 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
this.shadowCellsUpdateTimer = metrics.timer(name("omid", "tm", "hbase", "shadowCellsUpdate", "latency"));
}
- private void addShadowCell(HBaseCellId cell, HBaseTransaction tx, SettableFuture<Void> updateSCFuture) {
+ private void flushMutations(Table table, List<Mutation> mutations) throws IOException, InterruptedException {
+ table.batch(mutations, new Object[mutations.size()]);
+ }
+
+ private void addShadowCell(HBaseCellId cell, HBaseTransaction tx, SettableFuture<Void> updateSCFuture,
+ Map<Table,List<Mutation>> mutations) throws IOException, InterruptedException {
Put put = new Put(cell.getRow());
put.addColumn(cell.getFamily(),
CellUtils.addShadowCellSuffixPrefix(cell.getQualifier(), 0, cell.getQualifier().length),
cell.getTimestamp(),
Bytes.toBytes(tx.getCommitTimestamp()));
- try {
- cell.getTable().getHTable().put(put);
- } catch (IOException e) {
- LOG.warn("{}: Error inserting shadow cell {}", tx, cell, e);
- updateSCFuture.setException(
- new TransactionManagerException(tx + ": Error inserting shadow cell " + cell, e));
+
+ Table table = cell.getTable().getHTable();
+ List<Mutation> tableMutations = mutations.get(table);
+ if (tableMutations == null) {
+ ArrayList<Mutation> newList = new ArrayList<>();
+ newList.add(put);
+ mutations.put(table, newList);
+ } else {
+ tableMutations.add(put);
+ if (tableMutations.size() > MAX_BATCH_SIZE) {
+ flushMutations(table, tableMutations);
+ mutations.remove(table);
+ }
}
}
@@ -76,25 +97,26 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
shadowCellsUpdateTimer.start();
try {
-
+ Map<Table,List<Mutation>> mutations = new HashMap<>();
// Add shadow cells
for (HBaseCellId cell : tx.getWriteSet()) {
- addShadowCell(cell, tx, updateSCFuture);
+ addShadowCell(cell, tx, updateSCFuture, mutations);
}
for (HBaseCellId cell : tx.getConflictFreeWriteSet()) {
- addShadowCell(cell, tx, updateSCFuture);
+ addShadowCell(cell, tx, updateSCFuture, mutations);
}
- // Flush affected tables before returning to avoid loss of shadow cells updates when autoflush is disabled
- try {
- tx.flushTables();
- updateSCFuture.set(null);
- } catch (IOException e) {
- LOG.warn("{}: Error while flushing writes", tx, e);
- updateSCFuture.setException(new TransactionManagerException(tx + ": Error while flushing writes", e));
+ for (Map.Entry<Table,List<Mutation>> entry: mutations.entrySet()) {
+ flushMutations(entry.getKey(), entry.getValue());
}
+ //Only if all is well we set to null and delete commit entry from commit table
+ updateSCFuture.set(null);
+ } catch (IOException | InterruptedException e) {
+ LOG.warn("{}: Error inserting shadow cells", tx, e);
+ updateSCFuture.setException(
+ new TransactionManagerException(tx + ": Error inserting shadow cells ", e));
} finally {
shadowCellsUpdateTimer.stop();
}
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
index 23ea809..df274ae 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
@@ -301,6 +301,33 @@ public class TestShadowCells extends OmidTestBase {
}
@Test(timeOut = 60_000)
+ public void testTransactionPostCommitUpdateSCBatch(ITestContext context)
+ throws Exception {
+
+ TransactionManager tm = newTransactionManager(context);
+
+ TTable table = new TTable(connection, TEST_TABLE);
+
+ HBaseTransaction t1 = (HBaseTransaction) tm.begin();
+
+ // Test shadow cells are created properly
+ Put put = new Put(row);
+ for (int i = 0; i < 1002; ++i) {
+ put.addColumn(family, Bytes.toBytes(String.valueOf("X") + i), data1);
+ }
+ table.put(t1, put);
+
+ tm.commit(t1);
+
+ // After commit test that shadow cells are there
+ for (int i = 0; i < 1002; ++i) {
+ assertTrue(hasShadowCell(row, family, Bytes.toBytes(String.valueOf("X") + i), t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
+ "Shadow cell should be there");
+ }
+ }
+
+
+ @Test(timeOut = 60_000)
public void testRaceConditionBetweenReaderAndWriterThreads(final ITestContext context) throws Exception {
final CountDownLatch readAfterCommit = new CountDownLatch(1);
final CountDownLatch postCommitBegin = new CountDownLatch(1);
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
index 46b1c4a..4c5cc50 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
@@ -891,7 +891,7 @@ public class TestSnapshotFilter {
@Test (timeOut = 60_000)
public void testFilterCommitCacheInSnapshot() throws Throwable {
- String TEST_TABLE = "testScanWithFilter";
+ String TEST_TABLE = "testFilterCommitCacheInSnapshot";
byte[] rowName = Bytes.toBytes("row1");
byte[] famName = Bytes.toBytes(TEST_FAMILY);
@@ -934,7 +934,7 @@ public class TestSnapshotFilter {
@Test (timeOut = 60_000)
public void testFilterCommitCacheNotInSnapshot() throws Throwable {
- String TEST_TABLE = "testScanWithFilter";
+ String TEST_TABLE = "testFilterCommitCacheNotInSnapshot";
byte[] rowName = Bytes.toBytes("row1");
byte[] famName = Bytes.toBytes(TEST_FAMILY);
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
index 28e8686..960f3a4 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
@@ -99,10 +99,10 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
private boolean lowLatency;
// Use to extract unique table identifiers from the modified cells list.
- private final Set<Long> tableIDs;
+
// Conflict detection level of the entire system. Can either be Row or Cell level.
private ConflictDetectionLevel conflictDetectionLevel;
- private Set<Long> rowLevelWriteSet;
+
// ----------------------------------------------------------------------------------------------------------------
// Construction
@@ -173,10 +173,10 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
bootstrap.setOption("connectTimeoutMillis", 100);
lowLatency = false;
- this.tableIDs = new HashSet<Long>();
+
conflictDetectionLevel = omidConf.getConflictAnalysisLevel();
- rowLevelWriteSet = new HashSet<Long>();
+
}
// ----------------------------------------------------------------------------------------------------------------
@@ -212,7 +212,8 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
TSOProto.CommitRequest.Builder commitbuilder = TSOProto.CommitRequest.newBuilder();
commitbuilder.setStartTimestamp(transactionId);
-
+ HashSet<Long> rowLevelWriteSet = new HashSet<Long>();
+ HashSet<Long> tableIDs = new HashSet<Long>();
rowLevelWriteSet.clear();
for (CellId cell : cells) {
long id;