You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2019/03/31 09:28:46 UTC

[incubator-omid] 02/03: OMID-139 - Batch transaction cleanup when aborted

This is an automated email from the ASF dual-hosted git repository.

yonigo pushed a commit to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git

commit dcf0444cfd057e0d10f173b24962ac3237fe51d5
Author: Yonatan Gottesman <yo...@gmail.com>
AuthorDate: Sun Mar 31 12:06:04 2019 +0300

    OMID-139 - Batch transaction cleanup when aborted
---
 .../omid/transaction/HBaseSyncPostCommitter.java   |   2 +-
 .../apache/omid/transaction/HBaseTransaction.java  |  60 +++++++++---
 .../apache/omid/transaction/TestShadowCells.java   |   2 +-
 .../omid/transaction/TestTransactionConflict.java  | 103 +++++++++++++++++++++
 4 files changed, 150 insertions(+), 17 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
index 68dc8d3..6e2f9ed 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
@@ -52,7 +52,7 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
 
     private final Timer commitTableUpdateTimer;
     private final Timer shadowCellsUpdateTimer;
-    private static final int MAX_BATCH_SIZE=1000;
+    static final int MAX_BATCH_SIZE=1000; // package-private: referenced by TestShadowCells
     private final Connection connection;
 
     public HBaseSyncPostCommitter(MetricsRegistry metrics, CommitTable.Client commitTableClient,
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
index 62ef936..22e4b86 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
@@ -18,16 +18,23 @@
 package org.apache.omid.transaction;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HBaseTransaction extends AbstractTransaction<HBaseCellId> {
     private static final Logger LOG = LoggerFactory.getLogger(HBaseTransaction.class);
-
+    static final int MAX_DELETE_BATCH_SIZE = 1000; // per-table batch-size threshold for cleanup() deletes; read by tests
+
     public HBaseTransaction(long transactionId, long epoch, Set<HBaseCellId> writeSet,
                             Set<HBaseCellId> conflictFreeWriteSet, AbstractTransactionManager tm, boolean isLowLatency) {
         super(transactionId, epoch, writeSet, conflictFreeWriteSet, tm, isLowLatency);
@@ -45,28 +52,51 @@ public class HBaseTransaction extends AbstractTransaction<HBaseCellId> {
         super(transactionId, readTimestamp, visibilityLevel, epoch, writeSet, conflictFreeWriteSet, tm, isLowLatency);
     }
 
-    private void deleteCell(HBaseCellId cell) {
+    /** Sends the queued {@code mutations} to {@code table} in one batch call; per-operation results are discarded. */
+    private void flushMutations(Table table, List<Mutation> mutations) throws IOException, InterruptedException {
+        table.batch(mutations, new Object[mutations.size()]);
+    }
+
+    private void deleteCell(HBaseCellId cell, Map<Table,List<Mutation>> mutations) throws IOException, InterruptedException {
+        // Queue a delete of exactly this cell version; flush the table's batch once it is full.
         Delete delete = new Delete(cell.getRow());
         delete.addColumn(cell.getFamily(), cell.getQualifier(), cell.getTimestamp());
-        try {
-            cell.getTable().getHTable().delete(delete);
-        } catch (IOException e) {
-            LOG.warn("Failed cleanup cell {} for Tx {}. This issue has been ignored", cell, getTransactionId(), e);
+        // Deletes are grouped per HBase table so each table is sent as its own batch.
+        Table table = cell.getTable().getHTable();
+        List<Mutation> tableMutations = mutations.get(table);
+        if (tableMutations == null) {
+            tableMutations = new ArrayList<>();
+            mutations.put(table, tableMutations);
+        }
+        tableMutations.add(delete);
+        // Check after every add (including the first) and use >= so a batch never holds more
+        // than MAX_DELETE_BATCH_SIZE deletes; whatever remains queued is flushed by the caller.
+        if (tableMutations.size() >= MAX_DELETE_BATCH_SIZE) {
+            flushMutations(table, tableMutations);
+            mutations.remove(table);
         }
     }
+
     @Override
     public void cleanup() {
-        for (final HBaseCellId cell : getWriteSet()) {
-            deleteCell(cell);
-        }
 
-        for (final HBaseCellId cell : getConflictFreeWriteSet()) {
-            deleteCell(cell);
-        }
+        // Best-effort: deletes are batched per table; any failure is logged and ends the cleanup.
+        Map<Table,List<Mutation>> mutations = new HashMap<>();
         try {
-            flushTables();
-        } catch (IOException e) {
-            LOG.warn("Failed flushing tables for Tx {}", getTransactionId(), e);
+            for (final HBaseCellId cell : getWriteSet()) {
+                deleteCell(cell, mutations);
+            }
+            for (final HBaseCellId cell : getConflictFreeWriteSet()) {
+                deleteCell(cell, mutations);
+            }
+            for (Map.Entry<Table,List<Mutation>> entry: mutations.entrySet()) {
+                flushMutations(entry.getKey(), entry.getValue()); // send whatever is still queued per table
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the flag so callers can still observe the interrupt
+            LOG.warn("Failed cleanup for Tx {}. This issue has been ignored", getTransactionId(), e);
+        } catch (IOException e) {
+            LOG.warn("Failed cleanup for Tx {}. This issue has been ignored", getTransactionId(), e);
         }
     }
 
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
index 8cf51a2..e7712e3 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
@@ -312,7 +312,7 @@ public class TestShadowCells extends OmidTestBase {
 
         // Test shadow cells are created properly
         Put put = new Put(row);
-        for (int i = 0; i < 1002; ++i) {
+        for (int i = 0; i < HBaseSyncPostCommitter.MAX_BATCH_SIZE*2 + 2; ++i) {
             put.addColumn(family, Bytes.toBytes(String.valueOf("X") + i), data1);
         }
         table.put(t1, put);
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
index a462d56..7181040 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
@@ -33,12 +33,15 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.ITestContext;
 import org.testng.annotations.Test;
 
+import java.io.IOException;
+
 @Test(groups = "sharedHBase")
 public class TestTransactionConflict extends OmidTestBase {
 
@@ -278,6 +281,106 @@ public class TestTransactionConflict extends OmidTestBase {
 
     }
 
+    // Counts the rows returned by a plain full-table scan; the scanner is always closed.
+    private int countRows(Table table) throws IOException {
+        Scan scan = new Scan();
+        int rowCount = 0;
+        try (ResultScanner scanner = table.getScanner(scan)) {
+            for (Result r = scanner.next(); r != null; r = scanner.next()) {
+                rowCount++;
+            }
+        }
+        return rowCount;
+    }
+
+    @Test(timeOut = 60_000)
+    public void testBatchedCleanup(ITestContext context) throws Exception {
+        // Abort a transaction whose write set spans several delete batches in two tables; all its cells must go.
+        String table2 = "testBatchedCleanupTABLE2";
+        TableName table2Name = TableName.valueOf(table2);
+
+        try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
+             Admin admin = conn.getAdmin()) {
+            // Create (or re-enable) the second table used by this test.
+
+            if (!admin.tableExists(table2Name)) {
+                HTableDescriptor desc = new HTableDescriptor(table2Name);
+                HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
+                datafam.setMaxVersions(Integer.MAX_VALUE);
+                desc.addFamily(datafam);
+
+                admin.createTable(desc);
+            }
+
+            if (admin.isTableDisabled(table2Name)) {
+                admin.enableTable(table2Name);
+            }
+        }
+
+        TransactionManager tm = newTransactionManager(context);
+        TTable tt = new TTable(connection, TEST_TABLE);
+        TTable tt2 = new TTable(connection, table2);
+
+        Transaction t1 = tm.begin();
+        LOG.info("Transaction created {}", t1);
+
+        Transaction t2 = tm.begin();
+        LOG.info("Transaction created {}", t2);
+
+        byte[] row = Bytes.toBytes("test-simple");
+        byte[] fam = Bytes.toBytes(TEST_FAMILY);
+        byte[] col = Bytes.toBytes("testdata");
+        byte[] data1 = Bytes.toBytes("testWrite-1");
+        byte[] data2 = Bytes.toBytes("testWrite-2");
+
+        Put p = new Put(row);
+        p.addColumn(fam, col, data1);
+        tt.put(t1, p);
+
+        Get g = new Get(row).setMaxVersions();
+        g.addColumn(fam, col);
+        Result r = tt.getHTable().get(g);
+        assertEquals(r.size(), 1, "Unexpected size for read.");
+        assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
+                "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
+
+        int rowcount = HBaseTransaction.MAX_DELETE_BATCH_SIZE*2 + 2;
+
+        // Write the same cell as t1 so that t2 conflicts and is aborted at commit
+        Put p2 = new Put(row);
+        p2.addColumn(fam, col, data2);
+        tt.put(t2, p2);
+
+        // Add enough rows to each table to span several delete batches during cleanup
+        for (int i = 0; i < rowcount; i++) {
+            byte[] newRow = Bytes.toBytes("test-del" + i);
+            Put put = new Put(newRow);
+            put.addColumn(fam, col, data2);
+            tt.put(t2, put);
+            tt2.put(t2, put);
+        }
+
+        // validate rows are really written
+        assertEquals(countRows(tt.getHTable()), rowcount + 1, "Unexpected row count before cleanup.");
+        assertEquals(countRows(tt2.getHTable()), rowcount, "Unexpected row count before cleanup.");
+
+        tm.commit(t1);
+
+        boolean aborted = false;
+        try {
+            tm.commit(t2);
+            fail("Transaction committed successfully");
+        } catch (RollbackException e) {
+            aborted = true;
+        }
+        assertTrue(aborted, "Transaction didn't raise exception");
+
+        // validate rows are cleaned: only t1's committed row survives
+        assertEquals(countRows(tt.getHTable()), 1, "Unexpected row count after cleanup.");
+        assertEquals(countRows(tt2.getHTable()), 0, "Unexpected row count after cleanup.");
+    }
+
+
     @Test(timeOut = 10_000)
     public void testMultipleCellChangesOnSameRow(ITestContext context) throws Exception {
         TransactionManager tm = newTransactionManager(context);