Posted to commits@omid.apache.org by yo...@apache.org on 2019/03/31 09:28:46 UTC
[incubator-omid] 02/03: OMID-139 - Batch transaction cleanup when aborted
This is an automated email from the ASF dual-hosted git repository.
yonigo pushed a commit to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git
commit dcf0444cfd057e0d10f173b24962ac3237fe51d5
Author: Yonatan Gottesman <yo...@gmail.com>
AuthorDate: Sun Mar 31 12:06:04 2019 +0300
OMID-139 - Batch transaction cleanup when aborted
---
.../omid/transaction/HBaseSyncPostCommitter.java | 2 +-
.../apache/omid/transaction/HBaseTransaction.java | 60 +++++++++---
.../apache/omid/transaction/TestShadowCells.java | 2 +-
.../omid/transaction/TestTransactionConflict.java | 103 +++++++++++++++++++++
4 files changed, 150 insertions(+), 17 deletions(-)
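In short: before this change, cleanup() of an aborted transaction issued one Delete RPC per cell in the write set; the patch groups the deletes per HBase Table and sends them through Table.batch(), flushing whenever a table's pending list grows past MAX_DELETE_BATCH_SIZE and once more at the end for any remainder. The standalone sketch below illustrates the same per-table batching pattern; the class name, helper names, and the flush-at-threshold detail are illustrative only, not the committed code (which follows in the diff).

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Table;

public class BatchedDeleteSketch {

    // Mirrors the constant the patch adds to HBaseTransaction.
    static final int MAX_DELETE_BATCH_SIZE = 1000;

    // Queue one cell delete; flush the owning table's list once it fills up.
    static void queueDelete(Map<Table, List<Mutation>> pending, Table table,
                            byte[] row, byte[] family, byte[] qualifier, long ts)
            throws IOException, InterruptedException {
        Delete delete = new Delete(row);
        delete.addColumn(family, qualifier, ts);

        List<Mutation> batch = pending.computeIfAbsent(table, t -> new ArrayList<>());
        batch.add(delete);
        if (batch.size() >= MAX_DELETE_BATCH_SIZE) {
            table.batch(batch, new Object[batch.size()]);   // one RPC round per full batch
            pending.remove(table);
        }
    }

    // Flush whatever is still pending after all cells have been queued.
    static void flushRemaining(Map<Table, List<Mutation>> pending)
            throws IOException, InterruptedException {
        for (Map.Entry<Table, List<Mutation>> entry : pending.entrySet()) {
            entry.getKey().batch(entry.getValue(), new Object[entry.getValue().size()]);
        }
    }
}

The net effect is roughly one batch RPC per MAX_DELETE_BATCH_SIZE deletes per table instead of one RPC per deleted cell, which is what the enlarged TestShadowCells loop and the new testBatchedCleanup test below exercise.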
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
index 68dc8d3..6e2f9ed 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
@@ -52,7 +52,7 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
private final Timer commitTableUpdateTimer;
private final Timer shadowCellsUpdateTimer;
- private static final int MAX_BATCH_SIZE=1000;
+ static final int MAX_BATCH_SIZE=1000;
private final Connection connection;
public HBaseSyncPostCommitter(MetricsRegistry metrics, CommitTable.Client commitTableClient,
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
index 62ef936..22e4b86 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
@@ -18,16 +18,23 @@
package org.apache.omid.transaction;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HBaseTransaction extends AbstractTransaction<HBaseCellId> {
private static final Logger LOG = LoggerFactory.getLogger(HBaseTransaction.class);
-
+ static final int MAX_DELETE_BATCH_SIZE = 1000;
+
public HBaseTransaction(long transactionId, long epoch, Set<HBaseCellId> writeSet,
Set<HBaseCellId> conflictFreeWriteSet, AbstractTransactionManager tm, boolean isLowLatency) {
super(transactionId, epoch, writeSet, conflictFreeWriteSet, tm, isLowLatency);
@@ -45,28 +52,51 @@ public class HBaseTransaction extends AbstractTransaction<HBaseCellId> {
super(transactionId, readTimestamp, visibilityLevel, epoch, writeSet, conflictFreeWriteSet, tm, isLowLatency);
}
- private void deleteCell(HBaseCellId cell) {
+
+ private void flushMutations(Table table, List<Mutation> mutations) throws IOException, InterruptedException {
+ table.batch(mutations, new Object[mutations.size()]);
+ }
+
+ private void deleteCell(HBaseCellId cell, Map<Table,List<Mutation>> mutations) throws IOException, InterruptedException {
+
Delete delete = new Delete(cell.getRow());
delete.addColumn(cell.getFamily(), cell.getQualifier(), cell.getTimestamp());
- try {
- cell.getTable().getHTable().delete(delete);
- } catch (IOException e) {
- LOG.warn("Failed cleanup cell {} for Tx {}. This issue has been ignored", cell, getTransactionId(), e);
+
+ Table table = cell.getTable().getHTable();
+ List<Mutation> tableMutations = mutations.get(table);
+ if (tableMutations == null) {
+ ArrayList<Mutation> newList = new ArrayList<>();
+ newList.add(delete);
+ mutations.put(table, newList);
+ } else {
+ tableMutations.add(delete);
+ if (tableMutations.size() > MAX_DELETE_BATCH_SIZE) {
+ flushMutations(table, tableMutations);
+ mutations.remove(table);
+ }
}
}
+
@Override
public void cleanup() {
- for (final HBaseCellId cell : getWriteSet()) {
- deleteCell(cell);
- }
- for (final HBaseCellId cell : getConflictFreeWriteSet()) {
- deleteCell(cell);
- }
+ Map<Table,List<Mutation>> mutations = new HashMap<>();
+
try {
- flushTables();
- } catch (IOException e) {
- LOG.warn("Failed flushing tables for Tx {}", getTransactionId(), e);
+ for (final HBaseCellId cell : getWriteSet()) {
+ deleteCell(cell, mutations);
+ }
+
+ for (final HBaseCellId cell : getConflictFreeWriteSet()) {
+ deleteCell(cell, mutations);
+ }
+
+ for (Map.Entry<Table,List<Mutation>> entry: mutations.entrySet()) {
+ flushMutations(entry.getKey(), entry.getValue());
+ }
+
+ } catch (InterruptedException | IOException e) {
+ LOG.warn("Failed cleanup for Tx {}. This issue has been ignored", getTransactionId(), e);
}
}
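A side note on the error handling above: the Object[] handed to Table.batch() in flushMutations is the per-action results array. The committed code discards it, and cleanup() logs and ignores any IOException or InterruptedException, so cleanup stays best-effort exactly as before. If a caller did want to see which individual deletes failed, the results array is the place to look, as in this hedged sketch (how failures show up in the array varies by HBase client version, so treat it as illustrative rather than as what the patch does):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Table;

final class BatchResultInspection {

    // Runs a batch and counts actions that did not come back with a usable result.
    // Depending on the client version a failed action may surface as a null slot,
    // an exception object in its slot, or an exception thrown by batch() itself.
    static int batchAndCountFailures(Table table, List<Mutation> mutations)
            throws InterruptedException {
        Object[] results = new Object[mutations.size()];
        try {
            table.batch(mutations, results);
        } catch (IOException e) {
            // Partial results may still have been filled in; fall through and count them.
        }
        int failures = 0;
        for (Object r : results) {
            if (r == null || r instanceof Throwable) {
                failures++;
            }
        }
        return failures;
    }
}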
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
index 8cf51a2..e7712e3 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
@@ -312,7 +312,7 @@ public class TestShadowCells extends OmidTestBase {
// Test shadow cells are created properly
Put put = new Put(row);
- for (int i = 0; i < 1002; ++i) {
+ for (int i = 0; i < HBaseSyncPostCommitter.MAX_BATCH_SIZE*2 + 2; ++i) {
put.addColumn(family, Bytes.toBytes(String.valueOf("X") + i), data1);
}
table.put(t1, put);
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
index a462d56..7181040 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
@@ -33,12 +33,15 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.ITestContext;
import org.testng.annotations.Test;
+import java.io.IOException;
+
@Test(groups = "sharedHBase")
public class TestTransactionConflict extends OmidTestBase {
@@ -278,6 +281,106 @@ public class TestTransactionConflict extends OmidTestBase {
}
+ private int countRows(Table table) throws IOException {
+ Scan scan = new Scan();
+ ResultScanner scanner = table.getScanner(scan);
+ Result r = scanner.next();
+ int rowCount = 0;
+ while (r != null) {
+ r = scanner.next();
+ rowCount++;
+ }
+ return rowCount;
+ }
+
+ @Test(timeOut = 60_000)
+ public void testBatchedCleanup(ITestContext context) throws Exception {
+
+ String table2 = "testBatchedCleanupTABLE2";
+ TableName table2Name = TableName.valueOf(table2);
+
+ try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
+ Admin admin = conn.getAdmin()) {
+ TableName htable2 = TableName.valueOf(table2);
+
+ if (!admin.tableExists(htable2)) {
+ HTableDescriptor desc = new HTableDescriptor(table2Name);
+ HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
+ datafam.setMaxVersions(Integer.MAX_VALUE);
+ desc.addFamily(datafam);
+
+ admin.createTable(desc);
+ }
+
+ if (admin.isTableDisabled(htable2)) {
+ admin.enableTable(htable2);
+ }
+ }
+
+ TransactionManager tm = newTransactionManager(context);
+ TTable tt = new TTable(connection, TEST_TABLE);
+ TTable tt2 = new TTable(connection, table2);
+
+ Transaction t1 = tm.begin();
+ LOG.info("Transaction created " + t1);
+
+ Transaction t2 = tm.begin();
+ LOG.info("Transaction created" + t2);
+
+ byte[] row = Bytes.toBytes("test-simple");
+ byte[] fam = Bytes.toBytes(TEST_FAMILY);
+ byte[] col = Bytes.toBytes("testdata");
+ byte[] data1 = Bytes.toBytes("testWrite-1");
+ byte[] data2 = Bytes.toBytes("testWrite-2");
+
+ Put p = new Put(row);
+ p.addColumn(fam, col, data1);
+ tt.put(t1, p);
+
+ Get g = new Get(row).setMaxVersions();
+ g.addColumn(fam, col);
+ Result r = tt.getHTable().get(g);
+ assertEquals(r.size(), 1, "Unexpected size for read.");
+ assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
+ "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
+
+ int rowcount = HBaseTransaction.MAX_DELETE_BATCH_SIZE*2 + 2;
+
+ // Add this row to cause conflict
+ Put p2 = new Put(row);
+ p2.addColumn(fam, col, data2);
+ tt.put(t2, p2);
+
+ //Add more rows to hit batch
+ for (int i = 0; i < rowcount; i++) {
+ byte[] newRow = Bytes.toBytes("test-del" + i);
+ Put put = new Put(newRow);
+ put.addColumn(fam, col, data2);
+ tt.put(t2, put);
+ tt2.put(t2, put);
+ }
+
+ // validate rows are really written
+ assertEquals(countRows(tt.getHTable()), rowcount + 1, "Unexpected size for read.");
+ assertEquals(countRows(tt2.getHTable()), rowcount, "Unexpected size for read.");
+
+ tm.commit(t1);
+
+ boolean aborted = false;
+ try {
+ tm.commit(t2);
+ fail("Transaction commited successfully");
+ } catch (RollbackException e) {
+ aborted = true;
+ }
+ assertTrue(aborted, "Transaction didn't raise exception");
+
+ // validate rows are cleaned
+ assertEquals(countRows(tt.getHTable()), 1, "Unexpected size for read.");
+ assertEquals(countRows(tt2.getHTable()), 0, "Unexpected size for read.");
+ }
+
+
@Test(timeOut = 10_000)
public void testMultipleCellChangesOnSameRow(ITestContext context) throws Exception {
TransactionManager tm = newTransactionManager(context);