You are viewing a plain text version of this content. The canonical link for it was not carried over into this plain-text version.
Posted to commits@omid.apache.org by yo...@apache.org on 2019/02/25 11:55:04 UTC

[incubator-omid] 01/02: [OMID-132] batch postcommit shadow cell updates

This is an automated email from the ASF dual-hosted git repository.

yonigo pushed a commit to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git

commit b8c440e663e480656d0daa1710c9d33f45b82a85
Author: Yonatan Gottesman <yo...@gmail.com>
AuthorDate: Sun Feb 24 12:02:00 2019 +0200

    [OMID-132] batch postcommit shadow cell updates
---
 .../omid/transaction/HBaseSyncPostCommitter.java   | 56 +++++++++++++++-------
 .../apache/omid/transaction/TestShadowCells.java   | 27 +++++++++++
 .../omid/transaction/TestSnapshotFilter.java       |  4 +-
 .../java/org/apache/omid/tso/client/TSOClient.java | 11 +++--
 4 files changed, 74 insertions(+), 24 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
index a7ad55f..c9fa5e5 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
@@ -20,9 +20,16 @@ package org.apache.omid.transaction;
 import static org.apache.omid.metrics.MetricsUtils.name;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.metrics.MetricsRegistry;
@@ -43,6 +50,8 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
 
     private final Timer commitTableUpdateTimer;
     private final Timer shadowCellsUpdateTimer;
+    private static final int MAX_BATCH_SIZE=1000;
+
 
     public HBaseSyncPostCommitter(MetricsRegistry metrics, CommitTable.Client commitTableClient) {
         this.metrics = metrics;
@@ -52,18 +61,30 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
         this.shadowCellsUpdateTimer = metrics.timer(name("omid", "tm", "hbase", "shadowCellsUpdate", "latency"));
     }
 
-    private void addShadowCell(HBaseCellId cell, HBaseTransaction tx, SettableFuture<Void> updateSCFuture) {
+    private void flushMutations(Table table, List<Mutation> mutations) throws IOException, InterruptedException {
+        table.batch(mutations, new Object[mutations.size()]);
+    }
+
+    private void addShadowCell(HBaseCellId cell, HBaseTransaction tx, SettableFuture<Void> updateSCFuture,
+                               Map<Table,List<Mutation>> mutations) throws IOException, InterruptedException {
         Put put = new Put(cell.getRow());
         put.addColumn(cell.getFamily(),
                 CellUtils.addShadowCellSuffixPrefix(cell.getQualifier(), 0, cell.getQualifier().length),
                 cell.getTimestamp(),
                 Bytes.toBytes(tx.getCommitTimestamp()));
-        try {
-            cell.getTable().getHTable().put(put);
-        } catch (IOException e) {
-            LOG.warn("{}: Error inserting shadow cell {}", tx, cell, e);
-            updateSCFuture.setException(
-                    new TransactionManagerException(tx + ": Error inserting shadow cell " + cell, e));
+
+        Table table = cell.getTable().getHTable();
+        List<Mutation> tableMutations = mutations.get(table);
+        if (tableMutations == null) {
+            ArrayList<Mutation> newList = new ArrayList<>();
+            newList.add(put);
+            mutations.put(table, newList);
+        } else {
+            tableMutations.add(put);
+            if (tableMutations.size() > MAX_BATCH_SIZE) {
+                flushMutations(table, tableMutations);
+                mutations.remove(table);
+            }
         }
     }
 
@@ -76,25 +97,26 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
 
         shadowCellsUpdateTimer.start();
         try {
-
+            Map<Table,List<Mutation>> mutations = new HashMap<>();
             // Add shadow cells
             for (HBaseCellId cell : tx.getWriteSet()) {
-                addShadowCell(cell, tx, updateSCFuture);
+                addShadowCell(cell, tx, updateSCFuture, mutations);
             }
 
             for (HBaseCellId cell : tx.getConflictFreeWriteSet()) {
-                addShadowCell(cell, tx, updateSCFuture);
+                addShadowCell(cell, tx, updateSCFuture, mutations);
             }
 
-            // Flush affected tables before returning to avoid loss of shadow cells updates when autoflush is disabled
-            try {
-                tx.flushTables();
-                updateSCFuture.set(null);
-            } catch (IOException e) {
-                LOG.warn("{}: Error while flushing writes", tx, e);
-                updateSCFuture.setException(new TransactionManagerException(tx + ": Error while flushing writes", e));
+            for (Map.Entry<Table,List<Mutation>> entry: mutations.entrySet()) {
+                flushMutations(entry.getKey(), entry.getValue());
             }
 
+            //Only if all is well we set to null and delete commit entry from commit table
+            updateSCFuture.set(null);
+        } catch (IOException | InterruptedException e) {
+            LOG.warn("{}: Error inserting shadow cells", tx, e);
+            updateSCFuture.setException(
+                    new TransactionManagerException(tx + ": Error inserting shadow cells ", e));
         } finally {
             shadowCellsUpdateTimer.stop();
         }
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
index 23ea809..df274ae 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
@@ -301,6 +301,33 @@ public class TestShadowCells extends OmidTestBase {
     }
 
     @Test(timeOut = 60_000)
+    public void testTransactionPostCommitUpdateSCBatch(ITestContext context)
+            throws Exception {
+
+        TransactionManager tm = newTransactionManager(context);
+
+        TTable table = new TTable(connection, TEST_TABLE);
+
+        HBaseTransaction t1 = (HBaseTransaction) tm.begin();
+
+        // Test shadow cells are created properly
+        Put put = new Put(row);
+        for (int i = 0; i < 1002; ++i) {
+            put.addColumn(family, Bytes.toBytes(String.valueOf("X") + i), data1);
+        }
+        table.put(t1, put);
+
+        tm.commit(t1);
+
+        // After commit test that shadow cells are there
+        for (int i = 0; i < 1002; ++i) {
+            assertTrue(hasShadowCell(row, family, Bytes.toBytes(String.valueOf("X") + i), t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
+                    "Shadow cell should be there");
+        }
+    }
+
+
+    @Test(timeOut = 60_000)
     public void testRaceConditionBetweenReaderAndWriterThreads(final ITestContext context) throws Exception {
         final CountDownLatch readAfterCommit = new CountDownLatch(1);
         final CountDownLatch postCommitBegin = new CountDownLatch(1);
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
index 46b1c4a..4c5cc50 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
@@ -891,7 +891,7 @@ public class TestSnapshotFilter {
 
     @Test (timeOut = 60_000)
     public void testFilterCommitCacheInSnapshot() throws Throwable {
-        String TEST_TABLE = "testScanWithFilter";
+        String TEST_TABLE = "testFilterCommitCacheInSnapshot";
         byte[] rowName = Bytes.toBytes("row1");
         byte[] famName = Bytes.toBytes(TEST_FAMILY);
 
@@ -934,7 +934,7 @@ public class TestSnapshotFilter {
 
     @Test (timeOut = 60_000)
     public void testFilterCommitCacheNotInSnapshot() throws Throwable {
-        String TEST_TABLE = "testScanWithFilter";
+        String TEST_TABLE = "testFilterCommitCacheNotInSnapshot";
         byte[] rowName = Bytes.toBytes("row1");
         byte[] famName = Bytes.toBytes(TEST_FAMILY);
 
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
index 28e8686..960f3a4 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
@@ -99,10 +99,10 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
     private boolean lowLatency;
 
     // Use to extract unique table identifiers from the modified cells list.
-    private final Set<Long> tableIDs;
+
     // Conflict detection level of the entire system. Can either be Row or Cell level.
     private ConflictDetectionLevel conflictDetectionLevel;
-    private Set<Long> rowLevelWriteSet;
+
 
     // ----------------------------------------------------------------------------------------------------------------
     // Construction
@@ -173,10 +173,10 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
         bootstrap.setOption("connectTimeoutMillis", 100);
         lowLatency = false;
 
-        this.tableIDs = new HashSet<Long>();
+
 
         conflictDetectionLevel = omidConf.getConflictAnalysisLevel();
-        rowLevelWriteSet = new HashSet<Long>();
+
     }
 
     // ----------------------------------------------------------------------------------------------------------------
@@ -212,7 +212,8 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
         TSOProto.CommitRequest.Builder commitbuilder = TSOProto.CommitRequest.newBuilder();
         commitbuilder.setStartTimestamp(transactionId);
-
+        HashSet<Long> rowLevelWriteSet = new HashSet<Long>();
+        HashSet<Long> tableIDs = new HashSet<Long>();
         rowLevelWriteSet.clear();
         for (CellId cell : cells) {
             long id;