You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by ja...@apache.org on 2018/08/15 15:11:08 UTC
[1/3] incubator-tephra git commit: TEPHRA-294 Update
TransactionAwareTable to implement interface of Table for 1.3 and 1.4
Repository: incubator-tephra
Updated Branches:
refs/heads/master d0a1c4c29 -> 16bcfefbc
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java b/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
index 560b0fe..179b22e 100644
--- a/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
+++ b/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.tephra.TxConstants;
import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
import org.junit.AfterClass;
@@ -60,7 +60,6 @@ public abstract class AbstractHBaseTableTest {
conf.setInt("hbase.master.info.port", 0);
conf.setInt("hbase.regionserver.port", 0);
conf.setInt("hbase.regionserver.info.port", 0);
-
testUtil.startMiniCluster();
hBaseAdmin = testUtil.getHBaseAdmin();
}
@@ -76,12 +75,12 @@ public abstract class AbstractHBaseTableTest {
}
}
- protected static HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
+ protected static Table createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
return createTable(tableName, columnFamilies, false,
Collections.singletonList(TransactionProcessor.class.getName()));
}
- protected static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
+ protected static Table createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
List<String> coprocessors) throws Exception {
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
for (byte[] family : columnFamilies) {
@@ -101,6 +100,6 @@ public abstract class AbstractHBaseTableTest {
}
hBaseAdmin.createTable(desc);
testUtil.waitTableAvailable(tableName, 5000);
- return new HTable(testUtil.getConfiguration(), tableName);
+ return testUtil.getConnection().getTable(TableName.valueOf(tableName));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index 73f9d45..51fbbda 100644
--- a/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -26,17 +26,17 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -99,7 +99,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
static TransactionManager txManager;
private TransactionContext transactionContext;
private TransactionAwareHTable transactionAwareHTable;
- private HTable hTable;
+ private Table hTable;
@ClassRule
public static TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -254,7 +254,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
*/
@Test
public void testValidTransactionalDelete() throws Exception {
- try (HTable hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"),
+ try (Table hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"),
new byte[][]{TestBytes.family, TestBytes.family2})) {
TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
@@ -394,7 +394,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
*/
@Test
public void testAttributesPreserved() throws Exception {
- HTable hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
+ Table hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
new byte[][]{TestBytes.family, TestBytes.family2}, false,
Lists.newArrayList(TransactionProcessor.class.getName(), TestRegionObserver.class.getName()));
try {
@@ -436,7 +436,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testFamilyDeleteWithCompaction() throws Exception {
- HTable hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"),
+ Table hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"),
new byte[][]{TestBytes.family, TestBytes.family2});
try {
TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, ConflictDetection.ROW);
@@ -536,7 +536,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
private void testDeleteRollback(TxConstants.ConflictDetection conflictDetection) throws Exception {
String tableName = String.format("%s%s", "TestColFamilyDelete", conflictDetection);
- HTable hTable = createTable(Bytes.toBytes(tableName), new byte[][]{TestBytes.family});
+ Table hTable = createTable(Bytes.toBytes(tableName), new byte[][]{TestBytes.family});
try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, conflictDetection)) {
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
txContext.start();
@@ -572,7 +572,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testMultiColumnFamilyRowDeleteRollback() throws Exception {
- HTable hTable = createTable(Bytes.toBytes("TestMultColFam"), new byte[][] {TestBytes.family, TestBytes.family2});
+ Table hTable = createTable(Bytes.toBytes("TestMultColFam"), new byte[][] {TestBytes.family, TestBytes.family2});
try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
txContext.start();
@@ -604,7 +604,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testRowDelete() throws Exception {
- HTable hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2});
+ Table hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2});
try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
@@ -687,7 +687,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
.add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
txContext.finish();
-
+
txContext.start();
txTable.delete(new Delete(TestBytes.row2).deleteFamily(TestBytes.family));
txContext.finish();
@@ -781,12 +781,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testReadYourWrites() throws Exception {
// In-progress tx1: started before our main transaction
- HTable hTable1 = new HTable(testUtil.getConfiguration(), TestBytes.table);
+ Table hTable1 = testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table));
TransactionAwareHTable txHTable1 = new TransactionAwareHTable(hTable1);
TransactionContext inprogressTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable1);
// In-progress tx2: started while our main transaction is running
- HTable hTable2 = new HTable(testUtil.getConfiguration(), TestBytes.table);
+ Table hTable2 = testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table));
TransactionAwareHTable txHTable2 = new TransactionAwareHTable(hTable2);
TransactionContext inprogressTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable2);
@@ -838,11 +838,13 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testRowLevelConflictDetection() throws Exception {
- TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+ TransactionAwareHTable txTable1 = new TransactionAwareHTable(
+ testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)),
TxConstants.ConflictDetection.ROW);
TransactionContext txContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable1);
- TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+ TransactionAwareHTable txTable2 = new TransactionAwareHTable(
+ testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)),
TxConstants.ConflictDetection.ROW);
TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
@@ -949,11 +951,13 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testNoneLevelConflictDetection() throws Exception {
InMemoryTxSystemClient txClient = new InMemoryTxSystemClient(txManager);
- TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+ TransactionAwareHTable txTable1 = new TransactionAwareHTable(
+ testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)),
TxConstants.ConflictDetection.NONE);
TransactionContext txContext1 = new TransactionContext(txClient, txTable1);
- TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+ TransactionAwareHTable txTable2 = new TransactionAwareHTable(
+ testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)),
TxConstants.ConflictDetection.NONE);
TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
@@ -1088,7 +1092,8 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
// check that writes are still not visible to other clients
- TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
+ TransactionAwareHTable txTable2 = new TransactionAwareHTable(
+ testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)));
TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
txContext2.start();
@@ -1147,7 +1152,8 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
txClient.invalidate(transactionContext.getCurrentTransaction().getTransactionId());
// check that writes are not visible
- TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
+ TransactionAwareHTable txTable2 = new TransactionAwareHTable(
+ testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)));
TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
txContext2.start();
Transaction newTx = txContext2.getCurrentTransaction();
@@ -1183,7 +1189,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
// Add some pre-existing, non-transactional data
- HTable nonTxTable = new HTable(testUtil.getConfiguration(), txTable.getTableName());
+ Table nonTxTable = testUtil.getConnection().getTable(txTable.getName());
nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, val11));
nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, val12));
nonTxTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, val21));
@@ -1191,7 +1197,6 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER,
HConstants.EMPTY_BYTE_ARRAY));
nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TestBytes.qualifier, HConstants.EMPTY_BYTE_ARRAY));
- nonTxTable.flushCommits();
// Add transactional data
txContext.start();
@@ -1282,15 +1287,15 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
txContext.finish();
}
- private void verifyRow(HTableInterface table, byte[] rowkey, byte[] expectedValue) throws Exception {
+ private void verifyRow(Table table, byte[] rowkey, byte[] expectedValue) throws Exception {
verifyRow(table, new Get(rowkey), expectedValue);
}
- private void verifyRow(HTableInterface table, Get get, byte[] expectedValue) throws Exception {
+ private void verifyRow(Table table, Get get, byte[] expectedValue) throws Exception {
verifyRows(table, get, expectedValue == null ? null : ImmutableList.of(expectedValue));
}
- private void verifyRows(HTableInterface table, Get get, List<byte[]> expectedValues) throws Exception {
+ private void verifyRows(Table table, Get get, List<byte[]> expectedValues) throws Exception {
Result result = table.get(get);
if (expectedValues == null) {
assertTrue(result.isEmpty());
@@ -1310,12 +1315,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
}
}
- private Cell[] getRow(HTableInterface table, Get get) throws Exception {
+ private Cell[] getRow(Table table, Get get) throws Exception {
Result result = table.get(get);
return result.rawCells();
}
- private void verifyScan(HTableInterface table, Scan scan, List<KeyValue> expectedCells) throws Exception {
+ private void verifyScan(Table table, Scan scan, List<KeyValue> expectedCells) throws Exception {
List<Cell> actualCells = new ArrayList<>();
try (ResultScanner scanner = table.getScanner(scan)) {
Result[] results = scanner.next(expectedCells.size() + 1);
@@ -1328,7 +1333,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testVisibilityAll() throws Exception {
- HTable nonTxTable =
+ Table nonTxTable =
createTable(Bytes.toBytes("testVisibilityAll"), new byte[][]{TestBytes.family, TestBytes.family2},
true, Collections.singletonList(TransactionProcessor.class.getName()));
TransactionAwareHTable txTable =
@@ -1500,7 +1505,6 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
// to prevent Tephra from replacing delete with delete marker
deleteFamily.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
nonTxTable.delete(deleteFamily);
- nonTxTable.flushCommits();
txContext.start();
txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
@@ -1712,7 +1716,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
* Represents older transaction clients
*/
private static class OldTransactionAwareHTable extends TransactionAwareHTable {
- public OldTransactionAwareHTable(HTableInterface hTable) {
+ public OldTransactionAwareHTable(Table hTable) {
super(hTable);
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
index 2e9dc17..031b0f3 100644
--- a/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
+++ b/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
@@ -23,7 +23,6 @@ package org.apache.tephra.hbase.txprune;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.ImmutableSortedSet;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.TxConstants;
@@ -53,7 +52,7 @@ public class DataJanitorStateTest extends AbstractHBaseTableTest {
public void beforeTest() throws Exception {
pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
- HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+ Table table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
// Prune state table is a non-transactional table, hence no transaction co-processor
Collections.<String>emptyList());
table.close();
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 55348b0..e92b4cf 100644
--- a/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -90,7 +89,7 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
// Do some transactional data operations
txDataTable1 = TableName.valueOf("invalidListPruneTestTable1");
- HTable hTable = createTable(txDataTable1.getName(), new byte[][]{family}, false,
+ Table hTable = createTable(txDataTable1.getName(), new byte[][]{family}, false,
Collections.singletonList(TestTransactionProcessor.class.getName()));
try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
@@ -129,7 +128,7 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
}
private void createPruneStateTable() throws Exception {
- HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+ Table table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
// Prune state table is a non-transactional table, hence no transaction co-processor
Collections.<String>emptyList());
table.close();
@@ -311,7 +310,7 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
// Create an empty table
TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
- HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
+ Table emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
Collections.singletonList(TestTransactionProcessor.class.getName()));
TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java b/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
index 1476906..aa669e5 100644
--- a/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
+++ b/tephra-hbase-compat-1.4/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
@@ -30,7 +30,6 @@ import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.TxConstants;
@@ -104,7 +103,7 @@ public class InvalidListPruningDebugTest extends AbstractHBaseTableTest {
public static void addData() throws Exception {
pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
- HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+ Table table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
// Prune state table is a non-transactional table, hence no transaction co-processor
Collections.<String>emptyList());
table.close();
[3/3] incubator-tephra git commit: TEPHRA-291 Notice file is outdated
Posted by ja...@apache.org.
TEPHRA-291 Notice file is outdated
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/16bcfefb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/16bcfefb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/16bcfefb
Branch: refs/heads/master
Commit: 16bcfefbc2f2de5fae233db41cc2c1072cee437c
Parents: 52fd15f
Author: James Taylor <ja...@apache.org>
Authored: Sun Aug 12 20:58:56 2018 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Sun Aug 12 20:58:56 2018 -0700
----------------------------------------------------------------------
NOTICE | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/16bcfefb/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index bb171fd..8f7b045 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
Apache Tephra
-Copyright 2016 The Apache Software Foundation
+Copyright 2016-2018 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
[2/3] incubator-tephra git commit: TEPHRA-294 Update
TransactionAwareTable to implement interface of Table for 1.3 and 1.4
Posted by ja...@apache.org.
TEPHRA-294 Update TransactionAwareTable to implement interface of Table for 1.3 and 1.4
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/52fd15f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/52fd15f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/52fd15f0
Branch: refs/heads/master
Commit: 52fd15f0ab45529f8b0ba808e5de2a909f6638a4
Parents: d0a1c4c
Author: James Taylor <ja...@apache.org>
Authored: Sun Aug 12 20:47:10 2018 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Sun Aug 12 20:54:02 2018 -0700
----------------------------------------------------------------------
tephra-examples/hbase-1.3/pom.xml | 4 +-
.../apache/tephra/examples/BalanceBooks.java | 341 +++++++++++++++++++
.../apache/tephra/examples/package-info.java | 40 +++
.../tephra/examples/BalanceBooksTest.java | 137 ++++++++
tephra-examples/hbase-1.4/pom.xml | 4 +-
.../apache/tephra/examples/BalanceBooks.java | 341 +++++++++++++++++++
.../apache/tephra/examples/package-info.java | 40 +++
.../tephra/examples/BalanceBooksTest.java | 137 ++++++++
.../tephra/hbase/TransactionAwareHTable.java | 92 +----
.../tephra/hbase/AbstractHBaseTableTest.java | 9 +-
.../hbase/TransactionAwareHTableTest.java | 60 ++--
.../hbase/txprune/DataJanitorStateTest.java | 3 +-
.../hbase/txprune/InvalidListPruneTest.java | 7 +-
.../txprune/InvalidListPruningDebugTest.java | 3 +-
.../tephra/hbase/TransactionAwareHTable.java | 135 ++------
.../tephra/hbase/AbstractHBaseTableTest.java | 9 +-
.../hbase/TransactionAwareHTableTest.java | 60 ++--
.../hbase/txprune/DataJanitorStateTest.java | 3 +-
.../hbase/txprune/InvalidListPruneTest.java | 7 +-
.../txprune/InvalidListPruningDebugTest.java | 3 +-
20 files changed, 1171 insertions(+), 264 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-examples/hbase-1.3/pom.xml
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.3/pom.xml b/tephra-examples/hbase-1.3/pom.xml
index de1c84a..865541a 100644
--- a/tephra-examples/hbase-1.3/pom.xml
+++ b/tephra-examples/hbase-1.3/pom.xml
@@ -34,8 +34,8 @@
</properties>
<build>
- <sourceDirectory>../src/main/java</sourceDirectory>
- <testSourceDirectory>../src/test/java</testSourceDirectory>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
</build>
<dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-examples/hbase-1.3/src/main/java/org/apache/tephra/examples/BalanceBooks.java
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.3/src/main/java/org/apache/tephra/examples/BalanceBooks.java b/tephra-examples/hbase-1.3/src/main/java/org/apache/tephra/examples/BalanceBooks.java
new file mode 100644
index 0000000..b970598
--- /dev/null
+++ b/tephra-examples/hbase-1.3/src/main/java/org/apache/tephra/examples/BalanceBooks.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.examples;
+
+import com.google.common.io.Closeables;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TransactionConflictException;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.distributed.TransactionServiceClient;
+import org.apache.tephra.hbase.TransactionAwareHTable;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.tephra.util.ConfigurationFactory;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Simple example application that launches a number of concurrent clients, one per "account". Each client attempts to
+ * make withdrawals from other clients, and deposit the same amount to its own account in a single transaction.
+ * Since this means the client will be updating both its own row and the withdrawee's row, this will naturally lead to
+ * transaction conflicts. All clients will run for a specified number of iterations. When the processing is complete,
+ * the total sum of all rows should be zero, if transactional integrity was maintained.
+ *
+ * <p>
+ * You can run the BalanceBooks application with the following command:
+ * <pre>
+ * ./bin/tephra run org.apache.tephra.examples.BalanceBooks [num clients] [num iterations]
+ * </pre>
+ * where <code>[num clients]</code> is the number of concurrent client threads to use, and
+ * <code>[num iterations]</code> is the number of "transfer" operations to perform per client thread.
+ * </p>
+ */
+public class BalanceBooks implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(BalanceBooks.class);
+
+ private static final int MAX_AMOUNT = 100;
+ private static final byte[] TABLE = Bytes.toBytes("testbalances");
+ private static final byte[] FAMILY = Bytes.toBytes("f");
+ private static final byte[] COL = Bytes.toBytes("b");
+
+ private final int totalClients;
+ private final int iterations;
+
+ private Configuration conf;
+ private ZKClientService zkClient;
+ private TransactionServiceClient txClient;
+ private Connection conn;
+
+ public BalanceBooks(int totalClients, int iterations) {
+ this(totalClients, iterations, new ConfigurationFactory().get());
+ }
+
+ public BalanceBooks(int totalClients, int iterations, Configuration conf) {
+ this.totalClients = totalClients;
+ this.iterations = iterations;
+ this.conf = conf;
+ }
+
+ /**
+ * Sets up common resources required by all clients.
+ */
+ public void init() throws IOException {
+ Injector injector = Guice.createInjector(
+ new ConfigModule(conf),
+ new ZKModule(),
+ new DiscoveryModules().getDistributedModules(),
+ new TransactionModules().getDistributedModules(),
+ new TransactionClientModule()
+ );
+
+ zkClient = injector.getInstance(ZKClientService.class);
+ zkClient.startAndWait();
+ txClient = injector.getInstance(TransactionServiceClient.class);
+ conn = ConnectionFactory.createConnection(conf);
+ createTableIfNotExists(conf, TABLE, new byte[][]{ FAMILY });
+ }
+
+  /**
+   * Starts one {@link Client} thread per account and blocks until every one of them has finished.
+   *
+   * @throws IOException if a table handle cannot be obtained for a client
+   * @throws InterruptedException if the calling thread is interrupted while waiting for a client
+   */
+  public void run() throws IOException, InterruptedException {
+    List<Client> started = new ArrayList<>(totalClients);
+    for (int clientId = 0; clientId < totalClients; clientId++) {
+      // Every client gets its own table handle from the shared connection.
+      Table clientTable = conn.getTable(TableName.valueOf(TABLE));
+      Client client = new Client(clientId, totalClients, iterations);
+      client.init(txClient, clientTable);
+      client.start();
+      started.add(client);
+    }
+
+    // Wait for the clients in start order, releasing each client's table as soon as it is done.
+    for (Client finished : started) {
+      finished.join();
+      Closeables.closeQuietly(finished);
+    }
+  }
+
+  /**
+   * Validates the state of the stored data at the end of the test. Each update by a client consists of two
+   * parts: a withdrawal of a random amount from a randomly selected other account, and a corresponding deposit to
+   * the client's own account. So, if all the updates were performed consistently (no partial updates or partial
+   * rollbacks), then the total sum of all balances at the end should be 0.
+   *
+   * <p>The scan runs inside its own transaction. Failures are logged rather than propagated.</p>
+   *
+   * @return {@code true} if the scanned balances sum to zero; {@code false} if they do not, or if the scan
+   *         could not be completed
+   */
+  public boolean verify() {
+    boolean success = false;
+    // try-with-resources so the table handle is released on every path.
+    try (TransactionAwareHTable table = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TABLE)))) {
+      TransactionContext context = new TransactionContext(txClient, table);
+
+      LOG.info("VERIFYING BALANCES");
+      context.start();
+      long totalBalance = 0;
+
+      try (ResultScanner scanner = table.getScanner(new Scan())) {
+        for (Result r : scanner) {
+          if (!r.isEmpty()) {
+            int rowId = Bytes.toInt(r.getRow());
+            long balance = Bytes.toLong(r.getValue(FAMILY, COL));
+            totalBalance += balance;
+            LOG.info("Client #{}: balance = ${}", rowId, balance);
+          }
+        }
+      } catch (IOException | RuntimeException e) {
+        // A failed scan would otherwise leave the verification transaction started but never ended.
+        try {
+          context.abort();
+        } catch (TransactionFailureException abortFailure) {
+          e.addSuppressed(abortFailure);
+        }
+        throw e;
+      }
+      if (totalBalance == 0) {
+        LOG.info("PASSED!");
+        success = true;
+      } else {
+        LOG.info("FAILED! Total balance should be 0 but was {}", totalBalance);
+      }
+      context.finish();
+    } catch (Exception e) {
+      LOG.error("Failed verification check", e);
+    }
+    return success;
+  }
+
+  /**
+   * Frees up the underlying resources common to all clients: first the HBase connection, then the ZooKeeper
+   * client. A failure to close the connection is logged and does not prevent the ZooKeeper client from being
+   * stopped.
+   */
+  @Override
+  public void close() {
+    try {
+      if (conn != null) {
+        conn.close();
+      }
+    } catch (IOException e) {
+      // Best effort during shutdown, but do not hide the failure.
+      LOG.warn("Failed to close HBase connection", e);
+    } finally {
+      if (zkClient != null) {
+        zkClient.stopAndWait();
+      }
+    }
+  }
+
+  /**
+   * Creates the given table with the {@link TransactionProcessor} coprocessor attached, unless a table with that
+   * name already exists. Every column family is configured to keep {@code Integer.MAX_VALUE} versions.
+   *
+   * @param conf unused by this method; the table is created through the connection held by this instance
+   * @param tableName name of the table to create
+   * @param columnFamilies column families the new table should contain
+   * @throws IOException if the existence check or the table creation fails
+   */
+  protected void createTableIfNotExists(Configuration conf, byte[] tableName, byte[][] columnFamilies)
+      throws IOException {
+    TableName name = TableName.valueOf(tableName);
+    try (Admin admin = this.conn.getAdmin()) {
+      if (admin.tableExists(name)) {
+        // Reuse a table left behind by an earlier run instead of failing on creation.
+        return;
+      }
+      HTableDescriptor desc = new HTableDescriptor(name);
+      for (byte[] family : columnFamilies) {
+        HColumnDescriptor columnDesc = new HColumnDescriptor(family);
+        columnDesc.setMaxVersions(Integer.MAX_VALUE);
+        desc.addFamily(columnDesc);
+      }
+      desc.addCoprocessor(TransactionProcessor.class.getName());
+      admin.createTable(desc);
+    }
+  }
+
+ public static void main(String[] args) {
+ if (args.length != 2) {
+ System.err.println("Usage: java " + BalanceBooks.class.getName() + " <num clients> <iterations>");
+ System.err.println("\twhere <num clients> >= 2");
+ System.exit(1);
+ }
+
+ try (BalanceBooks bb = new BalanceBooks(Integer.parseInt(args[0]), Integer.parseInt(args[1]))) {
+ bb.init();
+ bb.run();
+ bb.verify();
+ } catch (Exception e) {
+ LOG.error("Failed during BalanceBooks run", e);
+ }
+ }
+
+ /**
+ * Represents a single client actor in the test. Each client runs as a separate thread.
+ *
+ * For the given number of iterations, the client will:
+ * <ol>
+ * <li>select a random other client from which to withdraw</li>
+ * <li>select a random amount from 0 to MAX_AMOUNT</li>
+ * <li>start a new transaction and: deduct the amount from the other client's account, and deposit
+ * the same amount to its own account.</li>
+ * </ol>
+ *
+ * Since multiple clients operate concurrently and contend over a set of constrained resources
+ * (the client accounts), it is expected that a portion of the attempted transactions will encounter
+ * conflicts, due to a simultaneous deduction from or deposit to one of the same accounts which has successfully
+ * committed first. In this case, the updates from the transaction encountering the conflict should be completely
+ * rolled back, leaving the data in a consistent state.
+ */
+ private static class Client extends Thread implements Closeable {
+ private final int id;
+ private final int totalClients;
+ private final int iterations;
+
+ private final Random random = new Random();
+
+ private TransactionContext txContext;
+ private TransactionAwareHTable txTable;
+
+
+ public Client(int id, int totalClients, int iterations) {
+ this.id = id;
+ this.totalClients = totalClients;
+ this.iterations = iterations;
+ }
+
+ /**
+ * Sets up any resources needed by the individual client.
+ *
+ * @param txClient the transaction client to use in accessing the transaction service
+ * @param table the HBase table instance to use for accessing storage
+ */
+ public void init(TransactionSystemClient txClient, Table table) {
+ txTable = new TransactionAwareHTable(table);
+ txContext = new TransactionContext(txClient, txTable);
+ }
+
+ public void run() {
+ try {
+ for (int i = 0; i < iterations; i++) {
+ runOnce();
+ }
+ } catch (TransactionFailureException e) {
+ LOG.error("Client #{}: Failed on exception", id, e);
+ }
+ }
+
+ /**
+ * Runs a single iteration of the client logic.
+ */
+ private void runOnce() throws TransactionFailureException {
+ int withdrawee = getNextWithdrawee();
+ int amount = getAmount();
+
+ try {
+ txContext.start();
+ long withdraweeBalance = getCurrentBalance(withdrawee);
+ long ownBalance = getCurrentBalance(id);
+ long withdraweeNew = withdraweeBalance - amount;
+ long ownNew = ownBalance + amount;
+
+ setBalance(withdrawee, withdraweeNew);
+ setBalance(id, ownNew);
+ LOG.debug("Client #{}: Withdrew ${} from #{}; withdrawee old={}, new={}; own old={}, new={}",
+ id, amount, withdrawee, withdraweeBalance, withdraweeNew, ownBalance, ownNew);
+ txContext.finish();
+
+ } catch (IOException ioe) {
+ LOG.error("Client #{}: Unhandled client failure", id, ioe);
+ txContext.abort();
+ } catch (TransactionConflictException tce) {
+ LOG.debug("CONFLICT: client #{} attempting to withdraw from #{}", id, withdrawee);
+ txContext.abort(tce);
+ } catch (TransactionFailureException tfe) {
+ LOG.error("Client #{}: Unhandled transaction failure", id, tfe);
+ txContext.abort(tfe);
+ }
+ }
+
+ private long getCurrentBalance(int id) throws IOException {
+ Result r = txTable.get(new Get(Bytes.toBytes(id)));
+ byte[] balanceBytes = r.getValue(FAMILY, COL);
+ if (balanceBytes == null) {
+ return 0;
+ }
+ return Bytes.toLong(balanceBytes);
+ }
+
+ private void setBalance(int id, long balance) throws IOException {
+ txTable.put(new Put(Bytes.toBytes(id)).addColumn(FAMILY, COL, Bytes.toBytes(balance)));
+ }
+
+ private int getNextWithdrawee() {
+ int next;
+ do {
+ next = random.nextInt(totalClients);
+ } while (next == id);
+ return next;
+ }
+
+ private int getAmount() {
+ return random.nextInt(MAX_AMOUNT);
+ }
+
+ public void close() throws IOException {
+ txTable.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-examples/hbase-1.3/src/main/java/org/apache/tephra/examples/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.3/src/main/java/org/apache/tephra/examples/package-info.java b/tephra-examples/hbase-1.3/src/main/java/org/apache/tephra/examples/package-info.java
new file mode 100644
index 0000000..a0e67d5
--- /dev/null
+++ b/tephra-examples/hbase-1.3/src/main/java/org/apache/tephra/examples/package-info.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains example applications for Tephra designed to illustrate sample Tephra usage
+ * and provide out-of-the-box sample applications which can be run to test cluster functionality.
+ *
+ * <p>Currently the following applications are provided:
+ *
+ * <ul>
+ * <li><strong>BalanceBooks</strong> - this application runs a specified number of concurrent clients in separate
+ * threads, which perform transactions to make withdrawals from each other's accounts and deposits to their own
+ * accounts. At the end of the test, the total value of all account balances is verified to be equal to zero,
+ * which confirms that transactional integrity was not violated.
+ * </li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ * Note that, for simplicity, the examples package is currently hardcoded to compile against a specific HBase
+ * version (currently 1.3). In the future, we should provide Maven profiles to allow compiling the examples
+ * against each of the supported HBase versions.
+ * </p>
+ */
+package org.apache.tephra.examples;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-examples/hbase-1.3/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.3/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java b/tephra-examples/hbase-1.3/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
new file mode 100644
index 0000000..4dfe107
--- /dev/null
+++ b/tephra-examples/hbase-1.3/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.examples;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.util.Modules;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.tephra.util.Tests;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@link BalanceBooks} program.
+ */
+public class BalanceBooksTest {
+ private static final Logger LOG = LoggerFactory.getLogger(BalanceBooksTest.class);
+ private static HBaseTestingUtility testUtil;
+ private static TransactionService txService;
+ private static ZKClientService zkClientService;
+
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ testUtil = new HBaseTestingUtility();
+ Configuration conf = testUtil.getConfiguration();
+ conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+ conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+
+ // Tune down the connection thread pool size
+ conf.setInt("hbase.hconnection.threads.core", 5);
+ conf.setInt("hbase.hconnection.threads.max", 10);
+ // Tune down handler threads in regionserver
+ conf.setInt("hbase.regionserver.handler.count", 10);
+
+ // Set to random port
+ conf.setInt("hbase.master.port", 0);
+ conf.setInt("hbase.master.info.port", 0);
+ conf.setInt("hbase.regionserver.port", 0);
+ conf.setInt("hbase.regionserver.info.port", 0);
+
+ testUtil.startMiniCluster();
+
+ String zkClusterKey = testUtil.getClusterKey(); // hostname:clientPort:parentZnode
+ String zkQuorum = zkClusterKey.substring(0, zkClusterKey.lastIndexOf(':'));
+ LOG.info("Zookeeper Quorum is running at {}", zkQuorum);
+ conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkQuorum);
+
+ Injector injector = Guice.createInjector(
+ new ConfigModule(conf),
+ new ZKModule(),
+ new DiscoveryModules().getDistributedModules(),
+ Modules.override(new TransactionModules().getDistributedModules())
+ .with(new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
+ }
+ }),
+ new TransactionClientModule()
+ );
+
+ zkClientService = injector.getInstance(ZKClientService.class);
+ zkClientService.startAndWait();
+
+ // start a tx server
+ txService = injector.getInstance(TransactionService.class);
+ try {
+ LOG.info("Starting transaction service");
+ txService.startAndWait();
+ } catch (Exception e) {
+ LOG.error("Failed to start service: ", e);
+ throw e;
+ }
+
+ Tests.waitForTxReady(injector.getInstance(TransactionSystemClient.class));
+ }
+
+  /**
+   * Stops the transaction service, the ZooKeeper client and the HBase mini cluster, in that order. Each step
+   * runs even if an earlier one fails, and anything that was never created is skipped.
+   */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    try {
+      if (txService != null) {
+        txService.stopAndWait();
+      }
+    } finally {
+      try {
+        if (zkClientService != null) {
+          zkClientService.stopAndWait();
+        }
+      } finally {
+        // Guard against setup() having failed before the testing utility was created.
+        if (testUtil != null) {
+          testUtil.shutdownMiniCluster();
+        }
+      }
+    }
+  }
+
+ @Test
+ public void testBalanceBooks() throws Exception {
+ try (BalanceBooks bb = new BalanceBooks(5, 100, testUtil.getConfiguration())) {
+ bb.init();
+ bb.run();
+ assertTrue(bb.verify());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-examples/hbase-1.4/pom.xml
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.4/pom.xml b/tephra-examples/hbase-1.4/pom.xml
index 3286c5c..3de31b4 100644
--- a/tephra-examples/hbase-1.4/pom.xml
+++ b/tephra-examples/hbase-1.4/pom.xml
@@ -34,8 +34,8 @@
</properties>
<build>
- <sourceDirectory>../src/main/java</sourceDirectory>
- <testSourceDirectory>../src/test/java</testSourceDirectory>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
</build>
<dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-examples/hbase-1.4/src/main/java/org/apache/tephra/examples/BalanceBooks.java
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.4/src/main/java/org/apache/tephra/examples/BalanceBooks.java b/tephra-examples/hbase-1.4/src/main/java/org/apache/tephra/examples/BalanceBooks.java
new file mode 100644
index 0000000..b970598
--- /dev/null
+++ b/tephra-examples/hbase-1.4/src/main/java/org/apache/tephra/examples/BalanceBooks.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.examples;
+
+import com.google.common.io.Closeables;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TransactionConflictException;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.distributed.TransactionServiceClient;
+import org.apache.tephra.hbase.TransactionAwareHTable;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.tephra.util.ConfigurationFactory;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Simple example application that launches a number of concurrent clients, one per "account". Each client attempts to
+ * make withdrawals from other clients, and deposit the same amount to its own account in a single transaction.
+ * Since this means the client will be updating both its own row and the withdrawee's row, this will naturally lead to
+ * transaction conflicts. All clients will run for a specified number of iterations. When the processing is complete,
+ * the total sum of all rows should be zero, if transactional integrity was maintained.
+ *
+ * <p>
+ * You can run the BalanceBooks application with the following command:
+ * <pre>
+ * ./bin/tephra run org.apache.tephra.examples.BalanceBooks [num clients] [num iterations]
+ * </pre>
+ * where <code>[num clients]</code> is the number of concurrent client threads to use, and
+ * <code>[num iterations]</code> is the number of "transfer" operations to perform per client thread.
+ * </p>
+ */
+public class BalanceBooks implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(BalanceBooks.class);
+
+ private static final int MAX_AMOUNT = 100;
+ private static final byte[] TABLE = Bytes.toBytes("testbalances");
+ private static final byte[] FAMILY = Bytes.toBytes("f");
+ private static final byte[] COL = Bytes.toBytes("b");
+
+ private final int totalClients;
+ private final int iterations;
+
+ private Configuration conf;
+ private ZKClientService zkClient;
+ private TransactionServiceClient txClient;
+ private Connection conn;
+
+ public BalanceBooks(int totalClients, int iterations) {
+ this(totalClients, iterations, new ConfigurationFactory().get());
+ }
+
+ public BalanceBooks(int totalClients, int iterations, Configuration conf) {
+ this.totalClients = totalClients;
+ this.iterations = iterations;
+ this.conf = conf;
+ }
+
+ /**
+ * Sets up common resources required by all clients.
+ */
+ public void init() throws IOException {
+ Injector injector = Guice.createInjector(
+ new ConfigModule(conf),
+ new ZKModule(),
+ new DiscoveryModules().getDistributedModules(),
+ new TransactionModules().getDistributedModules(),
+ new TransactionClientModule()
+ );
+
+ zkClient = injector.getInstance(ZKClientService.class);
+ zkClient.startAndWait();
+ txClient = injector.getInstance(TransactionServiceClient.class);
+ conn = ConnectionFactory.createConnection(conf);
+ createTableIfNotExists(conf, TABLE, new byte[][]{ FAMILY });
+ }
+
+  /**
+   * Starts one {@link Client} thread per account and blocks until every one of them has finished.
+   *
+   * @throws IOException if a table handle cannot be obtained for a client
+   * @throws InterruptedException if the calling thread is interrupted while waiting for a client
+   */
+  public void run() throws IOException, InterruptedException {
+    List<Client> started = new ArrayList<>(totalClients);
+    for (int clientId = 0; clientId < totalClients; clientId++) {
+      // Every client gets its own table handle from the shared connection.
+      Table clientTable = conn.getTable(TableName.valueOf(TABLE));
+      Client client = new Client(clientId, totalClients, iterations);
+      client.init(txClient, clientTable);
+      client.start();
+      started.add(client);
+    }
+
+    // Wait for the clients in start order, releasing each client's table as soon as it is done.
+    for (Client finished : started) {
+      finished.join();
+      Closeables.closeQuietly(finished);
+    }
+  }
+
+  /**
+   * Validates the state of the stored data at the end of the test. Each update by a client consists of two
+   * parts: a withdrawal of a random amount from a randomly selected other account, and a corresponding deposit to
+   * the client's own account. So, if all the updates were performed consistently (no partial updates or partial
+   * rollbacks), then the total sum of all balances at the end should be 0.
+   *
+   * <p>The scan runs inside its own transaction. Failures are logged rather than propagated.</p>
+   *
+   * @return {@code true} if the scanned balances sum to zero; {@code false} if they do not, or if the scan
+   *         could not be completed
+   */
+  public boolean verify() {
+    boolean success = false;
+    // try-with-resources so the table handle is released on every path.
+    try (TransactionAwareHTable table = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TABLE)))) {
+      TransactionContext context = new TransactionContext(txClient, table);
+
+      LOG.info("VERIFYING BALANCES");
+      context.start();
+      long totalBalance = 0;
+
+      try (ResultScanner scanner = table.getScanner(new Scan())) {
+        for (Result r : scanner) {
+          if (!r.isEmpty()) {
+            int rowId = Bytes.toInt(r.getRow());
+            long balance = Bytes.toLong(r.getValue(FAMILY, COL));
+            totalBalance += balance;
+            LOG.info("Client #{}: balance = ${}", rowId, balance);
+          }
+        }
+      } catch (IOException | RuntimeException e) {
+        // A failed scan would otherwise leave the verification transaction started but never ended.
+        try {
+          context.abort();
+        } catch (TransactionFailureException abortFailure) {
+          e.addSuppressed(abortFailure);
+        }
+        throw e;
+      }
+      if (totalBalance == 0) {
+        LOG.info("PASSED!");
+        success = true;
+      } else {
+        LOG.info("FAILED! Total balance should be 0 but was {}", totalBalance);
+      }
+      context.finish();
+    } catch (Exception e) {
+      LOG.error("Failed verification check", e);
+    }
+    return success;
+  }
+
+  /**
+   * Frees up the underlying resources common to all clients: first the HBase connection, then the ZooKeeper
+   * client. A failure to close the connection is logged and does not prevent the ZooKeeper client from being
+   * stopped.
+   */
+  @Override
+  public void close() {
+    try {
+      if (conn != null) {
+        conn.close();
+      }
+    } catch (IOException e) {
+      // Best effort during shutdown, but do not hide the failure.
+      LOG.warn("Failed to close HBase connection", e);
+    } finally {
+      if (zkClient != null) {
+        zkClient.stopAndWait();
+      }
+    }
+  }
+
+  /**
+   * Creates the given table with the {@link TransactionProcessor} coprocessor attached, unless a table with that
+   * name already exists. Every column family is configured to keep {@code Integer.MAX_VALUE} versions.
+   *
+   * @param conf unused by this method; the table is created through the connection held by this instance
+   * @param tableName name of the table to create
+   * @param columnFamilies column families the new table should contain
+   * @throws IOException if the existence check or the table creation fails
+   */
+  protected void createTableIfNotExists(Configuration conf, byte[] tableName, byte[][] columnFamilies)
+      throws IOException {
+    TableName name = TableName.valueOf(tableName);
+    try (Admin admin = this.conn.getAdmin()) {
+      if (admin.tableExists(name)) {
+        // Reuse a table left behind by an earlier run instead of failing on creation.
+        return;
+      }
+      HTableDescriptor desc = new HTableDescriptor(name);
+      for (byte[] family : columnFamilies) {
+        HColumnDescriptor columnDesc = new HColumnDescriptor(family);
+        columnDesc.setMaxVersions(Integer.MAX_VALUE);
+        desc.addFamily(columnDesc);
+      }
+      desc.addCoprocessor(TransactionProcessor.class.getName());
+      admin.createTable(desc);
+    }
+  }
+
+ public static void main(String[] args) {
+ if (args.length != 2) {
+ System.err.println("Usage: java " + BalanceBooks.class.getName() + " <num clients> <iterations>");
+ System.err.println("\twhere <num clients> >= 2");
+ System.exit(1);
+ }
+
+ try (BalanceBooks bb = new BalanceBooks(Integer.parseInt(args[0]), Integer.parseInt(args[1]))) {
+ bb.init();
+ bb.run();
+ bb.verify();
+ } catch (Exception e) {
+ LOG.error("Failed during BalanceBooks run", e);
+ }
+ }
+
+ /**
+ * Represents a single client actor in the test. Each client runs as a separate thread.
+ *
+ * For the given number of iterations, the client will:
+ * <ol>
+ * <li>select a random other client from which to withdraw</li>
+ * <li>select a random amount from 0 to MAX_AMOUNT</li>
+ * <li>start a new transaction and: deduct the amount from the other client's account, and deposit
+ * the same amount to its own account.</li>
+ * </ol>
+ *
+ * Since multiple clients operate concurrently and contend over a set of constrained resources
+ * (the client accounts), it is expected that a portion of the attempted transactions will encounter
+ * conflicts, due to a simultaneous deduction from or deposit to one of the same accounts which has successfully
+ * committed first. In this case, the updates from the transaction encountering the conflict should be completely
+ * rolled back, leaving the data in a consistent state.
+ */
+ private static class Client extends Thread implements Closeable {
+ private final int id;
+ private final int totalClients;
+ private final int iterations;
+
+ private final Random random = new Random();
+
+ private TransactionContext txContext;
+ private TransactionAwareHTable txTable;
+
+
+ public Client(int id, int totalClients, int iterations) {
+ this.id = id;
+ this.totalClients = totalClients;
+ this.iterations = iterations;
+ }
+
+ /**
+ * Sets up any resources needed by the individual client.
+ *
+ * @param txClient the transaction client to use in accessing the transaction service
+ * @param table the HBase table instance to use for accessing storage
+ */
+ public void init(TransactionSystemClient txClient, Table table) {
+ txTable = new TransactionAwareHTable(table);
+ txContext = new TransactionContext(txClient, txTable);
+ }
+
+ public void run() {
+ try {
+ for (int i = 0; i < iterations; i++) {
+ runOnce();
+ }
+ } catch (TransactionFailureException e) {
+ LOG.error("Client #{}: Failed on exception", id, e);
+ }
+ }
+
+ /**
+ * Runs a single iteration of the client logic.
+ */
+ private void runOnce() throws TransactionFailureException {
+ int withdrawee = getNextWithdrawee();
+ int amount = getAmount();
+
+ try {
+ txContext.start();
+ long withdraweeBalance = getCurrentBalance(withdrawee);
+ long ownBalance = getCurrentBalance(id);
+ long withdraweeNew = withdraweeBalance - amount;
+ long ownNew = ownBalance + amount;
+
+ setBalance(withdrawee, withdraweeNew);
+ setBalance(id, ownNew);
+ LOG.debug("Client #{}: Withdrew ${} from #{}; withdrawee old={}, new={}; own old={}, new={}",
+ id, amount, withdrawee, withdraweeBalance, withdraweeNew, ownBalance, ownNew);
+ txContext.finish();
+
+ } catch (IOException ioe) {
+ LOG.error("Client #{}: Unhandled client failure", id, ioe);
+ txContext.abort();
+ } catch (TransactionConflictException tce) {
+ LOG.debug("CONFLICT: client #{} attempting to withdraw from #{}", id, withdrawee);
+ txContext.abort(tce);
+ } catch (TransactionFailureException tfe) {
+ LOG.error("Client #{}: Unhandled transaction failure", id, tfe);
+ txContext.abort(tfe);
+ }
+ }
+
+ private long getCurrentBalance(int id) throws IOException {
+ Result r = txTable.get(new Get(Bytes.toBytes(id)));
+ byte[] balanceBytes = r.getValue(FAMILY, COL);
+ if (balanceBytes == null) {
+ return 0;
+ }
+ return Bytes.toLong(balanceBytes);
+ }
+
+ private void setBalance(int id, long balance) throws IOException {
+ txTable.put(new Put(Bytes.toBytes(id)).addColumn(FAMILY, COL, Bytes.toBytes(balance)));
+ }
+
+ private int getNextWithdrawee() {
+ int next;
+ do {
+ next = random.nextInt(totalClients);
+ } while (next == id);
+ return next;
+ }
+
+ private int getAmount() {
+ return random.nextInt(MAX_AMOUNT);
+ }
+
+ public void close() throws IOException {
+ txTable.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-examples/hbase-1.4/src/main/java/org/apache/tephra/examples/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.4/src/main/java/org/apache/tephra/examples/package-info.java b/tephra-examples/hbase-1.4/src/main/java/org/apache/tephra/examples/package-info.java
new file mode 100644
index 0000000..a0e67d5
--- /dev/null
+++ b/tephra-examples/hbase-1.4/src/main/java/org/apache/tephra/examples/package-info.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains example applications for Tephra designed to illustrate sample Tephra usage
+ * and provide out-of-the-box sample applications which can be run to test cluster functionality.
+ *
+ * <p>Currently the following applications are provided:
+ *
+ * <ul>
+ * <li><strong>BalanceBooks</strong> - this application runs a specified number of concurrent clients in separate
+ * threads, which perform transactions to make withdrawals from each other's accounts and deposits to their own
+ * accounts. At the end of the test, the total value of all account balances is verified to be equal to zero,
+ * which confirms that transactional integrity was not violated.
+ * </li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ * Note that, for simplicity, the examples package is currently hardcoded to compile against a specific HBase
+ * version (currently 1.4). In the future, we should provide Maven profiles to allow compiling the examples
+ * against each of the supported HBase versions.
+ * </p>
+ */
+package org.apache.tephra.examples;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-examples/hbase-1.4/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.4/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java b/tephra-examples/hbase-1.4/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
new file mode 100644
index 0000000..4dfe107
--- /dev/null
+++ b/tephra-examples/hbase-1.4/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.examples;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.util.Modules;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.tephra.util.Tests;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@link BalanceBooks} program.
+ */
+public class BalanceBooksTest {
+ private static final Logger LOG = LoggerFactory.getLogger(BalanceBooksTest.class);
+ private static HBaseTestingUtility testUtil;
+ private static TransactionService txService;
+ private static ZKClientService zkClientService;
+
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ testUtil = new HBaseTestingUtility();
+ Configuration conf = testUtil.getConfiguration();
+ conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+ conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+
+ // Tune down the connection thread pool size
+ conf.setInt("hbase.hconnection.threads.core", 5);
+ conf.setInt("hbase.hconnection.threads.max", 10);
+ // Tune down handler threads in regionserver
+ conf.setInt("hbase.regionserver.handler.count", 10);
+
+ // Set to random port
+ conf.setInt("hbase.master.port", 0);
+ conf.setInt("hbase.master.info.port", 0);
+ conf.setInt("hbase.regionserver.port", 0);
+ conf.setInt("hbase.regionserver.info.port", 0);
+
+ testUtil.startMiniCluster();
+
+ String zkClusterKey = testUtil.getClusterKey(); // hostname:clientPort:parentZnode
+ String zkQuorum = zkClusterKey.substring(0, zkClusterKey.lastIndexOf(':'));
+ LOG.info("Zookeeper Quorum is running at {}", zkQuorum);
+ conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkQuorum);
+
+ Injector injector = Guice.createInjector(
+ new ConfigModule(conf),
+ new ZKModule(),
+ new DiscoveryModules().getDistributedModules(),
+ Modules.override(new TransactionModules().getDistributedModules())
+ .with(new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
+ }
+ }),
+ new TransactionClientModule()
+ );
+
+ zkClientService = injector.getInstance(ZKClientService.class);
+ zkClientService.startAndWait();
+
+ // start a tx server
+ txService = injector.getInstance(TransactionService.class);
+ try {
+ LOG.info("Starting transaction service");
+ txService.startAndWait();
+ } catch (Exception e) {
+ LOG.error("Failed to start service: ", e);
+ throw e;
+ }
+
+ Tests.waitForTxReady(injector.getInstance(TransactionSystemClient.class));
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (txService != null) {
+ txService.stopAndWait();
+ }
+ if (zkClientService != null) {
+ zkClientService.stopAndWait();
+ }
+ testUtil.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testBalanceBooks() throws Exception {
+ try (BalanceBooks bb = new BalanceBooks(5, 100, testUtil.getConfiguration())) {
+ bb.init();
+ bb.run();
+ assertTrue(bb.verify());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index af4b350..18886c7 100644
--- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.CompareFilter;
@@ -65,17 +65,17 @@ import java.util.Set;
* was started.
*/
public class TransactionAwareHTable extends AbstractTransactionAwareTable
- implements HTableInterface, TransactionAware {
+ implements Table, TransactionAware {
private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTable.class);
- private final HTableInterface hTable;
+ private final Table hTable;
/**
* Create a transactional aware instance of the passed HTable
*
* @param hTable underlying HBase table to use
*/
- public TransactionAwareHTable(HTableInterface hTable) {
+ public TransactionAwareHTable(Table hTable) {
this(hTable, false);
}
@@ -85,7 +85,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
* @param hTable underlying HBase table to use
* @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
*/
- public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel) {
+ public TransactionAwareHTable(Table hTable, TxConstants.ConflictDetection conflictLevel) {
this(hTable, conflictLevel, false);
}
@@ -96,7 +96,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
* @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
* will be available, though non-transactional
*/
- public TransactionAwareHTable(HTableInterface hTable, boolean allowNonTransactional) {
+ public TransactionAwareHTable(Table hTable, boolean allowNonTransactional) {
this(hTable, TxConstants.ConflictDetection.COLUMN, allowNonTransactional);
}
@@ -108,7 +108,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
* @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
* will be available, though non-transactional
*/
- public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel,
+ public TransactionAwareHTable(Table hTable, TxConstants.ConflictDetection conflictLevel,
boolean allowNonTransactional) {
super(conflictLevel, allowNonTransactional,
hTable.getConfiguration().getBoolean(
@@ -121,12 +121,11 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
@Override
protected byte[] getTableKey() {
- return getTableName();
+ return hTable.getName().getName();
}
@Override
protected boolean doCommit() throws IOException {
- hTable.flushCommits();
return true;
}
@@ -169,22 +168,12 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
hTable.delete(rollbackDeletes);
return true;
} finally {
- try {
- hTable.flushCommits();
- } catch (Exception e) {
- LOG.error("Could not flush HTable commits", e);
- }
tx = null;
changeSets.clear();
}
}
- /* HTableInterface implementation */
-
- @Override
- public byte[] getTableName() {
- return hTable.getTableName();
- }
+ /* Table implementation */
@Override
public TableName getName() {
@@ -210,18 +199,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public Boolean[] exists(List<Get> gets) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- List<Get> transactionalizedGets = new ArrayList<>(gets.size());
- for (Get get : gets) {
- transactionalizedGets.add(transactionalizeAction(get));
- }
- return hTable.exists(transactionalizedGets);
- }
-
- @Override
public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
if (tx == null) {
throw new IOException("Transaction not started");
@@ -276,15 +253,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
- if (allowNonTransactional) {
- return hTable.getRowOrBefore(row, family);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
public ResultScanner getScanner(Scan scan) throws IOException {
if (tx == null) {
throw new IOException("Transaction not started");
@@ -472,7 +440,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
+ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
+ throws IOException {
if (allowNonTransactional) {
return hTable.incrementColumnValue(row, family, qualifier, amount);
} else {
@@ -481,8 +450,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)
- throws IOException {
+ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
+ Durability durability) throws IOException {
if (allowNonTransactional) {
return hTable.incrementColumnValue(row, family, qualifier, amount, durability);
} else {
@@ -491,26 +460,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
- throws IOException {
- if (allowNonTransactional) {
- return hTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public boolean isAutoFlush() {
- return hTable.isAutoFlush();
- }
-
- @Override
- public void flushCommits() throws IOException {
- hTable.flushCommits();
- }
-
- @Override
public void close() throws IOException {
hTable.close();
}
@@ -549,21 +498,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public void setAutoFlush(boolean autoFlush) {
- setAutoFlushTo(autoFlush);
- }
-
- @Override
- public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
- hTable.setAutoFlush(autoFlush, clearBufferOnFail);
- }
-
- @Override
- public void setAutoFlushTo(boolean autoFlush) {
- hTable.setAutoFlushTo(autoFlush);
- }
-
- @Override
public long getWriteBufferSize() {
return hTable.getWriteBufferSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
index 560b0fe..179b22e 100644
--- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.tephra.TxConstants;
import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
import org.junit.AfterClass;
@@ -60,7 +60,6 @@ public abstract class AbstractHBaseTableTest {
conf.setInt("hbase.master.info.port", 0);
conf.setInt("hbase.regionserver.port", 0);
conf.setInt("hbase.regionserver.info.port", 0);
-
testUtil.startMiniCluster();
hBaseAdmin = testUtil.getHBaseAdmin();
}
@@ -76,12 +75,12 @@ public abstract class AbstractHBaseTableTest {
}
}
- protected static HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
+ protected static Table createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
return createTable(tableName, columnFamilies, false,
Collections.singletonList(TransactionProcessor.class.getName()));
}
- protected static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
+ protected static Table createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
List<String> coprocessors) throws Exception {
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
for (byte[] family : columnFamilies) {
@@ -101,6 +100,6 @@ public abstract class AbstractHBaseTableTest {
}
hBaseAdmin.createTable(desc);
testUtil.waitTableAvailable(tableName, 5000);
- return new HTable(testUtil.getConfiguration(), tableName);
+ return testUtil.getConnection().getTable(TableName.valueOf(tableName));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index 73f9d45..51fbbda 100644
--- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -26,17 +26,17 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -99,7 +99,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
static TransactionManager txManager;
private TransactionContext transactionContext;
private TransactionAwareHTable transactionAwareHTable;
- private HTable hTable;
+ private Table hTable;
@ClassRule
public static TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -254,7 +254,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
*/
@Test
public void testValidTransactionalDelete() throws Exception {
- try (HTable hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"),
+ try (Table hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"),
new byte[][]{TestBytes.family, TestBytes.family2})) {
TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
@@ -394,7 +394,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
*/
@Test
public void testAttributesPreserved() throws Exception {
- HTable hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
+ Table hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
new byte[][]{TestBytes.family, TestBytes.family2}, false,
Lists.newArrayList(TransactionProcessor.class.getName(), TestRegionObserver.class.getName()));
try {
@@ -436,7 +436,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testFamilyDeleteWithCompaction() throws Exception {
- HTable hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"),
+ Table hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"),
new byte[][]{TestBytes.family, TestBytes.family2});
try {
TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, ConflictDetection.ROW);
@@ -536,7 +536,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
private void testDeleteRollback(TxConstants.ConflictDetection conflictDetection) throws Exception {
String tableName = String.format("%s%s", "TestColFamilyDelete", conflictDetection);
- HTable hTable = createTable(Bytes.toBytes(tableName), new byte[][]{TestBytes.family});
+ Table hTable = createTable(Bytes.toBytes(tableName), new byte[][]{TestBytes.family});
try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, conflictDetection)) {
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
txContext.start();
@@ -572,7 +572,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testMultiColumnFamilyRowDeleteRollback() throws Exception {
- HTable hTable = createTable(Bytes.toBytes("TestMultColFam"), new byte[][] {TestBytes.family, TestBytes.family2});
+ Table hTable = createTable(Bytes.toBytes("TestMultColFam"), new byte[][] {TestBytes.family, TestBytes.family2});
try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
txContext.start();
@@ -604,7 +604,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testRowDelete() throws Exception {
- HTable hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2});
+ Table hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2});
try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
@@ -687,7 +687,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
.add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
txContext.finish();
-
+
txContext.start();
txTable.delete(new Delete(TestBytes.row2).deleteFamily(TestBytes.family));
txContext.finish();
@@ -781,12 +781,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testReadYourWrites() throws Exception {
// In-progress tx1: started before our main transaction
- HTable hTable1 = new HTable(testUtil.getConfiguration(), TestBytes.table);
+ Table hTable1 = testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table));
TransactionAwareHTable txHTable1 = new TransactionAwareHTable(hTable1);
TransactionContext inprogressTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable1);
// In-progress tx2: started while our main transaction is running
- HTable hTable2 = new HTable(testUtil.getConfiguration(), TestBytes.table);
+ Table hTable2 = testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table));
TransactionAwareHTable txHTable2 = new TransactionAwareHTable(hTable2);
TransactionContext inprogressTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable2);
@@ -838,11 +838,13 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testRowLevelConflictDetection() throws Exception {
- TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+ TransactionAwareHTable txTable1 = new TransactionAwareHTable(
+ testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)),
TxConstants.ConflictDetection.ROW);
TransactionContext txContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable1);
- TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+ TransactionAwareHTable txTable2 = new TransactionAwareHTable(
+ testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)),
TxConstants.ConflictDetection.ROW);
TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
@@ -949,11 +951,13 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testNoneLevelConflictDetection() throws Exception {
InMemoryTxSystemClient txClient = new InMemoryTxSystemClient(txManager);
- TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+ TransactionAwareHTable txTable1 = new TransactionAwareHTable(
+ testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)),
TxConstants.ConflictDetection.NONE);
TransactionContext txContext1 = new TransactionContext(txClient, txTable1);
- TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+ TransactionAwareHTable txTable2 = new TransactionAwareHTable(
+ testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)),
TxConstants.ConflictDetection.NONE);
TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
@@ -1088,7 +1092,8 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
// check that writes are still not visible to other clients
- TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
+ TransactionAwareHTable txTable2 = new TransactionAwareHTable(
+ testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)));
TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
txContext2.start();
@@ -1147,7 +1152,8 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
txClient.invalidate(transactionContext.getCurrentTransaction().getTransactionId());
// check that writes are not visible
- TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
+ TransactionAwareHTable txTable2 = new TransactionAwareHTable(
+ testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)));
TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
txContext2.start();
Transaction newTx = txContext2.getCurrentTransaction();
@@ -1183,7 +1189,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
// Add some pre-existing, non-transactional data
- HTable nonTxTable = new HTable(testUtil.getConfiguration(), txTable.getTableName());
+ Table nonTxTable = testUtil.getConnection().getTable(txTable.getName());
nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, val11));
nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, val12));
nonTxTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, val21));
@@ -1191,7 +1197,6 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER,
HConstants.EMPTY_BYTE_ARRAY));
nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TestBytes.qualifier, HConstants.EMPTY_BYTE_ARRAY));
- nonTxTable.flushCommits();
// Add transactional data
txContext.start();
@@ -1282,15 +1287,15 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
txContext.finish();
}
- private void verifyRow(HTableInterface table, byte[] rowkey, byte[] expectedValue) throws Exception {
+ private void verifyRow(Table table, byte[] rowkey, byte[] expectedValue) throws Exception {
verifyRow(table, new Get(rowkey), expectedValue);
}
- private void verifyRow(HTableInterface table, Get get, byte[] expectedValue) throws Exception {
+ private void verifyRow(Table table, Get get, byte[] expectedValue) throws Exception {
verifyRows(table, get, expectedValue == null ? null : ImmutableList.of(expectedValue));
}
- private void verifyRows(HTableInterface table, Get get, List<byte[]> expectedValues) throws Exception {
+ private void verifyRows(Table table, Get get, List<byte[]> expectedValues) throws Exception {
Result result = table.get(get);
if (expectedValues == null) {
assertTrue(result.isEmpty());
@@ -1310,12 +1315,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
}
}
- private Cell[] getRow(HTableInterface table, Get get) throws Exception {
+ private Cell[] getRow(Table table, Get get) throws Exception {
Result result = table.get(get);
return result.rawCells();
}
- private void verifyScan(HTableInterface table, Scan scan, List<KeyValue> expectedCells) throws Exception {
+ private void verifyScan(Table table, Scan scan, List<KeyValue> expectedCells) throws Exception {
List<Cell> actualCells = new ArrayList<>();
try (ResultScanner scanner = table.getScanner(scan)) {
Result[] results = scanner.next(expectedCells.size() + 1);
@@ -1328,7 +1333,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testVisibilityAll() throws Exception {
- HTable nonTxTable =
+ Table nonTxTable =
createTable(Bytes.toBytes("testVisibilityAll"), new byte[][]{TestBytes.family, TestBytes.family2},
true, Collections.singletonList(TransactionProcessor.class.getName()));
TransactionAwareHTable txTable =
@@ -1500,7 +1505,6 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
// to prevent Tephra from replacing delete with delete marker
deleteFamily.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
nonTxTable.delete(deleteFamily);
- nonTxTable.flushCommits();
txContext.start();
txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
@@ -1712,7 +1716,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
* Represents older transaction clients
*/
private static class OldTransactionAwareHTable extends TransactionAwareHTable {
- public OldTransactionAwareHTable(HTableInterface hTable) {
+ public OldTransactionAwareHTable(Table hTable) {
super(hTable);
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
index 2e9dc17..031b0f3 100644
--- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
@@ -23,7 +23,6 @@ package org.apache.tephra.hbase.txprune;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.ImmutableSortedSet;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.TxConstants;
@@ -53,7 +52,7 @@ public class DataJanitorStateTest extends AbstractHBaseTableTest {
public void beforeTest() throws Exception {
pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
- HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+ Table table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
// Prune state table is a non-transactional table, hence no transaction co-processor
Collections.<String>emptyList());
table.close();
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 55348b0..e92b4cf 100644
--- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -90,7 +89,7 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
// Do some transactional data operations
txDataTable1 = TableName.valueOf("invalidListPruneTestTable1");
- HTable hTable = createTable(txDataTable1.getName(), new byte[][]{family}, false,
+ Table hTable = createTable(txDataTable1.getName(), new byte[][]{family}, false,
Collections.singletonList(TestTransactionProcessor.class.getName()));
try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
@@ -129,7 +128,7 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
}
private void createPruneStateTable() throws Exception {
- HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+ Table table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
// Prune state table is a non-transactional table, hence no transaction co-processor
Collections.<String>emptyList());
table.close();
@@ -311,7 +310,7 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
// Create an empty table
TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
- HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
+ Table emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
Collections.singletonList(TestTransactionProcessor.class.getName()));
TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
index 1476906..aa669e5 100644
--- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
@@ -30,7 +30,6 @@ import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.TxConstants;
@@ -104,7 +103,7 @@ public class InvalidListPruningDebugTest extends AbstractHBaseTableTest {
public static void addData() throws Exception {
pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
- HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+ Table table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
// Prune state table is a non-transactional table, hence no transaction co-processor
Collections.<String>emptyList());
table.close();
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index d6eb30e..e3ef374 100644
--- a/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.CompareFilter;
@@ -58,6 +58,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
/**
* A Transaction Aware HTable implementation for HBase 1.3. Operations are committed as usual,
@@ -65,17 +66,17 @@ import java.util.Set;
* was started.
*/
public class TransactionAwareHTable extends AbstractTransactionAwareTable
- implements HTableInterface, TransactionAware {
+ implements Table, TransactionAware {
private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTable.class);
- private final HTableInterface hTable;
+ private final Table hTable;
/**
* Create a transactional aware instance of the passed HTable
*
* @param hTable underlying HBase table to use
*/
- public TransactionAwareHTable(HTableInterface hTable) {
+ public TransactionAwareHTable(Table hTable) {
this(hTable, false);
}
@@ -85,7 +86,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
* @param hTable underlying HBase table to use
* @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
*/
- public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel) {
+ public TransactionAwareHTable(Table hTable, TxConstants.ConflictDetection conflictLevel) {
this(hTable, conflictLevel, false);
}
@@ -96,7 +97,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
* @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
* will be available, though non-transactional
*/
- public TransactionAwareHTable(HTableInterface hTable, boolean allowNonTransactional) {
+ public TransactionAwareHTable(Table hTable, boolean allowNonTransactional) {
this(hTable, TxConstants.ConflictDetection.COLUMN, allowNonTransactional);
}
@@ -108,7 +109,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
* @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
* will be available, though non-transactional
*/
- public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel,
+ public TransactionAwareHTable(Table hTable, TxConstants.ConflictDetection conflictLevel,
boolean allowNonTransactional) {
super(conflictLevel, allowNonTransactional,
hTable.getConfiguration().getBoolean(
@@ -121,12 +122,11 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
@Override
protected byte[] getTableKey() {
- return getTableName();
+ return hTable.getName().getName();
}
@Override
protected boolean doCommit() throws IOException {
- hTable.flushCommits();
return true;
}
@@ -169,22 +169,12 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
hTable.delete(rollbackDeletes);
return true;
} finally {
- try {
- hTable.flushCommits();
- } catch (Exception e) {
- LOG.error("Could not flush HTable commits", e);
- }
tx = null;
changeSets.clear();
}
}
- /* HTableInterface implementation */
-
- @Override
- public byte[] getTableName() {
- return hTable.getTableName();
- }
+ /* Table implementation */
@Override
public TableName getName() {
@@ -210,18 +200,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public Boolean[] exists(List<Get> gets) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- List<Get> transactionalizedGets = new ArrayList<>(gets.size());
- for (Get get : gets) {
- transactionalizedGets.add(transactionalizeAction(get));
- }
- return hTable.exists(transactionalizedGets);
- }
-
- @Override
public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
if (tx == null) {
throw new IOException("Transaction not started");
@@ -276,15 +254,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
- if (allowNonTransactional) {
- return hTable.getRowOrBefore(row, family);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
public ResultScanner getScanner(Scan scan) throws IOException {
if (tx == null) {
throw new IOException("Transaction not started");
@@ -438,6 +407,28 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
+ public int getReadRpcTimeout() {
+ return hTable.getReadRpcTimeout();
+ }
+
+ @Override
+ public void setReadRpcTimeout(int readRpcTimeout) {
+ hTable.setReadRpcTimeout(readRpcTimeout);
+
+ }
+
+ @Override
+ public int getWriteRpcTimeout() {
+ return hTable.getWriteRpcTimeout();
+ }
+
+ @Override
+ public void setWriteRpcTimeout(int writeRpcTimeout) {
+ hTable.setWriteRpcTimeout(writeRpcTimeout);
+
+ }
+
+ @Override
public void mutateRow(RowMutations rm) throws IOException {
if (tx == null) {
throw new IOException("Transaction not started");
@@ -472,7 +463,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
+ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
+ throws IOException {
if (allowNonTransactional) {
return hTable.incrementColumnValue(row, family, qualifier, amount);
} else {
@@ -481,8 +473,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)
- throws IOException {
+ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
+ Durability durability) throws IOException {
if (allowNonTransactional) {
return hTable.incrementColumnValue(row, family, qualifier, amount, durability);
} else {
@@ -491,26 +483,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
- throws IOException {
- if (allowNonTransactional) {
- return hTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public boolean isAutoFlush() {
- return hTable.isAutoFlush();
- }
-
- @Override
- public void flushCommits() throws IOException {
- hTable.flushCommits();
- }
-
- @Override
public void close() throws IOException {
hTable.close();
}
@@ -549,21 +521,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public void setAutoFlush(boolean autoFlush) {
- setAutoFlushTo(autoFlush);
- }
-
- @Override
- public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
- hTable.setAutoFlush(autoFlush, clearBufferOnFail);
- }
-
- @Override
- public void setAutoFlushTo(boolean autoFlush) {
- hTable.setAutoFlushTo(autoFlush);
- }
-
- @Override
public long getWriteBufferSize() {
return hTable.getWriteBufferSize();
}
@@ -573,26 +530,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
hTable.setWriteBufferSize(writeBufferSize);
}
- @Override
- public int getReadRpcTimeout() {
- return hTable.getReadRpcTimeout();
- }
-
- @Override
- public void setReadRpcTimeout(int timeout) {
- hTable.setReadRpcTimeout(timeout);
- }
-
- @Override
- public int getWriteRpcTimeout() {
- return hTable.getWriteRpcTimeout();
- }
-
- @Override
- public void setWriteRpcTimeout(int timeout) {
- hTable.setWriteRpcTimeout(timeout);
- }
-
// Helpers to get copies of objects with the timestamp set to the current transaction timestamp.
private Get transactionalizeAction(Get get) throws IOException {