You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by oh...@apache.org on 2018/09/16 08:23:08 UTC
[4/4] incubator-omid git commit: OMID-107 Replace pre 1.0 deprecated
HBase APIs
OMID-107 Replace pre 1.0 deprecated HBase APIs
Signed-off-by: Ohad Shacham <oh...@yahoo-inc.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/75dc8177
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/75dc8177
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/75dc8177
Branch: refs/heads/phoenix-integration
Commit: 75dc817753c08626c376c582148e8af197fdcd63
Parents: 81672f0
Author: James Taylor <ja...@apache.org>
Authored: Sun Aug 26 08:49:50 2018 -0700
Committer: Ohad Shacham <oh...@yahoo-inc.com>
Committed: Sun Sep 16 11:22:22 2018 +0300
----------------------------------------------------------------------
.../org/apache/omid/examples/BasicExample.java | 15 +-
.../omid/examples/ConfigurationExample.java | 23 +-
.../omid/examples/SnapshotIsolationExample.java | 9 +-
.../transaction/AttributeSetSnapshotFilter.java | 6 +-
.../apache/omid/transaction/HBaseCellId.java | 14 +-
.../transaction/HBaseSyncPostCommitter.java | 21 +-
.../omid/transaction/HBaseTransaction.java | 21 +-
.../transaction/HBaseTransactionManager.java | 26 +--
.../omid/transaction/HTableAccessWrapper.java | 14 +-
.../omid/transaction/SnapshotFilterImpl.java | 16 +-
.../org/apache/omid/transaction/TTable.java | 208 +++++++++----------
.../apache/omid/transaction/OmidTestBase.java | 60 +++---
.../TestAsynchronousPostCommitter.java | 62 +++---
.../apache/omid/transaction/TestAutoFlush.java | 8 +-
.../TestBaillisAnomaliesWithTXs.java | 60 +++---
.../omid/transaction/TestBasicTransaction.java | 60 +++---
.../apache/omid/transaction/TestCheckpoint.java | 51 ++---
.../apache/omid/transaction/TestDeletion.java | 62 +++---
.../TestEndToEndScenariosWithHA.java | 63 +++---
.../apache/omid/transaction/TestFilters.java | 29 +--
.../transaction/TestHBaseTransactionClient.java | 114 +++++-----
.../TestHBaseTransactionManager.java | 20 +-
.../transaction/TestMarkPutAsCommitted.java | 26 +--
.../omid/transaction/TestMultiplePut.java | 29 ++-
.../apache/omid/transaction/TestReadPath.java | 26 +--
.../omid/transaction/TestShadowCells.java | 96 +++++----
.../transaction/TestSingleColumnFamily.java | 12 +-
.../omid/transaction/TestTTableBehaviour.java | 25 +--
.../transaction/TestTransactionCleanup.java | 17 +-
.../transaction/TestTransactionConflict.java | 114 +++++-----
.../omid/transaction/TestTxMgrFailover.java | 35 ++--
.../apache/omid/transaction/TestUpdateScan.java | 24 +--
.../committable/hbase/HBaseCommitTable.java | 82 ++++----
.../committable/hbase/TestHBaseCommitTable.java | 58 +++---
.../org/apache/omid/transaction/CellUtils.java | 32 +--
.../apache/omid/transaction/TestCompaction.java | 208 ++++++++++---------
.../omid/transaction/TestSnapshotFilter.java | 126 ++++++-----
.../omid/tools/hbase/OmidTableManager.java | 30 +--
pom.xml | 4 +-
.../storage/HBaseTimestampStorage.java | 23 +-
.../org/apache/omid/tso/CacheEvaluation.java | 2 +-
.../org/apache/omid/tso/TestLeaseManager.java | 37 ++--
42 files changed, 1009 insertions(+), 959 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/examples/src/main/java/org/apache/omid/examples/BasicExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/omid/examples/BasicExample.java b/examples/src/main/java/org/apache/omid/examples/BasicExample.java
index d5f68eb..9090b05 100644
--- a/examples/src/main/java/org/apache/omid/examples/BasicExample.java
+++ b/examples/src/main/java/org/apache/omid/examples/BasicExample.java
@@ -17,13 +17,15 @@
*/
package org.apache.omid.examples;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.transaction.HBaseTransactionManager;
import org.apache.omid.transaction.TTable;
import org.apache.omid.transaction.Transaction;
import org.apache.omid.transaction.TransactionManager;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,20 +86,21 @@ public class BasicExample {
LOG.info("Creating access to Omid Transaction Manager & Transactional Table '{}'", userTableName);
try (TransactionManager tm = HBaseTransactionManager.newInstance();
- TTable txTable = new TTable(userTableName))
+ Connection conn = ConnectionFactory.createConnection();
+ TTable txTable = new TTable(conn, userTableName))
{
Transaction tx = tm.begin();
LOG.info("Transaction {} STARTED", tx);
Put row1 = new Put(exampleRow1);
- row1.add(family, qualifier, dataValue1);
+ row1.addColumn(family, qualifier, dataValue1);
txTable.put(tx, row1);
LOG.info("Transaction {} trying to write a new value in [TABLE:ROW/CF/Q] => {}:{}/{}/{} = {} ",
tx, userTableName, Bytes.toString(exampleRow1), Bytes.toString(family),
Bytes.toString(qualifier), Bytes.toString(dataValue1));
Put row2 = new Put(exampleRow2);
- row2.add(family, qualifier, dataValue2);
+ row2.addColumn(family, qualifier, dataValue2);
txTable.put(tx, row2);
LOG.info("Transaction {} trying to write a new value in [TABLE:ROW/CF/Q] => {}:{}/{}/{} = {} ",
tx, userTableName, Bytes.toString(exampleRow2), Bytes.toString(family),
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/examples/src/main/java/org/apache/omid/examples/ConfigurationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/omid/examples/ConfigurationExample.java b/examples/src/main/java/org/apache/omid/examples/ConfigurationExample.java
index aa59245..ec0678f 100644
--- a/examples/src/main/java/org/apache/omid/examples/ConfigurationExample.java
+++ b/examples/src/main/java/org/apache/omid/examples/ConfigurationExample.java
@@ -17,22 +17,24 @@
*/
package org.apache.omid.examples;
+import static org.apache.omid.tso.client.OmidClientConfiguration.ConnType.DIRECT;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.transaction.HBaseOmidClientConfiguration;
import org.apache.omid.transaction.HBaseTransactionManager;
import org.apache.omid.transaction.RollbackException;
import org.apache.omid.transaction.TTable;
import org.apache.omid.transaction.Transaction;
import org.apache.omid.transaction.TransactionManager;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
-import static org.apache.omid.tso.client.OmidClientConfiguration.ConnType.DIRECT;
-
/**
* ****************************************************************************************************************
*
@@ -100,21 +102,22 @@ public class ConfigurationExample {
LOG.info("Creating access to Omid Transaction Manager & Transactional Table '{}'", userTableName);
try (TransactionManager tm = HBaseTransactionManager.newInstance(configuration);
- TTable txTable = new TTable(userTableName))
+ Connection conn = ConnectionFactory.createConnection();
+ TTable txTable = new TTable(conn, userTableName))
{
for (int i = 0; i < 100; i++) {
Transaction tx = tm.begin();
LOG.info("Transaction #{} {} STARTED", i, tx);
Put row1 = new Put(exampleRow1);
- row1.add(family, qualifier, dataValue1);
+ row1.addColumn(family, qualifier, dataValue1);
txTable.put(tx, row1);
LOG.info("Transaction {} trying to write a new value in [TABLE:ROW/CF/Q] => {}:{}/{}/{} = {} ",
tx, userTableName, Bytes.toString(exampleRow1), Bytes.toString(family),
Bytes.toString(qualifier), Bytes.toString(dataValue1));
Put row2 = new Put(exampleRow2);
- row2.add(family, qualifier, dataValue2);
+ row2.addColumn(family, qualifier, dataValue2);
txTable.put(tx, row2);
LOG.info("Transaction {} trying to write a new value in [TABLE:ROW/CF/Q] => {}:{}/{}/{} = {} ",
tx, userTableName, Bytes.toString(exampleRow2), Bytes.toString(family),
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/examples/src/main/java/org/apache/omid/examples/SnapshotIsolationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/omid/examples/SnapshotIsolationExample.java b/examples/src/main/java/org/apache/omid/examples/SnapshotIsolationExample.java
index 60ea22b..b68e19b 100644
--- a/examples/src/main/java/org/apache/omid/examples/SnapshotIsolationExample.java
+++ b/examples/src/main/java/org/apache/omid/examples/SnapshotIsolationExample.java
@@ -17,9 +17,11 @@
*/
package org.apache.omid.examples;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.Arrays;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -32,8 +34,7 @@ import org.apache.omid.transaction.TransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Arrays;
+import com.google.common.base.Preconditions;
/**
* ****************************************************************************************************************
@@ -118,7 +119,7 @@ public class SnapshotIsolationExample {
LOG.info("Creating access to Omid Transaction Manager & Transactional Table '{}'", userTableName);
tm = HBaseTransactionManager.newInstance();
- txTable = new TTable(userTableName);
+ txTable = new TTable(ConnectionFactory.createConnection(), userTableName);
}
void execute() throws IOException, RollbackException {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java b/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
index f4b6191..734ad5c 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
@@ -23,10 +23,10 @@ import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
import org.apache.omid.proto.TSOProto;
@@ -35,9 +35,9 @@ import com.google.common.base.Optional;
public class AttributeSetSnapshotFilter implements SnapshotFilter {
- private HTableInterface table;
+ private Table table;
- public AttributeSetSnapshotFilter(HTableInterface table) {
+ public AttributeSetSnapshotFilter(Table table) {
this.table = table;
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-client/src/main/java/org/apache/omid/transaction/HBaseCellId.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseCellId.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseCellId.java
index 63e6376..a70cfef 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseCellId.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseCellId.java
@@ -17,23 +17,22 @@
*/
package org.apache.omid.transaction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
+import static com.google.common.base.Charsets.UTF_8;
import org.apache.omid.tso.client.CellId;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import static com.google.common.base.Charsets.UTF_8;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
public class HBaseCellId implements CellId {
- private final HTableInterface table;
+ private final TTable table;
private final byte[] row;
private final byte[] family;
private final byte[] qualifier;
private long timestamp;
- public HBaseCellId(HTableInterface table, byte[] row, byte[] family, byte[] qualifier, long timestamp) {
+ public HBaseCellId(TTable table, byte[] row, byte[] family, byte[] qualifier, long timestamp) {
this.timestamp = timestamp;
this.table = table;
this.row = row;
@@ -41,7 +40,7 @@ public class HBaseCellId implements CellId {
this.qualifier = qualifier;
}
- public HTableInterface getTable() {
+ public TTable getTable() {
return table;
}
@@ -61,6 +60,7 @@ public class HBaseCellId implements CellId {
return timestamp;
}
+ @Override
public String toString() {
return new String(table.getTableName(), UTF_8)
+ ":" + new String(row, UTF_8)
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
index 4b3560f..d5f9c4d 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
@@ -17,21 +17,22 @@
*/
package org.apache.omid.transaction;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
+import static org.apache.omid.metrics.MetricsUtils.name;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.Timer;
import org.apache.omid.tso.client.CellId;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-
-import static org.apache.omid.metrics.MetricsUtils.name;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
public class HBaseSyncPostCommitter implements PostCommitActions {
@@ -53,12 +54,12 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
private void addShadowCell(HBaseCellId cell, HBaseTransaction tx, SettableFuture<Void> updateSCFuture) {
Put put = new Put(cell.getRow());
- put.add(cell.getFamily(),
+ put.addColumn(cell.getFamily(),
CellUtils.addShadowCellSuffixPrefix(cell.getQualifier(), 0, cell.getQualifier().length),
cell.getTimestamp(),
Bytes.toBytes(tx.getCommitTimestamp()));
try {
- cell.getTable().put(put);
+ cell.getTable().getHTable().put(put);
} catch (IOException e) {
LOG.warn("{}: Error inserting shadow cell {}", tx, cell, e);
updateSCFuture.setException(
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
index feb042f..ffd93d9 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
@@ -17,17 +17,14 @@
*/
package org.apache.omid.transaction;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
+import org.apache.hadoop.hbase.client.Delete;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class HBaseTransaction extends AbstractTransaction<HBaseCellId> {
private static final Logger LOG = LoggerFactory.getLogger(HBaseTransaction.class);
@@ -45,9 +42,9 @@ public class HBaseTransaction extends AbstractTransaction<HBaseCellId> {
private void deleteCell(HBaseCellId cell) {
Delete delete = new Delete(cell.getRow());
- delete.deleteColumn(cell.getFamily(), cell.getQualifier(), cell.getTimestamp());
+ delete.addColumn(cell.getFamily(), cell.getQualifier(), cell.getTimestamp());
try {
- cell.getTable().delete(delete);
+ cell.getTable().getHTable().delete(delete);
} catch (IOException e) {
LOG.warn("Failed cleanup cell {} for Tx {}. This issue has been ignored", cell, getTransactionId(), e);
}
@@ -74,7 +71,7 @@ public class HBaseTransaction extends AbstractTransaction<HBaseCellId> {
*/
public void flushTables() throws IOException {
- for (HTableInterface writtenTable : getWrittenTables()) {
+ for (TTable writtenTable : getWrittenTables()) {
writtenTable.flushCommits();
}
@@ -84,9 +81,9 @@ public class HBaseTransaction extends AbstractTransaction<HBaseCellId> {
// Helper methods
// ****************************************************************************************************************
- private Set<HTableInterface> getWrittenTables() {
+ private Set<TTable> getWrittenTables() {
HashSet<HBaseCellId> writeSet = (HashSet<HBaseCellId>) getWriteSet();
- Set<HTableInterface> tables = new HashSet<HTableInterface>();
+ Set<TTable> tables = new HashSet<TTable>();
for (HBaseCellId cell : writeSet) {
tables.add(cell.getTable());
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
index e20f873..12323c3 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
@@ -17,11 +17,15 @@
*/
package org.apache.omid.transaction;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
@@ -29,17 +33,13 @@ import org.apache.omid.tools.hbase.HBaseLogin;
import org.apache.omid.tso.client.CellId;
import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
import org.apache.omid.tso.client.TSOClient;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class HBaseTransactionManager extends AbstractTransactionManager implements HBaseTransactionClient {
@@ -248,7 +248,7 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
this.hBaseCellId = hBaseCellId;
this.commitCache = commitCache;
this.tableAccessWrapper = null;
- this.tableAccessWrapper = new HTableAccessWrapper(hBaseCellId.getTable(), hBaseCellId.getTable());
+ this.tableAccessWrapper = new HTableAccessWrapper(hBaseCellId.getTable().getHTable(), hBaseCellId.getTable().getHTable());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java b/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java
index 84c8d2c..f48fa55 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java
@@ -17,25 +17,25 @@
*/
package org.apache.omid.transaction;
+import java.io.IOException;
+import java.util.List;
+
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-
-import java.io.IOException;
-import java.util.List;
+import org.apache.hadoop.hbase.client.Table;
// This class wraps the HTableInterface object when doing client side filtering.
public class HTableAccessWrapper implements TableAccessWrapper {
- private final HTableInterface writeTable;
- private final HTableInterface readTable;
+ private final Table writeTable;
+ private final Table readTable;
- public HTableAccessWrapper(HTableInterface table, HTableInterface healerTable) {
+ public HTableAccessWrapper(Table table, Table healerTable) {
this.readTable = table;
this.writeTable = healerTable;
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
index 65bbcc5..9f3628d 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
@@ -23,7 +23,6 @@ import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.N
import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.SHADOW_CELL;
import java.io.IOException;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
@@ -122,7 +122,7 @@ public class SnapshotFilterImpl implements SnapshotFilter {
byte[] shadowCellQualifier = CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(),
cell.getQualifierOffset(),
cell.getQualifierLength());
- put.add(family, shadowCellQualifier, cell.getTimestamp(), Bytes.toBytes(commitTimestamp));
+ put.addColumn(family, shadowCellQualifier, cell.getTimestamp(), Bytes.toBytes(commitTimestamp));
try {
tableAccessWrapper.put(put);
} catch (IOException e) {
@@ -633,6 +633,18 @@ public class SnapshotFilterImpl implements SnapshotFilter {
public void close() {
innerScanner.close();
}
+
+ // So that Omid works with both HBase 1.3 and 1.4 without needing
+ // a new profile. Since this doesn't existing in 1.3, we don't
+ // add an @Override for it.
+ public ScanMetrics getScanMetrics() {
+ return null;
+ }
+
+ // Same as above
+ public boolean renewLease() {
+ return false;
+ }
@Override
public Iterator<Result> iterator() {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
index 4813d5b..16400b8 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
@@ -19,6 +19,7 @@ package org.apache.omid.transaction;
import java.io.Closeable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -29,21 +30,21 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.committable.CommitTable;
@@ -57,88 +58,68 @@ import com.google.common.base.Optional;
/**
* Provides transactional methods for accessing and modifying a given snapshot of data identified by an opaque {@link
- * Transaction} object. It mimics the behavior in {@link org.apache.hadoop.hbase.client.HTableInterface}
+ * Transaction} object. It mimics the behavior in {@link org.apache.hadoop.hbase.client.Table}
*/
public class TTable implements Closeable {
private static Logger LOG = LoggerFactory.getLogger(TTable.class);
- private final HTableInterface healerTable;
-
- private HTableInterface table;
+ private Table table;
private SnapshotFilter snapshotFilter;
private boolean serverSideFilter;
-
+
+ private final List<Mutation> mutations;
+
+ private boolean autoFlush = true;
+
// ----------------------------------------------------------------------------------------------------------------
// Construction
// ----------------------------------------------------------------------------------------------------------------
- public TTable(Configuration conf, byte[] tableName) throws IOException {
- this(new HTable(conf, tableName));
- }
-
- public TTable(Configuration conf, byte[] tableName, CommitTable.Client commitTableClient) throws IOException {
- this(new HTable(conf, tableName), commitTableClient);
+ public TTable(Connection connection, byte[] tableName) throws IOException {
+ this(connection.getTable(TableName.valueOf(tableName)));
}
- public TTable(String tableName) throws IOException {
- this(HBaseConfiguration.create(), Bytes.toBytes(tableName));
+ public TTable(Connection connection, byte[] tableName, CommitTable.Client commitTableClient) throws IOException {
+ this(connection.getTable(TableName.valueOf(tableName)), commitTableClient);
}
- public TTable(Configuration conf, String tableName) throws IOException {
- this(conf, Bytes.toBytes(tableName));
+ public TTable(Connection connection, String tableName) throws IOException {
+ this(connection.getTable(TableName.valueOf(tableName)));
}
- public TTable(Configuration conf, String tableName, CommitTable.Client commitTableClient) throws IOException {
- this(conf, Bytes.toBytes(tableName), commitTableClient);
+ public TTable(Connection connection, String tableName, CommitTable.Client commitTableClient) throws IOException {
+ this(connection.getTable(TableName.valueOf(tableName)), commitTableClient);
}
- public TTable(HTableInterface hTable) throws IOException {
+ public TTable(Table hTable) throws IOException {
this(hTable, hTable.getConfiguration().getBoolean("omid.server.side.filter", false));
}
- public TTable(HTableInterface hTable, boolean serverSideFilter) throws IOException {
+ public TTable(Table hTable, boolean serverSideFilter) throws IOException {
table = hTable;
- healerTable = new HTable(table.getConfiguration(), table.getTableName());
+ mutations = new ArrayList<Mutation>();
this.serverSideFilter = serverSideFilter;
snapshotFilter = (serverSideFilter) ? new AttributeSetSnapshotFilter(hTable) :
- new SnapshotFilterImpl(new HTableAccessWrapper(hTable, healerTable));
+ new SnapshotFilterImpl(new HTableAccessWrapper(hTable, hTable));
}
- public TTable(HTableInterface hTable, SnapshotFilter snapshotFilter ) throws IOException {
+ public TTable(Table hTable, SnapshotFilter snapshotFilter ) throws IOException {
table = hTable;
- healerTable = new HTable(table.getConfiguration(), table.getTableName());
+ mutations = new ArrayList<Mutation>();
this.snapshotFilter = snapshotFilter;
}
- public TTable(HTableInterface hTable, CommitTable.Client commitTableClient) throws IOException {
- table = hTable;
- healerTable = new HTable(table.getConfiguration(), table.getTableName());
- serverSideFilter = table.getConfiguration().getBoolean("omid.server.side.filter", false);
- snapshotFilter = (serverSideFilter) ? new AttributeSetSnapshotFilter(hTable) :
- new SnapshotFilterImpl(new HTableAccessWrapper(hTable, healerTable), commitTableClient);
- }
-
- public TTable(HTableInterface hTable, HTableInterface healerTable) throws IOException {
- table = hTable;
- this.healerTable = healerTable;
- Configuration config = table.getConfiguration();
- serverSideFilter = (config == null) ? false : config.getBoolean("omid.server.side.filter", false);
- snapshotFilter = (serverSideFilter) ? new AttributeSetSnapshotFilter(hTable) :
- new SnapshotFilterImpl(new HTableAccessWrapper(hTable, healerTable));
- }
-
- public TTable(HTableInterface hTable, HTableInterface healerTable, CommitTable.Client commitTableClient) throws IOException {
+ public TTable(Table hTable, CommitTable.Client commitTableClient) throws IOException {
table = hTable;
- this.healerTable = healerTable;
+ mutations = new ArrayList<Mutation>();
serverSideFilter = table.getConfiguration().getBoolean("omid.server.side.filter", false);
snapshotFilter = (serverSideFilter) ? new AttributeSetSnapshotFilter(hTable) :
- new SnapshotFilterImpl(new HTableAccessWrapper(hTable, healerTable), commitTableClient);
+ new SnapshotFilterImpl(new HTableAccessWrapper(hTable, hTable), commitTableClient);
}
-
// ----------------------------------------------------------------------------------------------------------------
// Closeable implementation
// ----------------------------------------------------------------------------------------------------------------
@@ -151,7 +132,6 @@ public class TTable implements Closeable {
@Override
public void close() throws IOException {
table.close();
- healerTable.close();
}
// ----------------------------------------------------------------------------------------------------------------
@@ -159,7 +139,7 @@ public class TTable implements Closeable {
// ----------------------------------------------------------------------------------------------------------------
/**
- * Transactional version of {@link HTableInterface#get(Get get)}
+ * Transactional version of {@link Table#get(Get get)}
*
* @param get an instance of Get
* @param tx an instance of transaction to be used
@@ -215,12 +195,12 @@ public class TTable implements Closeable {
byte[] family = entryF.getKey();
for (Entry<byte[], NavigableMap<Long, byte[]>> entryQ : entryF.getValue().entrySet()) {
byte[] qualifier = entryQ.getKey();
- tx.addWriteSetElement(new HBaseCellId(table, deleteP.getRow(), family, qualifier,
+ tx.addWriteSetElement(new HBaseCellId(this, deleteP.getRow(), family, qualifier,
tx.getWriteTimestamp()));
}
- deleteP.add(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(),
+ deleteP.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(),
HConstants.EMPTY_BYTE_ARRAY);
- tx.addWriteSetElement(new HBaseCellId(table, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER,
+ tx.addWriteSetElement(new HBaseCellId(this, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER,
tx.getWriteTimestamp()));
}
}
@@ -230,16 +210,16 @@ public class TTable implements Closeable {
Set<byte[]> fset = deleteG.getFamilyMap().keySet();
for (byte[] family : fset) {
- deleteP.add(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(),
+ deleteP.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(),
HConstants.EMPTY_BYTE_ARRAY);
- tx.addWriteSetElement(new HBaseCellId(table, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER,
+ tx.addWriteSetElement(new HBaseCellId(this, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER,
tx.getWriteTimestamp()));
}
}
/**
- * Transactional version of {@link HTableInterface#delete(Delete delete)}
+ * Transactional version of {@link Table#delete(Delete delete)}
*
* @param delete an instance of Delete
* @param tx an instance of transaction to be used
@@ -268,12 +248,12 @@ public class TTable implements Closeable {
CellUtils.validateCell(cell, writeTimestamp);
switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
case DeleteColumn:
- deleteP.add(CellUtil.cloneFamily(cell),
+ deleteP.addColumn(CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell),
writeTimestamp,
CellUtils.DELETE_TOMBSTONE);
transaction.addWriteSetElement(
- new HBaseCellId(table,
+ new HBaseCellId(this,
delete.getRow(),
CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell),
@@ -285,12 +265,12 @@ public class TTable implements Closeable {
break;
case Delete:
if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
- deleteP.add(CellUtil.cloneFamily(cell),
+ deleteP.addColumn(CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell),
writeTimestamp,
CellUtils.DELETE_TOMBSTONE);
transaction.addWriteSetElement(
- new HBaseCellId(table,
+ new HBaseCellId(this,
delete.getRow(),
CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell),
@@ -314,7 +294,7 @@ public class TTable implements Closeable {
}
if (!deleteP.isEmpty()) {
- table.put(deleteP);
+ addMutation(deleteP);
}
}
@@ -324,7 +304,7 @@ public class TTable implements Closeable {
}
/**
- * Transactional version of {@link HTableInterface#put(Put put)}
+ * Transactional version of {@link Table#put(Put put)}
*
* @param put an instance of Put
* @param tx an instance of transaction to be used
@@ -351,7 +331,7 @@ public class TTable implements Closeable {
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
Bytes.putLong(kv.getValueArray(), kv.getTimestampOffset(), timestamp);
tsput.add(kv);
- tsput.add(CellUtil.cloneFamily(kv),
+ tsput.addColumn(CellUtil.cloneFamily(kv),
CellUtils.addShadowCellSuffixPrefix(CellUtil.cloneQualifier(kv), 0, CellUtil.cloneQualifier(kv).length),
kv.getTimestamp(),
Bytes.toBytes(commitTimestamp));
@@ -365,10 +345,10 @@ public class TTable implements Closeable {
/**
* @param put an instance of Put
* @param tx an instance of transaction to be used
- * @param autoCommit denotes whether to automatically commit the put
+ * @param addShadowCell denotes whether to add the shadow cell
* @throws IOException if a remote or network exception occurs.
*/
- public void put(Transaction tx, Put put, boolean autoCommit) throws IOException {
+ public void put(Transaction tx, Put put, boolean addShadowCell) throws IOException {
throwExceptionIfOpSetsTimerange(put);
@@ -390,14 +370,14 @@ public class TTable implements Closeable {
Bytes.putLong(kv.getValueArray(), kv.getTimestampOffset(), writeTimestamp);
tsput.add(kv);
- if (autoCommit) {
- tsput.add(CellUtil.cloneFamily(kv),
+ if (addShadowCell) {
+ tsput.addColumn(CellUtil.cloneFamily(kv),
CellUtils.addShadowCellSuffixPrefix(CellUtil.cloneQualifier(kv), 0, CellUtil.cloneQualifier(kv).length),
kv.getTimestamp(),
Bytes.toBytes(kv.getTimestamp()));
} else {
byte[] conflictFree = put.getAttribute(CellUtils.CONFLICT_FREE_MUTATION);
- HBaseCellId cellId = new HBaseCellId(table,
+ HBaseCellId cellId = new HBaseCellId(this,
CellUtil.cloneRow(kv),
CellUtil.cloneFamily(kv),
CellUtil.cloneQualifier(kv),
@@ -411,12 +391,18 @@ public class TTable implements Closeable {
}
}
}
-
- table.put(tsput);
+ addMutation(tsput);
}
+ private void addMutation(Mutation m) throws IOException {
+ mutations.add(m);
+ if (autoFlush) {
+ flushCommits();
+ }
+ }
+
/**
- * Transactional version of {@link HTableInterface#getScanner(Scan scan)}
+ * Transactional version of {@link Table#getScanner(Scan scan)}
*
* @param scan an instance of Scan
* @param tx an instance of transaction to be used
@@ -452,16 +438,15 @@ public class TTable implements Closeable {
}
/**
- * Delegates to {@link HTable#getTableName()}
*
* @return array of byte
*/
public byte[] getTableName() {
- return table.getTableName();
+ return table.getName().getName();
}
/**
- * Delegates to {@link HTable#getConfiguration()}
+ * Delegates to {@link Table#getConfiguration()}
*
* @return standard configuration object
*/
@@ -470,7 +455,7 @@ public class TTable implements Closeable {
}
/**
- * Delegates to {@link HTable#getTableDescriptor()}
+ * Delegates to {@link Table#getTableDescriptor()}
*
* @return HTableDescriptor an instance of HTableDescriptor
* @throws IOException if a remote or network exception occurs.
@@ -480,7 +465,7 @@ public class TTable implements Closeable {
}
/**
- * Transactional version of {@link HTableInterface#exists(Get get)}
+ * Transactional version of {@link Table#exists(Get get)}
*
* @param transaction an instance of transaction to be used
* @param get an instance of Get
@@ -508,7 +493,7 @@ public class TTable implements Closeable {
*/
/**
- * Transactional version of {@link HTableInterface#get(List gets)}
+ * Transactional version of {@link Table#get(List gets)}
*
* @param transaction an instance of transaction to be used
* @param gets list of Get instances
@@ -525,7 +510,7 @@ public class TTable implements Closeable {
}
/**
- * Transactional version of {@link HTableInterface#getScanner(byte[] family)}
+ * Transactional version of {@link Table#getScanner(byte[] family)}
*
* @param transaction an instance of transaction to be used
* @param family column family
@@ -539,7 +524,7 @@ public class TTable implements Closeable {
}
/**
- * Transactional version of {@link HTableInterface#getScanner(byte[] family, byte[] qualifier)}
+ * Transactional version of {@link Table#getScanner(byte[] family, byte[] qualifier)}
*
* @param transaction an instance of transaction to be used
* @param family column family
@@ -555,7 +540,7 @@ public class TTable implements Closeable {
}
/**
- * Transactional version of {@link HTableInterface#put(List puts)}
+ * Transactional version of {@link Table#put(List puts)}
*
* @param transaction an instance of transaction to be used
* @param puts List of puts
@@ -563,12 +548,31 @@ public class TTable implements Closeable {
*/
public void put(Transaction transaction, List<Put> puts) throws IOException {
for (Put put : puts) {
- put(transaction, put);
+ put(transaction, put, false);
}
}
/**
- * Transactional version of {@link HTableInterface#delete(List deletes)}
+ * Transactional version of {@link Table#put(List puts)}
+ *
+ * @param transaction an instance of transaction to be used
+ * @param puts List of puts
+ * @throws IOException if a remote or network exception occurs
+ */
+ public void batch(Transaction transaction, List<Mutation> mutations) throws IOException {
+ for (Mutation mutation : mutations) {
+ if (mutation instanceof Put) {
+ put(transaction, (Put)mutation);
+ } else if (mutation instanceof Delete) {
+ delete(transaction, (Delete)mutation);
+ } else {
+ throw new UnsupportedOperationException("Unsupported mutation: " + mutation);
+ }
+ }
+ }
+
+ /**
+ * Transactional version of {@link Table#delete(List deletes)}
*
* @param transaction an instance of transaction to be used
* @param deletes List of deletes
@@ -581,48 +585,32 @@ public class TTable implements Closeable {
}
/**
- * Provides access to the underliying HTable in order to configure it or to perform unsafe (non-transactional)
+ * Provides access to the underliying Table in order to configure it or to perform unsafe (non-transactional)
* operations. The latter would break the transactional guarantees of the whole system.
*
- * @return The underlying HTable object
+ * @return The underlying Table object
*/
- public HTableInterface getHTable() {
+ public Table getHTable() {
return table;
}
- /**
- * Delegates to {@link HTable#setAutoFlush(boolean autoFlush)}
- */
public void setAutoFlush(boolean autoFlush) {
- table.setAutoFlush(autoFlush, true);
+ this.autoFlush = autoFlush;
}
- /**
- * Delegates to {@link HTable#isAutoFlush()}
- */
public boolean isAutoFlush() {
- return table.isAutoFlush();
+ return autoFlush;
}
- /**
- * Delegates to see HTable.getWriteBufferSize()
- */
- public long getWriteBufferSize() {
- return table.getWriteBufferSize();
- }
-
- /**
- * Delegates to see HTable.setWriteBufferSize()
- */
- public void setWriteBufferSize(long writeBufferSize) throws IOException {
- table.setWriteBufferSize(writeBufferSize);
- }
-
- /**
- * Delegates to see HTable.flushCommits()
- */
public void flushCommits() throws IOException {
- table.flushCommits();
+ try {
+ table.batch(this.mutations, new Object[mutations.size()]);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new RuntimeException(e);
+ } finally {
+ this.mutations.clear();
+ }
}
// ----------------------------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java b/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
index 226db44..79f02eb 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
@@ -17,19 +17,12 @@
*/
package org.apache.omid.transaction;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import org.apache.omid.TestUtils;
-import org.apache.omid.committable.CommitTable;
-import org.apache.omid.committable.InMemoryCommitTable;
-import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
-import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
-import org.apache.omid.tools.hbase.OmidTableManager;
-import org.apache.omid.tso.TSOMockModule;
-import org.apache.omid.tso.TSOServer;
-import org.apache.omid.tso.TSOServerConfig;
-import org.apache.omid.tso.client.OmidClientConfiguration;
-import org.apache.omid.tso.client.TSOClient;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -39,11 +32,25 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.omid.TestUtils;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.InMemoryCommitTable;
+import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
+import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
+import org.apache.omid.tools.hbase.OmidTableManager;
+import org.apache.omid.tso.TSOMockModule;
+import org.apache.omid.tso.TSOServer;
+import org.apache.omid.tso.TSOServerConfig;
+import org.apache.omid.tso.client.OmidClientConfiguration;
+import org.apache.omid.tso.client.TSOClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.ITestContext;
@@ -52,11 +59,8 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeGroups;
import org.testng.annotations.BeforeMethod;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-
-import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
public abstract class OmidTestBase {
@@ -65,6 +69,7 @@ public abstract class OmidTestBase {
static HBaseTestingUtility hBaseUtils;
private static MiniHBaseCluster hbaseCluster;
static Configuration hbaseConf;
+ static Connection connection;
protected static final String TEST_TABLE = "test";
protected static final String TEST_FAMILY = "data";
@@ -119,6 +124,7 @@ public abstract class OmidTestBase {
hBaseUtils = new HBaseTestingUtility(hbaseConf);
hbaseCluster = hBaseUtils.startMiniCluster(1);
+ connection = ConnectionFactory.createConnection(hbaseConf);
hBaseUtils.createTable(Bytes.toBytes(hBaseTimestampStorageConfig.getTableName()),
new byte[][]{hBaseTimestampStorageConfig.getFamilyName().getBytes()},
Integer.MAX_VALUE);
@@ -210,17 +216,19 @@ public abstract class OmidTestBase {
public void afterMethod() {
try {
LOG.info("tearing Down");
- HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
+ Admin admin = hBaseUtils.getHBaseAdmin();
deleteTable(admin, TableName.valueOf(TEST_TABLE));
createTestTable();
- deleteTable(admin, TableName.valueOf(hBaseCommitTableConfig.getTableName()));
+ if (hBaseCommitTableConfig != null) {
+ deleteTable(admin, TableName.valueOf(hBaseCommitTableConfig.getTableName()));
+ }
createCommitTable();
} catch (Exception e) {
LOG.error("Error tearing down", e);
}
}
- void deleteTable(HBaseAdmin admin, TableName tableName) throws IOException {
+ void deleteTable(Admin admin, TableName tableName) throws IOException {
if (admin.tableExists(tableName)) {
if (admin.isTableDisabled(tableName)) {
admin.deleteTable(tableName);
@@ -231,16 +239,16 @@ public abstract class OmidTestBase {
}
}
- static boolean verifyValue(byte[] tableName, byte[] row,
+ static boolean verifyValue(Table table, byte[] row,
byte[] fam, byte[] col, byte[] value) {
- try (HTable table = new HTable(hbaseConf, tableName)) {
+ try {
Get g = new Get(row).setMaxVersions(1);
Result r = table.get(g);
Cell cell = r.getColumnLatestCell(fam, col);
if (LOG.isTraceEnabled()) {
- LOG.trace("Value for " + Bytes.toString(tableName) + ":"
+ LOG.trace("Value for " + table.getName().getNameAsString() + ":"
+ Bytes.toString(row) + ":" + Bytes.toString(fam)
+ Bytes.toString(col) + "=>" + Bytes.toString(CellUtil.cloneValue(cell))
+ " (" + Bytes.toString(value) + " expected)");
@@ -248,7 +256,7 @@ public abstract class OmidTestBase {
return Bytes.equals(CellUtil.cloneValue(cell), value);
} catch (IOException e) {
- LOG.error("Error reading row " + Bytes.toString(tableName) + ":"
+ LOG.error("Error reading row " + table.getName().getNameAsString() + ":"
+ Bytes.toString(row) + ":" + Bytes.toString(fam)
+ Bytes.toString(col), e);
return false;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java
index 1dc59f8..5979c80 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java
@@ -17,18 +17,25 @@
*/
package org.apache.omid.transaction;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.omid.committable.CommitTable;
-import org.apache.omid.metrics.NullMetricsProvider;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.metrics.NullMetricsProvider;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
@@ -36,18 +43,12 @@ import org.slf4j.LoggerFactory;
import org.testng.ITestContext;
import org.testng.annotations.Test;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
@Test(groups = "sharedHBase")
public class TestAsynchronousPostCommitter extends OmidTestBase {
@@ -55,12 +56,11 @@ public class TestAsynchronousPostCommitter extends OmidTestBase {
private static final Logger LOG = LoggerFactory.getLogger(TestAsynchronousPostCommitter.class);
private static final byte[] family = Bytes.toBytes(TEST_FAMILY);
- private static final byte[] nonExistentFamily = Bytes.toBytes("non-existent");
private static final byte[] qualifier = Bytes.toBytes("test-qual");
byte[] row1 = Bytes.toBytes("test-is-committed1");
byte[] row2 = Bytes.toBytes("test-is-committed2");
-
+
@Test(timeOut = 30_000)
public void testPostCommitActionsAreCalledAsynchronously(ITestContext context) throws Exception {
@@ -107,16 +107,16 @@ public class TestAsynchronousPostCommitter extends OmidTestBase {
}
}).when(syncPostCommitter).removeCommitTableEntry(any(AbstractTransaction.class));
- try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
+ try (TTable txTable = new TTable(connection, TEST_TABLE)) {
// Execute tx with async post commit actions
Transaction tx1 = tm.begin();
Put put1 = new Put(row1);
- put1.add(family, qualifier, Bytes.toBytes("hey!"));
+ put1.addColumn(family, qualifier, Bytes.toBytes("hey!"));
txTable.put(tx1, put1);
Put put2 = new Put(row2);
- put2.add(family, qualifier, Bytes.toBytes("hou!"));
+ put2.addColumn(family, qualifier, Bytes.toBytes("hou!"));
txTable.put(tx1, put2);
tm.commit(tx1);
@@ -214,16 +214,16 @@ public class TestAsynchronousPostCommitter extends OmidTestBase {
}).when(syncPostCommitter).removeCommitTableEntry(any(AbstractTransaction.class));
- try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
+ try (TTable txTable = new TTable(connection, TEST_TABLE)) {
// Execute tx with async post commit actions
Transaction tx1 = tm.begin();
Put put1 = new Put(row1);
- put1.add(family, qualifier, Bytes.toBytes("hey!"));
+ put1.addColumn(family, qualifier, Bytes.toBytes("hey!"));
txTable.put(tx1, put1);
Put put2 = new Put(row2);
- put2.add(family, qualifier, Bytes.toBytes("hou!"));
+ put2.addColumn(family, qualifier, Bytes.toBytes("hou!"));
txTable.put(tx1, put2);
tm.commit(tx1);
@@ -283,16 +283,16 @@ public class TestAsynchronousPostCommitter extends OmidTestBase {
}).when(syncPostCommitter).removeCommitTableEntry(any(AbstractTransaction.class));
- try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
+ try (TTable txTable = new TTable(connection, TEST_TABLE)) {
// Execute tx with async post commit actions
Transaction tx1 = tm.begin();
Put put1 = new Put(row1);
- put1.add(family, qualifier, Bytes.toBytes("hey!"));
+ put1.addColumn(family, qualifier, Bytes.toBytes("hey!"));
txTable.put(tx1, put1);
Put put2 = new Put(row2);
- put2.add(family, qualifier, Bytes.toBytes("hou!"));
+ put2.addColumn(family, qualifier, Bytes.toBytes("hou!"));
txTable.put(tx1, put2);
tm.commit(tx1);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-client/src/test/java/org/apache/omid/transaction/TestAutoFlush.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestAutoFlush.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestAutoFlush.java
index 305e80a..fac64ac 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestAutoFlush.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestAutoFlush.java
@@ -17,6 +17,8 @@
*/
package org.apache.omid.transaction;
+import static org.testng.Assert.assertEquals;
+
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -24,8 +26,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.testng.ITestContext;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-
@Test(groups = "sharedHBase")
public class TestAutoFlush extends OmidTestBase {
@@ -37,14 +37,14 @@ public class TestAutoFlush extends OmidTestBase {
byte[] col = Bytes.toBytes("col1");
byte[] data = Bytes.toBytes("data");
TransactionManager tm = newTransactionManager(context);
- TTable table = new TTable(hbaseConf, TEST_TABLE);
+ TTable table = new TTable(connection, TEST_TABLE);
// Turn off autoflush
table.setAutoFlush(false);
Transaction t = tm.begin();
Put put = new Put(row);
- put.add(family, col, data);
+ put.addColumn(family, col, data);
table.put(t, put);
// Data shouldn't be in DB yet
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java
index cd6216c..9315751 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java
@@ -17,6 +17,14 @@
*/
package org.apache.omid.transaction;
+import static org.slf4j.LoggerFactory.getLogger;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -31,14 +39,6 @@ import org.testng.ITestContext;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.io.IOException;
-import java.util.Arrays;
-
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.fail;
-
/**
* These tests try to analyze the transactional anomalies described by P. Baillis et al. in
* http://arxiv.org/pdf/1302.0309.pdf
@@ -89,7 +89,7 @@ public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
// 0) Start transactions
TransactionManager tm = newTransactionManager(context);
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Transaction tx2 = tm.begin();
@@ -103,7 +103,7 @@ public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
// 2) insert into test (id, value) values(3, 30); -- T2
Put newRow = new Put(rowId3);
- newRow.add(famName, colName, dataValue3);
+ newRow.addColumn(famName, colName, dataValue3);
txTable.put(tx2, newRow);
// 3) Commit TX 2
@@ -129,7 +129,7 @@ public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
// 0) Start transactions
TransactionManager tm = newTransactionManager(context);
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Transaction tx2 = tm.begin();
@@ -143,7 +143,7 @@ public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
Put row = new Put(updateRes.getRow());
int val = Bytes.toInt(updateRes.getValue(famName, colName));
LOG.info("Updating row id {} with value {}", Bytes.toString(updateRes.getRow()), val);
- row.add(famName, colName, Bytes.toBytes(val + 10));
+ row.addColumn(famName, colName, Bytes.toBytes(val + 10));
txTable.put(tx1, row);
updateRes = tx1Scanner.next();
count++;
@@ -198,7 +198,7 @@ public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
// 0) Start transactions
TransactionManager tm = newTransactionManager(context);
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Transaction tx2 = tm.begin();
@@ -237,12 +237,12 @@ public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
// 3) update test set value = 11 where id = 1; -- T1
Put updateRow1Tx1 = new Put(rowId1);
- updateRow1Tx1.add(famName, colName, Bytes.toBytes("11"));
+ updateRow1Tx1.addColumn(famName, colName, Bytes.toBytes("11"));
txTable.put(tx1, updateRow1Tx1);
// 4) update test set value = 11 where id = 1; -- T2
Put updateRow1Tx2 = new Put(rowId1);
- updateRow1Tx2.add(famName, colName, Bytes.toBytes("11"));
+ updateRow1Tx2.addColumn(famName, colName, Bytes.toBytes("11"));
txTable.put(tx2, updateRow1Tx2);
// 5) commit -- T1
@@ -274,7 +274,7 @@ public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
// 0) Start transactions
TransactionManager tm = newTransactionManager(context);
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Transaction tx2 = tm.begin();
@@ -329,12 +329,12 @@ public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
// 4) update test set value = 12 where id = 1; -- T2
Put updateRow1Tx2 = new Put(rowId1);
- updateRow1Tx2.add(famName, colName, Bytes.toBytes("12"));
+ updateRow1Tx2.addColumn(famName, colName, Bytes.toBytes("12"));
txTable.put(tx1, updateRow1Tx2);
// 5) update test set value = 18 where id = 1; -- T2
Put updateRow2Tx2 = new Put(rowId2);
- updateRow2Tx2.add(famName, colName, Bytes.toBytes("18"));
+ updateRow2Tx2.addColumn(famName, colName, Bytes.toBytes("18"));
txTable.put(tx2, updateRow2Tx2);
// 6) commit -- T2
@@ -374,7 +374,7 @@ public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
// 0) Start transactions
TransactionManager tm = newTransactionManager(context);
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Transaction tx2 = tm.begin();
@@ -387,9 +387,9 @@ public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
// 3) update test set value = 12 where id = 1; -- T2
// 4) update test set value = 18 where id = 2; -- T2
Put updateRow1Tx2 = new Put(rowId1);
- updateRow1Tx2.add(famName, colName, Bytes.toBytes(12));
+ updateRow1Tx2.addColumn(famName, colName, Bytes.toBytes(12));
Put updateRow2Tx2 = new Put(rowId2);
- updateRow2Tx2.add(famName, colName, Bytes.toBytes(18));
+ updateRow2Tx2.addColumn(famName, colName, Bytes.toBytes(18));
txTable.put(tx2, Arrays.asList(updateRow1Tx2, updateRow2Tx2));
// 5) commit; -- T2
@@ -435,7 +435,7 @@ public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
// 0) Start transactions
TransactionManager tm = newTransactionManager(context);
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Transaction tx2 = tm.begin();
@@ -492,12 +492,12 @@ public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
// 3) update test set value = 11 where id = 1; -- T1
Put updateRow1Tx1 = new Put(rowId1);
- updateRow1Tx1.add(famName, colName, Bytes.toBytes("11"));
+ updateRow1Tx1.addColumn(famName, colName, Bytes.toBytes("11"));
txTable.put(tx1, updateRow1Tx1);
// 4) update test set value = 21 where id = 2; -- T2
Put updateRow2Tx2 = new Put(rowId2);
- updateRow2Tx2.add(famName, colName, Bytes.toBytes("21"));
+ updateRow2Tx2.addColumn(famName, colName, Bytes.toBytes("21"));
txTable.put(tx2, updateRow2Tx2);
// 5) commit; -- T1
@@ -523,7 +523,7 @@ public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
// 0) Start transactions
TransactionManager tm = newTransactionManager(context);
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Transaction tx2 = tm.begin();
@@ -542,12 +542,12 @@ public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
// 3) insert into test (id, value) values(3, 30); -- T1
Put insertRow3Tx1 = new Put(rowId1);
- insertRow3Tx1.add(famName, colName, Bytes.toBytes("30"));
+ insertRow3Tx1.addColumn(famName, colName, Bytes.toBytes("30"));
txTable.put(tx1, insertRow3Tx1);
// 4) insert into test (id, value) values(4, 42); -- T2
Put updateRow4Tx2 = new Put(rowId2);
- updateRow4Tx2.add(famName, colName, Bytes.toBytes("42"));
+ updateRow4Tx2.addColumn(famName, colName, Bytes.toBytes("42"));
txTable.put(tx2, updateRow4Tx2);
// 5) commit; -- T1
@@ -570,14 +570,14 @@ public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
private void loadBaseDataOnTestTable(ITestContext context) throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
Transaction initializationTx = tm.begin();
Put row1 = new Put(rowId1);
- row1.add(famName, colName, dataValue1);
+ row1.addColumn(famName, colName, dataValue1);
txTable.put(initializationTx, row1);
Put row2 = new Put(rowId2);
- row2.add(famName, colName, dataValue2);
+ row2.addColumn(famName, colName, dataValue2);
txTable.put(initializationTx, row2);
tm.commit(initializationTx);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java
index 1b793a1..28af0a6 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java
@@ -17,6 +17,10 @@
*/
package org.apache.omid.transaction;
+import static org.junit.Assert.fail;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -28,10 +32,6 @@ import org.slf4j.LoggerFactory;
import org.testng.ITestContext;
import org.testng.annotations.Test;
-import static org.junit.Assert.fail;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
@Test(groups = "sharedHBase")
public class TestBasicTransaction extends OmidTestBase {
@@ -42,7 +42,7 @@ public class TestBasicTransaction extends OmidTestBase {
public void testTimestampsOfTwoRowsInstertedAfterCommitOfSingleTransactionAreEquals(ITestContext context) throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
byte[] rowName1 = Bytes.toBytes("row1");
byte[] rowName2 = Bytes.toBytes("row2");
@@ -54,10 +54,10 @@ public class TestBasicTransaction extends OmidTestBase {
Transaction tx1 = tm.begin();
Put row1 = new Put(rowName1);
- row1.add(famName1, colName1, dataValue1);
+ row1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, row1);
Put row2 = new Put(rowName2);
- row2.add(famName1, colName1, dataValue2);
+ row2.addColumn(famName1, colName1, dataValue2);
tt.put(tx1, row2);
tm.commit(tx1);
@@ -88,7 +88,7 @@ public class TestBasicTransaction extends OmidTestBase {
throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
byte[] rowName1 = Bytes.toBytes("row1");
byte[] rowName2 = Bytes.toBytes("row2");
@@ -103,10 +103,10 @@ public class TestBasicTransaction extends OmidTestBase {
Transaction tx1 = tm.begin();
Put row1 = new Put(rowName1);
- row1.add(famName1, colName1, dataValue1);
+ row1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, row1);
Put row2 = new Put(rowName2);
- row2.add(famName1, colName1, dataValue2);
+ row2.addColumn(famName1, colName1, dataValue2);
tt.put(tx1, row2);
tm.commit(tx1);
@@ -114,10 +114,10 @@ public class TestBasicTransaction extends OmidTestBase {
Transaction tx2 = tm.begin();
row1 = new Put(rowName1);
- row1.add(famName1, colName1, dataValue3);
+ row1.addColumn(famName1, colName1, dataValue3);
tt.put(tx2, row1);
row2 = new Put(rowName2);
- row2.add(famName1, colName1, dataValue4);
+ row2.addColumn(famName1, colName1, dataValue4);
tt.put(tx2, row2);
tm.commit(tx2);
@@ -155,7 +155,7 @@ public class TestBasicTransaction extends OmidTestBase {
TransactionManager tm = newTransactionManager(context);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction t1 = tm.begin();
LOG.info("Transaction created " + t1);
@@ -167,14 +167,14 @@ public class TestBasicTransaction extends OmidTestBase {
byte[] data2 = Bytes.toBytes("testWrite-2");
Put p = new Put(row);
- p.add(fam, col, data1);
+ p.addColumn(fam, col, data1);
tt.put(t1, p);
tm.commit(t1);
Transaction tread = tm.begin();
Transaction t2 = tm.begin();
p = new Put(row);
- p.add(fam, col, data2);
+ p.addColumn(fam, col, data2);
tt.put(t2, p);
tm.commit(t2);
@@ -192,7 +192,7 @@ public class TestBasicTransaction extends OmidTestBase {
public void runTestManyVersions(ITestContext context) throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction t1 = tm.begin();
LOG.info("Transaction created " + t1);
@@ -204,14 +204,14 @@ public class TestBasicTransaction extends OmidTestBase {
byte[] data2 = Bytes.toBytes("testWrite-2");
Put p = new Put(row);
- p.add(fam, col, data1);
+ p.addColumn(fam, col, data1);
tt.put(t1, p);
tm.commit(t1);
for (int i = 0; i < 5; ++i) {
Transaction t2 = tm.begin();
p = new Put(row);
- p.add(fam, col, data2);
+ p.addColumn(fam, col, data2);
tt.put(t2, p);
}
Transaction tread = tm.begin();
@@ -231,7 +231,7 @@ public class TestBasicTransaction extends OmidTestBase {
public void runTestInterleave(ITestContext context) throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction t1 = tm.begin();
LOG.info("Transaction created " + t1);
@@ -243,13 +243,13 @@ public class TestBasicTransaction extends OmidTestBase {
byte[] data2 = Bytes.toBytes("testWrite-2");
Put p = new Put(row);
- p.add(fam, col, data1);
+ p.addColumn(fam, col, data1);
tt.put(t1, p);
tm.commit(t1);
Transaction t2 = tm.begin();
p = new Put(row);
- p.add(fam, col, data2);
+ p.addColumn(fam, col, data2);
tt.put(t2, p);
Transaction tread = tm.begin();
@@ -278,7 +278,7 @@ public class TestBasicTransaction extends OmidTestBase {
public void testInterleavedScanReturnsTheRightSnapshotResults(ITestContext context) throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
// Basic data-scaffolding for test
byte[] fam = Bytes.toBytes(TEST_FAMILY);
@@ -296,7 +296,7 @@ public class TestBasicTransaction extends OmidTestBase {
byte[] row = Bytes.toBytes("row-to-scan" + i);
Put p = new Put(row);
- p.add(fam, col, data1);
+ p.addColumn(fam, col, data1);
txTable.put(tx1, p);
}
tm.commit(tx1);
@@ -305,7 +305,7 @@ public class TestBasicTransaction extends OmidTestBase {
// that scans the table, gets the proper snapshot with the stuff written by Tx1
Transaction tx2 = tm.begin();
Put p = new Put(randomRow);
- p.add(fam, col, data2);
+ p.addColumn(fam, col, data2);
txTable.put(tx2, p);
Transaction scanTx = tm.begin(); // This is the concurrent transactional scanner
@@ -362,7 +362,7 @@ public class TestBasicTransaction extends OmidTestBase {
throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
// Basic data-scaffolding for test
byte[] fam = Bytes.toBytes(TEST_FAMILY);
@@ -380,7 +380,7 @@ public class TestBasicTransaction extends OmidTestBase {
byte[] row = Bytes.toBytes("row-to-scan" + i);
Put p = new Put(row);
- p.add(fam, col, data1);
+ p.addColumn(fam, col, data1);
txTable.put(tx1, p);
}
tm.commit(tx1);
@@ -389,7 +389,7 @@ public class TestBasicTransaction extends OmidTestBase {
// right snapshot with the new value in the random row just written by Tx2
Transaction tx2 = tm.begin();
Put p = new Put(randomRow);
- p.add(fam, col, data2);
+ p.addColumn(fam, col, data2);
txTable.put(tx2, p);
int modifiedRows = 0;
@@ -442,7 +442,7 @@ public class TestBasicTransaction extends OmidTestBase {
throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
byte[] rowName1 = Bytes.toBytes("row1");
byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
@@ -452,7 +452,7 @@ public class TestBasicTransaction extends OmidTestBase {
Transaction tx1 = tm.begin();
Put row1 = new Put(rowName1);
- row1.add(famName1, colName1, dataValue1);
+ row1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, row1);
Transaction tx2 = tm.begin();
@@ -465,7 +465,7 @@ public class TestBasicTransaction extends OmidTestBase {
assertEquals(r.size(), 0, "Unexpected size for read.");
row1 = new Put(rowName1);
- row1.add(famName1, colName1, dataValue1);
+ row1.addColumn(famName1, colName1, dataValue1);
tt.put(tx2, row1, true);
r = tt.get(tx3, g);