You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2019/02/25 11:55:03 UTC

[incubator-omid] branch 1.0.1 updated (cbe2a39 -> d30f709)

This is an automated email from the ASF dual-hosted git repository.

yonigo pushed a change to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git.


    from cbe2a39  [OMID-129] - cache uncommitted transactions in TransactionVisibilityFilterBase with Long.MAX_VALUE as commit timestamp
     new b8c440e  [OMID-132] batch postcommit shadow cell updates
     new d30f709  [OMID-133] When TTable autoflush is false, before read/scan flush tables.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../omid/transaction/HBaseSyncPostCommitter.java   | 56 +++++++++++++++-------
 .../java/org/apache/omid/transaction/TTable.java   | 11 +++--
 .../org/apache/omid/transaction/TestAutoFlush.java |  4 ++
 .../apache/omid/transaction/TestShadowCells.java   | 27 +++++++++++
 .../omid/transaction/TestSnapshotFilter.java       |  4 +-
 .../java/org/apache/omid/tso/client/TSOClient.java | 11 +++--
 6 files changed, 86 insertions(+), 27 deletions(-)


[incubator-omid] 02/02: [OMID-133] When TTable autoflush is false, before read/scan flush tables.

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yonigo pushed a commit to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git

commit d30f70930f416e8234be7778fcccaca7984653ee
Author: Yonatan Gottesman <yo...@gmail.com>
AuthorDate: Mon Feb 25 12:32:51 2019 +0200

    [OMID-133] When TTable autoflush is false, before read/scan flush tables.
---
 .../src/main/java/org/apache/omid/transaction/TTable.java     | 11 ++++++++---
 .../test/java/org/apache/omid/transaction/TestAutoFlush.java  |  4 ++++
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
index 44f0708..869a013 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
@@ -181,6 +181,8 @@ public class TTable implements Closeable {
 
         throwExceptionIfOpSetsTimerange(get);
 
+        flushCommits();
+
         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
 
         final long readTimestamp = transaction.getReadTimestamp();
@@ -466,7 +468,7 @@ public class TTable implements Closeable {
     public ResultScanner getScanner(Transaction tx, Scan scan) throws IOException {
 
         throwExceptionIfOpSetsTimerange(scan);
-
+        flushCommits();
         HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
 
         Scan tsscan = new Scan(scan);
@@ -670,8 +672,9 @@ public class TTable implements Closeable {
         return table;
     }
 
-    public void setAutoFlush(boolean autoFlush) {
+    public void setAutoFlush(boolean autoFlush) throws IOException {
         this.autoFlush = autoFlush;
+        flushCommits();
     }
 
     public boolean isAutoFlush() {
@@ -680,7 +683,9 @@ public class TTable implements Closeable {
 
     public void flushCommits() throws IOException {
         try {
-            table.batch(this.mutations, new Object[mutations.size()]);
+            if (this.mutations.size() > 0) {
+                table.batch(this.mutations, new Object[mutations.size()]);
+            }
         } catch (InterruptedException e) {
             Thread.interrupted();
             throw new RuntimeException(e);
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestAutoFlush.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestAutoFlush.java
index fac64ac..ac0a3f0 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestAutoFlush.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestAutoFlush.java
@@ -52,6 +52,10 @@ public class TestAutoFlush extends OmidTestBase {
         Result result = table.getHTable().get(get);
         assertEquals(result.size(), 0, "Writes are already in DB");
 
+        //data should be readable within same transaction
+        result = table.get(t,get);
+        assertEquals(result.size(), 1, "Writes should be read by same transaction");
+
         tm.commit(t);
 
         // After commit, both the cell and shadow cell should be there.


[incubator-omid] 01/02: [OMID-132] batch postcommit shadow cell updates

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yonigo pushed a commit to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git

commit b8c440e663e480656d0daa1710c9d33f45b82a85
Author: Yonatan Gottesman <yo...@gmail.com>
AuthorDate: Sun Feb 24 12:02:00 2019 +0200

    [OMID-132] batch postcommit shadow cell updates
---
 .../omid/transaction/HBaseSyncPostCommitter.java   | 56 +++++++++++++++-------
 .../apache/omid/transaction/TestShadowCells.java   | 27 +++++++++++
 .../omid/transaction/TestSnapshotFilter.java       |  4 +-
 .../java/org/apache/omid/tso/client/TSOClient.java | 11 +++--
 4 files changed, 74 insertions(+), 24 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
index a7ad55f..c9fa5e5 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
@@ -20,9 +20,16 @@ package org.apache.omid.transaction;
 import static org.apache.omid.metrics.MetricsUtils.name;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.metrics.MetricsRegistry;
@@ -43,6 +50,8 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
 
     private final Timer commitTableUpdateTimer;
     private final Timer shadowCellsUpdateTimer;
+    private static final int MAX_BATCH_SIZE=1000;
+
 
     public HBaseSyncPostCommitter(MetricsRegistry metrics, CommitTable.Client commitTableClient) {
         this.metrics = metrics;
@@ -52,18 +61,30 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
         this.shadowCellsUpdateTimer = metrics.timer(name("omid", "tm", "hbase", "shadowCellsUpdate", "latency"));
     }
 
-    private void addShadowCell(HBaseCellId cell, HBaseTransaction tx, SettableFuture<Void> updateSCFuture) {
+    private void flushMutations(Table table, List<Mutation> mutations) throws IOException, InterruptedException {
+        table.batch(mutations, new Object[mutations.size()]);
+    }
+
+    private void addShadowCell(HBaseCellId cell, HBaseTransaction tx, SettableFuture<Void> updateSCFuture,
+                               Map<Table,List<Mutation>> mutations) throws IOException, InterruptedException {
         Put put = new Put(cell.getRow());
         put.addColumn(cell.getFamily(),
                 CellUtils.addShadowCellSuffixPrefix(cell.getQualifier(), 0, cell.getQualifier().length),
                 cell.getTimestamp(),
                 Bytes.toBytes(tx.getCommitTimestamp()));
-        try {
-            cell.getTable().getHTable().put(put);
-        } catch (IOException e) {
-            LOG.warn("{}: Error inserting shadow cell {}", tx, cell, e);
-            updateSCFuture.setException(
-                    new TransactionManagerException(tx + ": Error inserting shadow cell " + cell, e));
+
+        Table table = cell.getTable().getHTable();
+        List<Mutation> tableMutations = mutations.get(table);
+        if (tableMutations == null) {
+            ArrayList<Mutation> newList = new ArrayList<>();
+            newList.add(put);
+            mutations.put(table, newList);
+        } else {
+            tableMutations.add(put);
+            if (tableMutations.size() > MAX_BATCH_SIZE) {
+                flushMutations(table, tableMutations);
+                mutations.remove(table);
+            }
         }
     }
 
@@ -76,25 +97,26 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
 
         shadowCellsUpdateTimer.start();
         try {
-
+            Map<Table,List<Mutation>> mutations = new HashMap<>();
             // Add shadow cells
             for (HBaseCellId cell : tx.getWriteSet()) {
-                addShadowCell(cell, tx, updateSCFuture);
+                addShadowCell(cell, tx, updateSCFuture, mutations);
             }
 
             for (HBaseCellId cell : tx.getConflictFreeWriteSet()) {
-                addShadowCell(cell, tx, updateSCFuture);
+                addShadowCell(cell, tx, updateSCFuture, mutations);
             }
 
-            // Flush affected tables before returning to avoid loss of shadow cells updates when autoflush is disabled
-            try {
-                tx.flushTables();
-                updateSCFuture.set(null);
-            } catch (IOException e) {
-                LOG.warn("{}: Error while flushing writes", tx, e);
-                updateSCFuture.setException(new TransactionManagerException(tx + ": Error while flushing writes", e));
+            for (Map.Entry<Table,List<Mutation>> entry: mutations.entrySet()) {
+                flushMutations(entry.getKey(), entry.getValue());
             }
 
+            //Only if all is well we set to null and delete commit entry from commit table
+            updateSCFuture.set(null);
+        } catch (IOException | InterruptedException e) {
+            LOG.warn("{}: Error inserting shadow cells", tx, e);
+            updateSCFuture.setException(
+                    new TransactionManagerException(tx + ": Error inserting shadow cells ", e));
         } finally {
             shadowCellsUpdateTimer.stop();
         }
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
index 23ea809..df274ae 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
@@ -301,6 +301,33 @@ public class TestShadowCells extends OmidTestBase {
     }
 
     @Test(timeOut = 60_000)
+    public void testTransactionPostCommitUpdateSCBatch(ITestContext context)
+            throws Exception {
+
+        TransactionManager tm = newTransactionManager(context);
+
+        TTable table = new TTable(connection, TEST_TABLE);
+
+        HBaseTransaction t1 = (HBaseTransaction) tm.begin();
+
+        // Test shadow cells are created properly
+        Put put = new Put(row);
+        for (int i = 0; i < 1002; ++i) {
+            put.addColumn(family, Bytes.toBytes(String.valueOf("X") + i), data1);
+        }
+        table.put(t1, put);
+
+        tm.commit(t1);
+
+        // After commit test that shadow cells are there
+        for (int i = 0; i < 1002; ++i) {
+            assertTrue(hasShadowCell(row, family, Bytes.toBytes(String.valueOf("X") + i), t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
+                    "Shadow cell should be there");
+        }
+    }
+
+
+    @Test(timeOut = 60_000)
     public void testRaceConditionBetweenReaderAndWriterThreads(final ITestContext context) throws Exception {
         final CountDownLatch readAfterCommit = new CountDownLatch(1);
         final CountDownLatch postCommitBegin = new CountDownLatch(1);
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
index 46b1c4a..4c5cc50 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
@@ -891,7 +891,7 @@ public class TestSnapshotFilter {
 
     @Test (timeOut = 60_000)
     public void testFilterCommitCacheInSnapshot() throws Throwable {
-        String TEST_TABLE = "testScanWithFilter";
+        String TEST_TABLE = "testFilterCommitCacheInSnapshot";
         byte[] rowName = Bytes.toBytes("row1");
         byte[] famName = Bytes.toBytes(TEST_FAMILY);
 
@@ -934,7 +934,7 @@ public class TestSnapshotFilter {
 
     @Test (timeOut = 60_000)
     public void testFilterCommitCacheNotInSnapshot() throws Throwable {
-        String TEST_TABLE = "testScanWithFilter";
+        String TEST_TABLE = "testFilterCommitCacheNotInSnapshot";
         byte[] rowName = Bytes.toBytes("row1");
         byte[] famName = Bytes.toBytes(TEST_FAMILY);
 
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
index 28e8686..960f3a4 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
@@ -99,10 +99,10 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
     private boolean lowLatency;
 
     // Use to extract unique table identifiers from the modified cells list.
-    private final Set<Long> tableIDs;
+
     // Conflict detection level of the entire system. Can either be Row or Cell level.
     private ConflictDetectionLevel conflictDetectionLevel;
-    private Set<Long> rowLevelWriteSet;
+
 
     // ----------------------------------------------------------------------------------------------------------------
     // Construction
@@ -173,10 +173,10 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
         bootstrap.setOption("connectTimeoutMillis", 100);
         lowLatency = false;
 
-        this.tableIDs = new HashSet<Long>();
+
 
         conflictDetectionLevel = omidConf.getConflictAnalysisLevel();
-        rowLevelWriteSet = new HashSet<Long>();
+
     }
 
     // ----------------------------------------------------------------------------------------------------------------
@@ -212,7 +212,8 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
         TSOProto.CommitRequest.Builder commitbuilder = TSOProto.CommitRequest.newBuilder();
         commitbuilder.setStartTimestamp(transactionId);
-
+        HashSet<Long> rowLevelWriteSet = new HashSet<Long>();
+        HashSet<Long> tableIDs = new HashSet<Long>();
         rowLevelWriteSet.clear();
         for (CellId cell : cells) {
             long id;