You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2017/03/09 21:30:08 UTC
[2/2] phoenix git commit: Implementing the TAL for Tephra
Implementing the TAL for Tephra
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3431902f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3431902f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3431902f
Branch: refs/heads/omid
Commit: 3431902fd83c590381acb9a817b05676e588004e
Parents: 96f8d09
Author: Ohad Shacham <oh...@yahoo-inc.com>
Authored: Tue Mar 7 12:03:50 2017 +0200
Committer: Thomas D'Silva <td...@apache.org>
Committed: Thu Mar 9 13:29:37 2017 -0800
----------------------------------------------------------------------
.../transaction/OmidTransactionContext.java | 77 +++++
.../transaction/OmidTransactionTable.java | 323 +++++++++++++++++++
.../transaction/PhoenixTransactionContext.java | 12 -
.../transaction/PhoenixTransactionalTable.java | 5 -
.../transaction/TephraTransactionContext.java | 285 ++++++++++++++++
.../transaction/TephraTransactionTable.java | 303 +++++++++++++++++
6 files changed, 988 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
new file mode 100644
index 0000000..937ac14
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -0,0 +1,77 @@
+package org.apache.phoenix.transaction;
+
+import java.sql.SQLException;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.phoenix.schema.PTable;
+
+public class OmidTransactionContext implements PhoenixTransactionContext {
+
+ @Override
+ public void begin() throws SQLException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void commit() throws SQLException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void abort() throws SQLException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void checkpoint(boolean hasUncommittedData) throws SQLException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void commitDDLFence(PTable dataTable) throws SQLException,
+ InterruptedException, TimeoutException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void markDMLFence(PTable table) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void join(PhoenixTransactionContext ctx) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public boolean isTransactionRunning() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public long getTransactionId() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public long getReadPointer() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
new file mode 100644
index 0000000..d2cd020
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
@@ -0,0 +1,323 @@
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Placeholder {@link PhoenixTransactionalTable} for the Omid transaction
+ * manager.
+ * <p>
+ * Nothing is implemented yet. Methods returning {@code void} do nothing,
+ * methods returning an object or array return {@code null}, boolean methods
+ * return {@code false} and numeric methods return {@code 0}. The constructor
+ * arguments are not retained.
+ */
+public class OmidTransactionTable implements PhoenixTransactionalTable {
+
+    /**
+     * Creates the placeholder table; both arguments are currently ignored.
+     *
+     * @param ctx transaction context the table is meant to take part in
+     * @param hTable HBase table that is meant to be wrapped
+     */
+    public OmidTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) {
+        // TODO(omid): keep ctx and hTable once the implementation exists.
+    }
+
+    // ---- reads, writes and scans ------------------------------------------
+
+    @Override
+    public Result get(Get get) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    @Override
+    public void put(Put put) throws IOException {
+        // TODO(omid): not implemented; no-op.
+    }
+
+    @Override
+    public void delete(Delete delete) throws IOException {
+        // TODO(omid): not implemented; no-op.
+    }
+
+    @Override
+    public ResultScanner getScanner(Scan scan) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    // ---- table metadata ----------------------------------------------------
+
+    @Override
+    public byte[] getTableName() {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor() throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    @Override
+    public boolean exists(Get get) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return false;
+    }
+
+    @Override
+    public Result[] get(List<Get> gets) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    @Override
+    public void put(List<Put> puts) throws IOException {
+        // TODO(omid): not implemented; no-op.
+    }
+
+    @Override
+    public void delete(List<Delete> deletes) throws IOException {
+        // TODO(omid): not implemented; no-op.
+    }
+
+    // ---- client-side buffering ---------------------------------------------
+
+    @Override
+    public void setAutoFlush(boolean autoFlush) {
+        // TODO(omid): not implemented; no-op.
+    }
+
+    @Override
+    public boolean isAutoFlush() {
+        // TODO(omid): not implemented; placeholder result.
+        return false;
+    }
+
+    @Override
+    public long getWriteBufferSize() {
+        // TODO(omid): not implemented; placeholder result.
+        return 0L;
+    }
+
+    @Override
+    public void setWriteBufferSize(long writeBufferSize) throws IOException {
+        // TODO(omid): not implemented; no-op.
+    }
+
+    @Override
+    public void flushCommits() throws IOException {
+        // TODO(omid): not implemented; no-op.
+    }
+
+    @Override
+    public void close() throws IOException {
+        // TODO(omid): not implemented; no-op.
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+            long amount, boolean writeToWAL) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return 0L;
+    }
+
+    @Override
+    public Boolean[] exists(List<Get> gets) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    @Override
+    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+        // TODO(omid): not implemented; no-op.
+    }
+
+    @Override
+    public void setAutoFlushTo(boolean autoFlush) {
+        // TODO(omid): not implemented; no-op.
+    }
+
+    @Override
+    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    @Override
+    public TableName getName() {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    @Override
+    public boolean[] existsAll(List<Get> gets) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    // ---- batch operations --------------------------------------------------
+
+    @Override
+    public void batch(List<? extends Row> actions, Object[] results)
+            throws IOException, InterruptedException {
+        // TODO(omid): not implemented; no-op.
+    }
+
+    @Override
+    public Object[] batch(List<? extends Row> actions)
+            throws IOException, InterruptedException {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    @Override
+    public <R> void batchCallback(List<? extends Row> actions, Object[] results,
+            Callback<R> callback) throws IOException, InterruptedException {
+        // TODO(omid): not implemented; no-op.
+    }
+
+    @Override
+    public <R> Object[] batchCallback(List<? extends Row> actions, Callback<R> callback)
+            throws IOException, InterruptedException {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    // ---- conditional and atomic mutations ----------------------------------
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+            byte[] value, Put put) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return false;
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, Put put) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return false;
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+            byte[] value, Delete delete) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return false;
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, Delete delete) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return false;
+    }
+
+    @Override
+    public void mutateRow(RowMutations rm) throws IOException {
+        // TODO(omid): not implemented; no-op.
+    }
+
+    @Override
+    public Result append(Append append) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    @Override
+    public Result increment(Increment increment) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+            long amount) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return 0L;
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+            long amount, Durability durability) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return 0L;
+    }
+
+    // ---- coprocessor endpoints ---------------------------------------------
+
+    @Override
+    public CoprocessorRpcChannel coprocessorService(byte[] row) {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    @Override
+    public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
+            byte[] startKey, byte[] endKey, Call<T, R> callable)
+            throws ServiceException, Throwable {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    @Override
+    public <T extends Service, R> void coprocessorService(Class<T> service,
+            byte[] startKey, byte[] endKey, Call<T, R> callable, Callback<R> callback)
+            throws ServiceException, Throwable {
+        // TODO(omid): not implemented; no-op.
+    }
+
+    @Override
+    public <R extends Message> Map<byte[], R> batchCoprocessorService(
+            MethodDescriptor methodDescriptor, Message request,
+            byte[] startKey, byte[] endKey, R responsePrototype)
+            throws ServiceException, Throwable {
+        // TODO(omid): not implemented; placeholder result.
+        return null;
+    }
+
+    @Override
+    public <R extends Message> void batchCoprocessorService(
+            MethodDescriptor methodDescriptor, Message request,
+            byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
+            throws ServiceException, Throwable {
+        // TODO(omid): not implemented; no-op.
+    }
+
+    @Override
+    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
+        // TODO(omid): not implemented; placeholder result.
+        return false;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
index f07640e..af0ff05 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -29,18 +29,6 @@ public interface PhoenixTransactionContext {
public void abort() throws SQLException;
/**
- * Rollback a transaction
- *
- * @param e
- * @throws SQLException
- */
- public void abort(SQLException e) throws SQLException;
-
- /**
- * Create a checkpoint in a transaction as defined in [TEPHRA-96]
- * @throws SQLException
- */
- public void checkpoint() throws SQLException;
/**
* Commit DDL to guarantee that no transaction started before create index
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
index 7495c5b..dcab73d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
@@ -101,11 +101,6 @@ public interface PhoenixTransactionalTable extends HTableInterface {
public void delete(List<Delete> deletes) throws IOException;
/**
- * Return the underling htable
- */
- public HTableInterface getHTable();
-
- /**
* Delegates to {@link HTable#setAutoFlush(boolean autoFlush)}
*/
public void setAutoFlush(boolean autoFlush);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
new file mode 100644
index 0000000..8fc5e0f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -0,0 +1,285 @@
+package org.apache.phoenix.transaction;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTable;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionAware;
+import org.apache.tephra.TransactionConflictException;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.Transaction.VisibilityLevel;
+import org.apache.tephra.visibility.FenceWait;
+import org.apache.tephra.visibility.VisibilityFence;
+
+import com.google.common.collect.Lists;
+
+/**
+ * {@link PhoenixTransactionContext} implemented on top of Apache Tephra.
+ * <p>
+ * An instance works in one of two modes, fixed by the constructor:
+ * <ul>
+ * <li><b>context mode</b> ({@code txContext != null}): the transaction is
+ * started, committed and aborted through a Tephra {@link TransactionContext};</li>
+ * <li><b>thread-safe mode</b> ({@code txContext == null}): the instance only
+ * carries the {@link Transaction} copied from the context it was created from
+ * and keeps the {@link TransactionAware}s it is given in its own list.</li>
+ * </ul>
+ */
+public class TephraTransactionContext implements PhoenixTransactionContext {
+
+    /** Maximum time, in milliseconds, {@link #commitDDLFence(PTable)} waits on the fence. */
+    private static final long DDL_FENCE_TIMEOUT_MS = 10000;
+
+    private final List<TransactionAware> txAwares;
+    private final TransactionContext txContext;
+    private final TransactionSystemClient txServiceClient;
+    private Transaction tx;
+    /** Failure recorded by {@link #commit()}; passed to Tephra by the next {@link #abort()}. */
+    private TransactionFailureException commitFailure;
+
+    /**
+     * Creates a context, optionally derived from an existing one.
+     *
+     * @param ctx context to derive from; must be a {@code TephraTransactionContext}
+     *            or {@code null}
+     * @param connection connection whose query services supply the Tephra client
+     * @param threadSafe {@code true} for thread-safe mode (copies the transaction
+     *            of {@code ctx}); {@code false} for context mode (shares the
+     *            {@link TransactionContext} of {@code ctx}, or creates a new one
+     *            when {@code ctx} is {@code null})
+     */
+    public TephraTransactionContext(PhoenixTransactionContext ctx, PhoenixConnection connection, boolean threadSafe) {
+        this.txServiceClient = connection.getQueryServices().getTransactionSystemClient();
+
+        // A null ctx is legal (see the context-mode branch), so the assertion
+        // must not reject it.
+        assert ctx == null || ctx instanceof TephraTransactionContext;
+        TephraTransactionContext parent = (TephraTransactionContext) ctx;
+
+        if (threadSafe) {
+            this.tx = (parent == null) ? null : parent.getTransaction();
+            this.txAwares = Lists.newArrayList();
+            this.txContext = null;
+        } else {
+            this.txAwares = Collections.emptyList();
+            this.txContext = (parent == null)
+                    ? new TransactionContext(txServiceClient)
+                    : parent.getContext();
+        }
+
+        this.commitFailure = null;
+    }
+
+    /**
+     * Starts a transaction on the Tephra context.
+     *
+     * @throws SQLException with {@code NULL_TRANSACTION_CONTEXT} in thread-safe
+     *             mode, or {@code TRANSACTION_FAILED} if Tephra fails to start
+     */
+    @Override
+    public void begin() throws SQLException {
+        requireContext();
+        try {
+            txContext.start();
+        } catch (TransactionFailureException failure) {
+            throw transactionFailed(failure);
+        }
+    }
+
+    /**
+     * Commits the running transaction. A failure is remembered so that the
+     * next {@link #abort()} can hand it to Tephra.
+     *
+     * @throws SQLException with {@code NULL_TRANSACTION_CONTEXT} in thread-safe
+     *             mode, {@code TRANSACTION_CONFLICT_EXCEPTION} on a conflict, or
+     *             {@code TRANSACTION_FAILED} on any other Tephra failure
+     */
+    @Override
+    public void commit() throws SQLException {
+        requireContext();
+        try {
+            txContext.finish();
+        } catch (TransactionFailureException failure) {
+            this.commitFailure = failure;
+            if (failure instanceof TransactionConflictException) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION)
+                        .setMessage(failure.getMessage())
+                        .setRootCause(failure)
+                        .build().buildException();
+            }
+            throw transactionFailed(failure);
+        }
+    }
+
+    /**
+     * Aborts the running transaction, passing along the failure recorded by a
+     * preceding {@link #commit()}, if any. The recorded failure is cleared
+     * whether or not the abort succeeds.
+     *
+     * @throws SQLException with {@code NULL_TRANSACTION_CONTEXT} in thread-safe
+     *             mode, or {@code TRANSACTION_FAILED} if Tephra reports a failure
+     */
+    @Override
+    public void abort() throws SQLException {
+        requireContext();
+        TransactionFailureException cause = commitFailure;
+        commitFailure = null;
+        try {
+            if (cause != null) {
+                txContext.abort(cause);
+            } else {
+                txContext.abort();
+            }
+        } catch (TransactionFailureException failure) {
+            throw transactionFailed(failure);
+        }
+    }
+
+    /**
+     * Optionally checkpoints the transaction, then sets its visibility to
+     * {@link VisibilityLevel#SNAPSHOT_EXCLUDE_CURRENT}. A transaction must be
+     * in progress.
+     *
+     * @param hasUncommittedData when {@code true} a checkpoint is taken first
+     * @throws SQLException wrapping the Tephra failure if the checkpoint fails
+     */
+    @Override
+    public void checkpoint(boolean hasUncommittedData) throws SQLException {
+        if (hasUncommittedData) {
+            try {
+                if (txContext == null) {
+                    tx = txServiceClient.checkpoint(tx);
+                } else {
+                    txContext.checkpoint();
+                    tx = txContext.getCurrentTransaction();
+                }
+            } catch (TransactionFailureException failure) {
+                throw new SQLException(failure);
+            }
+        }
+
+        currentTransaction().setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+    }
+
+    /**
+     * Waits on a Tephra visibility fence keyed by the table name.
+     *
+     * @param dataTable table whose name is used as the fence key
+     * @throws SQLException with {@code INTERRUPTED_EXCEPTION} if the wait is
+     *             interrupted (the interrupt flag is restored), or
+     *             {@code TX_UNABLE_TO_GET_WRITE_FENCE} on timeout or Tephra
+     *             failure; the original exception is attached as root cause
+     */
+    @Override
+    public void commitDDLFence(PTable dataTable) throws SQLException,
+            InterruptedException, TimeoutException {
+        byte[] key = dataTable.getName().getBytes();
+        try {
+            FenceWait fenceWait = VisibilityFence.prepareWait(key, txServiceClient);
+            fenceWait.await(DDL_FENCE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
+                    .setRootCause(interrupted)
+                    .build().buildException();
+        } catch (TimeoutException | TransactionFailureException failure) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE)
+                    .setSchemaName(dataTable.getSchemaName().getString())
+                    .setTableName(dataTable.getTableName().getString())
+                    .setRootCause(failure) // keep the underlying reason for diagnosis
+                    .build().buildException();
+        }
+    }
+
+    /**
+     * Registers a visibility fence for the table's logical name and, when it
+     * differs, for its physical name as well.
+     *
+     * @param table table to fence
+     */
+    @Override
+    public void markDMLFence(PTable table) {
+        byte[] logicalKey = table.getName().getBytes();
+        registerFence(logicalKey);
+
+        byte[] physicalKey = table.getPhysicalName().getBytes();
+        if (Bytes.compareTo(physicalKey, logicalKey) != 0) {
+            registerFence(physicalKey);
+        }
+    }
+
+    /**
+     * Adds every {@link TransactionAware} held by {@code ctx} to this context.
+     *
+     * @param ctx context to take the transaction-aware objects from; must be a
+     *            {@code TephraTransactionContext}
+     */
+    @Override
+    public void join(PhoenixTransactionContext ctx) {
+        assert ctx instanceof TephraTransactionContext;
+        List<TransactionAware> joined = ((TephraTransactionContext) ctx).getAwares();
+
+        if (txContext != null) {
+            for (TransactionAware txAware : joined) {
+                txContext.addTransactionAware(txAware);
+            }
+        } else {
+            txAwares.addAll(joined);
+        }
+    }
+
+    /** @return {@code true} when a transaction is currently available */
+    @Override
+    public boolean isTransactionRunning() {
+        return currentTransaction() != null;
+    }
+
+    /** Forgets the carried transaction and clears the locally held transaction-aware list. */
+    @Override
+    public void reset() {
+        tx = null;
+        txAwares.clear();
+    }
+
+    /**
+     * @return the id of the current transaction, or
+     *         {@link HConstants#LATEST_TIMESTAMP} when there is none
+     */
+    @Override
+    public long getTransactionId() {
+        Transaction current = currentTransaction();
+        return (current == null) ? HConstants.LATEST_TIMESTAMP : current.getTransactionId();
+    }
+
+    /** @return the read pointer of the current transaction, or {@code -1} when there is none */
+    @Override
+    public long getReadPointer() {
+        Transaction current = currentTransaction();
+        return (current == null) ? -1 : current.getReadPointer();
+    }
+
+    /*
+     * TephraTransactionContext specific functions
+     */
+
+    /** @return the transaction carried directly by this instance (may be {@code null}) */
+    Transaction getTransaction() {
+        return this.tx;
+    }
+
+    /** @return the Tephra context, or {@code null} in thread-safe mode */
+    TransactionContext getContext() {
+        return this.txContext;
+    }
+
+    /** @return the locally held transaction-aware list (the live list, not a copy) */
+    List<TransactionAware> getAwares() {
+        return txAwares;
+    }
+
+    /**
+     * Registers {@code txAware} with the Tephra context, or with the local
+     * list when in thread-safe mode and a transaction is carried; otherwise
+     * the object is not registered.
+     */
+    void addTransactionAware(TransactionAware txAware) {
+        if (this.txContext != null) {
+            txContext.addTransactionAware(txAware);
+        } else if (this.tx != null) {
+            txAwares.add(txAware);
+        }
+    }
+
+    /**
+     * For testing.
+     *
+     * @return the write pointer of the current transaction, or
+     *         {@link HConstants#LATEST_TIMESTAMP} when there is none
+     */
+    public long getWritePointer() {
+        Transaction current = currentTransaction();
+        return (current == null) ? HConstants.LATEST_TIMESTAMP : current.getWritePointer();
+    }
+
+    /**
+     * For testing.
+     *
+     * @return the visibility level of the current transaction, or {@code null}
+     *         when there is none
+     */
+    public VisibilityLevel getVisibilityLevel() {
+        Transaction current = currentTransaction();
+        return (current == null) ? null : current.getVisibilityLevel();
+    }
+
+    /**
+     * Returns the transaction in effect: the Tephra context's current
+     * transaction in context mode, otherwise the carried one. May be
+     * {@code null}.
+     */
+    private Transaction currentTransaction() {
+        return (txContext != null) ? txContext.getCurrentTransaction() : tx;
+    }
+
+    /** Throws {@code NULL_TRANSACTION_CONTEXT} when there is no Tephra context to operate on. */
+    private void requireContext() throws SQLException {
+        if (txContext == null) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT)
+                    .build().buildException();
+        }
+    }
+
+    /** Wraps a Tephra failure in a {@code TRANSACTION_FAILED} SQLException, keeping message and cause. */
+    private static SQLException transactionFailed(TransactionFailureException failure) {
+        return new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED)
+                .setMessage(failure.getMessage())
+                .setRootCause(failure)
+                .build().buildException();
+    }
+
+    /** Creates a visibility fence for {@code key} and registers it for the current mode. */
+    private void registerFence(byte[] key) {
+        TransactionAware fence = VisibilityFence.create(key);
+        if (this.txContext == null) {
+            this.txAwares.add(fence);
+        } else {
+            this.txContext.addTransactionAware(fence);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
new file mode 100644
index 0000000..50ea600
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
@@ -0,0 +1,303 @@
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.tephra.hbase.TransactionAwareHTable;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
+/**
+ * {@link PhoenixTransactionalTable} backed by a Tephra
+ * {@link TransactionAwareHTable}.
+ * <p>
+ * The constructor wraps the given HBase table and registers the wrapper with
+ * the supplied {@link TephraTransactionContext}. Every table operation is then
+ * forwarded unchanged to the wrapper.
+ */
+public class TephraTransactionTable implements PhoenixTransactionalTable {
+
+    private final TransactionAwareHTable transactionAwareHTable;
+
+    private final TephraTransactionContext tephraTransactionContext;
+
+    /**
+     * Wraps {@code hTable} and registers the wrapper with {@code ctx}.
+     *
+     * @param ctx transaction context; must be a {@link TephraTransactionContext}
+     * @param hTable HBase table to wrap
+     */
+    public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) {
+        assert ctx instanceof TephraTransactionContext;
+
+        tephraTransactionContext = (TephraTransactionContext) ctx;
+        transactionAwareHTable = new TransactionAwareHTable(hTable);
+        tephraTransactionContext.addTransactionAware(transactionAwareHTable);
+    }
+
+    // ---- reads, writes and scans ------------------------------------------
+
+    @Override
+    public Result get(Get get) throws IOException {
+        return transactionAwareHTable.get(get);
+    }
+
+    @Override
+    public void put(Put put) throws IOException {
+        transactionAwareHTable.put(put);
+    }
+
+    @Override
+    public void delete(Delete delete) throws IOException {
+        transactionAwareHTable.delete(delete);
+    }
+
+    @Override
+    public ResultScanner getScanner(Scan scan) throws IOException {
+        return transactionAwareHTable.getScanner(scan);
+    }
+
+    // ---- table metadata ----------------------------------------------------
+
+    @Override
+    public byte[] getTableName() {
+        return transactionAwareHTable.getTableName();
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+        return transactionAwareHTable.getConfiguration();
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor() throws IOException {
+        return transactionAwareHTable.getTableDescriptor();
+    }
+
+    @Override
+    public boolean exists(Get get) throws IOException {
+        return transactionAwareHTable.exists(get);
+    }
+
+    @Override
+    public Result[] get(List<Get> gets) throws IOException {
+        return transactionAwareHTable.get(gets);
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family) throws IOException {
+        return transactionAwareHTable.getScanner(family);
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
+        return transactionAwareHTable.getScanner(family, qualifier);
+    }
+
+    @Override
+    public void put(List<Put> puts) throws IOException {
+        transactionAwareHTable.put(puts);
+    }
+
+    @Override
+    public void delete(List<Delete> deletes) throws IOException {
+        transactionAwareHTable.delete(deletes);
+    }
+
+    // ---- client-side buffering ---------------------------------------------
+
+    @Override
+    public void setAutoFlush(boolean autoFlush) {
+        transactionAwareHTable.setAutoFlush(autoFlush);
+    }
+
+    @Override
+    public boolean isAutoFlush() {
+        return transactionAwareHTable.isAutoFlush();
+    }
+
+    @Override
+    public long getWriteBufferSize() {
+        return transactionAwareHTable.getWriteBufferSize();
+    }
+
+    @Override
+    public void setWriteBufferSize(long writeBufferSize) throws IOException {
+        transactionAwareHTable.setWriteBufferSize(writeBufferSize);
+    }
+
+    @Override
+    public void flushCommits() throws IOException {
+        transactionAwareHTable.flushCommits();
+    }
+
+    @Override
+    public void close() throws IOException {
+        transactionAwareHTable.close();
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+            long amount, boolean writeToWAL) throws IOException {
+        return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
+    }
+
+    @Override
+    public Boolean[] exists(List<Get> gets) throws IOException {
+        return transactionAwareHTable.exists(gets);
+    }
+
+    @Override
+    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+        transactionAwareHTable.setAutoFlush(autoFlush, clearBufferOnFail);
+    }
+
+    /**
+     * Forwards to the wrapper's {@code setAutoFlushTo}, not to
+     * {@code setAutoFlush(boolean)}, so the call keeps the semantics of the
+     * method the caller actually invoked.
+     */
+    @Override
+    public void setAutoFlushTo(boolean autoFlush) {
+        transactionAwareHTable.setAutoFlushTo(autoFlush);
+    }
+
+    @Override
+    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
+        return transactionAwareHTable.getRowOrBefore(row, family);
+    }
+
+    @Override
+    public TableName getName() {
+        return transactionAwareHTable.getName();
+    }
+
+    @Override
+    public boolean[] existsAll(List<Get> gets) throws IOException {
+        return transactionAwareHTable.existsAll(gets);
+    }
+
+    // ---- batch operations --------------------------------------------------
+
+    @Override
+    public void batch(List<? extends Row> actions, Object[] results)
+            throws IOException, InterruptedException {
+        transactionAwareHTable.batch(actions, results);
+    }
+
+    @Override
+    public Object[] batch(List<? extends Row> actions)
+            throws IOException, InterruptedException {
+        return transactionAwareHTable.batch(actions);
+    }
+
+    @Override
+    public <R> void batchCallback(List<? extends Row> actions, Object[] results,
+            Callback<R> callback) throws IOException, InterruptedException {
+        transactionAwareHTable.batchCallback(actions, results, callback);
+    }
+
+    @Override
+    public <R> Object[] batchCallback(List<? extends Row> actions, Callback<R> callback)
+            throws IOException, InterruptedException {
+        return transactionAwareHTable.batchCallback(actions, callback);
+    }
+
+    // ---- conditional and atomic mutations ----------------------------------
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+            byte[] value, Put put) throws IOException {
+        return transactionAwareHTable.checkAndPut(row, family, qualifier, value, put);
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, Put put) throws IOException {
+        return transactionAwareHTable.checkAndPut(row, family, qualifier, compareOp, value, put);
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+            byte[] value, Delete delete) throws IOException {
+        return transactionAwareHTable.checkAndDelete(row, family, qualifier, value, delete);
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, Delete delete) throws IOException {
+        return transactionAwareHTable.checkAndDelete(row, family, qualifier, compareOp, value, delete);
+    }
+
+    @Override
+    public void mutateRow(RowMutations rm) throws IOException {
+        transactionAwareHTable.mutateRow(rm);
+    }
+
+    @Override
+    public Result append(Append append) throws IOException {
+        return transactionAwareHTable.append(append);
+    }
+
+    @Override
+    public Result increment(Increment increment) throws IOException {
+        return transactionAwareHTable.increment(increment);
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+            long amount) throws IOException {
+        return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount);
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+            long amount, Durability durability) throws IOException {
+        return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, durability);
+    }
+
+    // ---- coprocessor endpoints ---------------------------------------------
+
+    @Override
+    public CoprocessorRpcChannel coprocessorService(byte[] row) {
+        return transactionAwareHTable.coprocessorService(row);
+    }
+
+    @Override
+    public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
+            byte[] startKey, byte[] endKey, Call<T, R> callable)
+            throws ServiceException, Throwable {
+        return transactionAwareHTable.coprocessorService(service, startKey, endKey, callable);
+    }
+
+    @Override
+    public <T extends Service, R> void coprocessorService(Class<T> service,
+            byte[] startKey, byte[] endKey, Call<T, R> callable, Callback<R> callback)
+            throws ServiceException, Throwable {
+        transactionAwareHTable.coprocessorService(service, startKey, endKey, callable, callback);
+    }
+
+    @Override
+    public <R extends Message> Map<byte[], R> batchCoprocessorService(
+            MethodDescriptor methodDescriptor, Message request,
+            byte[] startKey, byte[] endKey, R responsePrototype)
+            throws ServiceException, Throwable {
+        return transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype);
+    }
+
+    @Override
+    public <R extends Message> void batchCoprocessorService(
+            MethodDescriptor methodDescriptor, Message request,
+            byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
+            throws ServiceException, Throwable {
+        transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback);
+    }
+
+    @Override
+    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
+        return transactionAwareHTable.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
+    }
+
+}