You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2017/03/09 21:30:08 UTC

[2/2] phoenix git commit: Implementing the TAL for Tephra

Implementing the TAL for Tephra


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3431902f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3431902f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3431902f

Branch: refs/heads/omid
Commit: 3431902fd83c590381acb9a817b05676e588004e
Parents: 96f8d09
Author: Ohad Shacham <oh...@yahoo-inc.com>
Authored: Tue Mar 7 12:03:50 2017 +0200
Committer: Thomas D'Silva <td...@apache.org>
Committed: Thu Mar 9 13:29:37 2017 -0800

----------------------------------------------------------------------
 .../transaction/OmidTransactionContext.java     |  77 +++++
 .../transaction/OmidTransactionTable.java       | 323 +++++++++++++++++++
 .../transaction/PhoenixTransactionContext.java  |  12 -
 .../transaction/PhoenixTransactionalTable.java  |   5 -
 .../transaction/TephraTransactionContext.java   | 285 ++++++++++++++++
 .../transaction/TephraTransactionTable.java     | 303 +++++++++++++++++
 6 files changed, 988 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
new file mode 100644
index 0000000..937ac14
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -0,0 +1,77 @@
+package org.apache.phoenix.transaction;
+
+import java.sql.SQLException;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.phoenix.schema.PTable;
+
+/**
+ * Placeholder {@link PhoenixTransactionContext} implementation (presumably meant for the
+ * Omid transaction manager, judging by the class name -- TODO confirm).
+ *
+ * Nothing is implemented yet: every void method is a no-op,
+ * {@link #isTransactionRunning()} always returns {@code false} and the id / read pointer
+ * getters always return {@code 0}.
+ */
+public class OmidTransactionContext implements PhoenixTransactionContext {
+
+    @Override
+    public void begin() throws SQLException {
+        // TODO not implemented: currently a no-op
+
+    }
+
+    @Override
+    public void commit() throws SQLException {
+        // TODO not implemented: currently a no-op
+
+    }
+
+    @Override
+    public void abort() throws SQLException {
+        // TODO not implemented: currently a no-op
+
+    }
+
+    @Override
+    public void checkpoint(boolean hasUncommittedData) throws SQLException {
+        // TODO not implemented: currently a no-op, hasUncommittedData is ignored
+
+    }
+
+    @Override
+    public void commitDDLFence(PTable dataTable) throws SQLException,
+            InterruptedException, TimeoutException {
+        // TODO not implemented: currently a no-op, dataTable is ignored
+
+    }
+
+    @Override
+    public void markDMLFence(PTable table) {
+        // TODO not implemented: currently a no-op, table is ignored
+
+    }
+
+    @Override
+    public void join(PhoenixTransactionContext ctx) {
+        // TODO not implemented: currently a no-op, ctx is ignored
+
+    }
+
+    @Override
+    public boolean isTransactionRunning() {
+        // TODO not implemented: always reports that no transaction is running
+        return false;
+    }
+
+    @Override
+    public void reset() {
+        // TODO not implemented: currently a no-op
+
+    }
+
+    @Override
+    public long getTransactionId() {
+        // TODO not implemented: always returns 0
+        return 0;
+    }
+
+    @Override
+    public long getReadPointer() {
+        // TODO not implemented: always returns 0
+        return 0;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
new file mode 100644
index 0000000..d2cd020
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
@@ -0,0 +1,323 @@
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Placeholder {@link PhoenixTransactionalTable} implementation (presumably meant for the
+ * Omid transaction manager, judging by the class name -- TODO confirm).
+ *
+ * Nothing is implemented yet: the constructor discards its arguments, every void method
+ * is a no-op, and every other method returns {@code null}, {@code 0} or {@code false}.
+ * Callers therefore must not rely on any result obtained from this class.
+ */
+public class OmidTransactionTable implements PhoenixTransactionalTable {
+
+    public OmidTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) {
+        // TODO Auto-generated constructor stub
+        // NOTE: ctx and hTable are currently ignored (nothing is stored).
+    }
+
+    @Override
+    public Result get(Get get) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void put(Put put) throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void delete(Delete delete) throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public ResultScanner getScanner(Scan scan) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public byte[] getTableName() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor() throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public boolean exists(Get get) throws IOException {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public Result[] get(List<Get> gets) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family, byte[] qualifier)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void put(List<Put> puts) throws IOException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void delete(List<Delete> deletes) throws IOException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void setAutoFlush(boolean autoFlush) {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public boolean isAutoFlush() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public long getWriteBufferSize() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public void setWriteBufferSize(long writeBufferSize) throws IOException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void flushCommits() throws IOException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void close() throws IOException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family,
+            byte[] qualifier, long amount, boolean writeToWAL)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public Boolean[] exists(List<Get> gets) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void setAutoFlushTo(boolean autoFlush) {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public TableName getName() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public boolean[] existsAll(List<Get> gets) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void batch(List<? extends Row> actions, Object[] results)
+            throws IOException, InterruptedException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public Object[] batch(List<? extends Row> actions) throws IOException,
+            InterruptedException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public <R> void batchCallback(List<? extends Row> actions,
+            Object[] results, Callback<R> callback) throws IOException,
+            InterruptedException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public <R> Object[] batchCallback(List<? extends Row> actions,
+            Callback<R> callback) throws IOException, InterruptedException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+            byte[] value, Put put) throws IOException {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, Put put) throws IOException {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+            byte[] value, Delete delete) throws IOException {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, Delete delete)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public void mutateRow(RowMutations rm) throws IOException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public Result append(Append append) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Result increment(Increment increment) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family,
+            byte[] qualifier, long amount) throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family,
+            byte[] qualifier, long amount, Durability durability)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public CoprocessorRpcChannel coprocessorService(byte[] row) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public <T extends Service, R> Map<byte[], R> coprocessorService(
+            Class<T> service, byte[] startKey, byte[] endKey,
+            Call<T, R> callable) throws ServiceException, Throwable {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public <T extends Service, R> void coprocessorService(Class<T> service,
+            byte[] startKey, byte[] endKey, Call<T, R> callable,
+            Callback<R> callback) throws ServiceException, Throwable {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public <R extends Message> Map<byte[], R> batchCoprocessorService(
+            MethodDescriptor methodDescriptor, Message request,
+            byte[] startKey, byte[] endKey, R responsePrototype)
+            throws ServiceException, Throwable {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public <R extends Message> void batchCoprocessorService(
+            MethodDescriptor methodDescriptor, Message request,
+            byte[] startKey, byte[] endKey, R responsePrototype,
+            Callback<R> callback) throws ServiceException, Throwable {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, RowMutations mutation)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
index f07640e..af0ff05 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -29,18 +29,6 @@ public interface PhoenixTransactionContext {
     public void abort() throws SQLException;
 
     /**
-     * Rollback a transaction
-     *
-     * @param e
-     * @throws SQLException
-     */
-    public void abort(SQLException e) throws SQLException;
-
-    /**
-     * Create a checkpoint in a transaction as defined in [TEPHRA-96]
-     * @throws SQLException
-     */
-    public void checkpoint() throws SQLException;
 
     /**
      * Commit DDL to guarantee that no transaction started before create index

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
index 7495c5b..dcab73d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
@@ -101,11 +101,6 @@ public interface PhoenixTransactionalTable extends HTableInterface {
     public void delete(List<Delete> deletes) throws IOException;
 
     /**
-     * Return the underling htable
-     */
-    public HTableInterface getHTable();
-
-    /**
      * Delegates to {@link HTable#setAutoFlush(boolean autoFlush)}
      */
     public void setAutoFlush(boolean autoFlush);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
new file mode 100644
index 0000000..8fc5e0f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -0,0 +1,285 @@
+package org.apache.phoenix.transaction;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTable;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionAware;
+import org.apache.tephra.TransactionConflictException;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.Transaction.VisibilityLevel;
+import org.apache.tephra.visibility.FenceWait;
+import org.apache.tephra.visibility.VisibilityFence;
+
+import com.google.common.collect.Lists;
+
+/**
+ * {@link PhoenixTransactionContext} implementation that delegates to Tephra.
+ *
+ * The constructor fixes one of two modes:
+ * <ul>
+ * <li>{@code threadSafe == false}: a Tephra {@link TransactionContext} drives the
+ *     transaction and keeps track of the registered {@link TransactionAware}s;</li>
+ * <li>{@code threadSafe == true}: there is no {@link TransactionContext}; the instance
+ *     holds the {@link Transaction} taken from the parent context and collects
+ *     {@link TransactionAware}s in its own list.</li>
+ * </ul>
+ */
+public class TephraTransactionContext implements PhoenixTransactionContext {
+
+    /** Time, in milliseconds, {@link #commitDDLFence(PTable)} waits for the fence. */
+    private static final int DDL_FENCE_TIMEOUT_MS = 10000;
+
+    private final List<TransactionAware> txAwares;
+    private final TransactionContext txContext;
+    private Transaction tx;
+    private TransactionSystemClient txServiceClient;
+    /** Failure raised by the last {@link #commit()}; passed on by the next {@link #abort()}. */
+    private TransactionFailureException commitFailure;
+
+    /**
+     * @param ctx        parent context to share the transaction (threadSafe mode) or the
+     *                   Tephra context (otherwise) with; may be {@code null}, in which
+     *                   case a fresh {@link TransactionContext} is created when
+     *                   {@code threadSafe} is false and no transaction is held when it
+     *                   is true
+     * @param connection connection providing the {@link TransactionSystemClient}
+     * @param threadSafe selects the mode described in the class comment
+     */
+    public TephraTransactionContext(PhoenixTransactionContext ctx, PhoenixConnection connection, boolean threadSafe) {
+
+        this.txServiceClient = connection.getQueryServices().getTransactionSystemClient();
+
+        // A null parent is legal (see the ctx == null handling below); the previous
+        // assertion rejected it and the threadSafe branch dereferenced it.
+        assert ctx == null || ctx instanceof TephraTransactionContext;
+        TephraTransactionContext parent = (TephraTransactionContext) ctx;
+
+        if (threadSafe) {
+            this.tx = (parent == null) ? null : parent.getTransaction();
+            this.txAwares = Lists.newArrayList();
+            this.txContext = null;
+        } else {
+            this.txAwares = Collections.emptyList();
+            this.txContext = (parent == null)
+                    ? new TransactionContext(txServiceClient)
+                    : parent.getContext();
+        }
+
+        this.commitFailure = null;
+    }
+
+    /**
+     * Starts a transaction on the Tephra context.
+     *
+     * @throws SQLException NULL_TRANSACTION_CONTEXT if there is no Tephra context,
+     *                      TRANSACTION_FAILED if Tephra cannot start the transaction
+     */
+    @Override
+    public void begin() throws SQLException {
+        requireContext();
+
+        try {
+            txContext.start();
+        } catch (TransactionFailureException failure) {
+            throw transactionFailed(failure);
+        }
+    }
+
+    /**
+     * Finishes (commits) the current transaction. A failure is remembered so that the
+     * following {@link #abort()} can hand it to Tephra.
+     *
+     * @throws SQLException NULL_TRANSACTION_CONTEXT if there is no Tephra context,
+     *                      TRANSACTION_CONFLICT_EXCEPTION on a conflict,
+     *                      TRANSACTION_FAILED on any other Tephra failure
+     */
+    @Override
+    public void commit() throws SQLException {
+        // Same explicit check as begin(): an assert alone turns into an NPE when
+        // assertions are disabled.
+        requireContext();
+
+        try {
+            txContext.finish();
+        } catch (TransactionFailureException failure) {
+            this.commitFailure = failure;
+            SQLExceptionCode code = (failure instanceof TransactionConflictException)
+                    ? SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION
+                    : SQLExceptionCode.TRANSACTION_FAILED;
+            throw new SQLExceptionInfo.Builder(code)
+                .setMessage(failure.getMessage())
+                .setRootCause(failure)
+                .build().buildException();
+        }
+    }
+
+    /**
+     * Aborts the current transaction, passing on the failure recorded by the last
+     * {@link #commit()} if there is one. The recorded failure is cleared afterwards.
+     *
+     * @throws SQLException NULL_TRANSACTION_CONTEXT if there is no Tephra context,
+     *                      TRANSACTION_FAILED if Tephra reports a failure
+     */
+    @Override
+    public void abort() throws SQLException {
+        requireContext();
+
+        try {
+            if (commitFailure != null) {
+                txContext.abort(commitFailure);
+            } else {
+                txContext.abort();
+            }
+        } catch (TransactionFailureException failure) {
+            throw transactionFailed(failure);
+        } finally {
+            commitFailure = null;
+        }
+    }
+
+    /**
+     * Optionally checkpoints the transaction and then switches its visibility to
+     * {@link VisibilityLevel#SNAPSHOT_EXCLUDE_CURRENT}.
+     *
+     * @param hasUncommittedData when true a new checkpoint is taken first
+     * @throws SQLException wrapping the Tephra failure if the checkpoint fails
+     */
+    @Override
+    public void checkpoint(boolean hasUncommittedData) throws SQLException {
+        if (hasUncommittedData) {
+            try {
+                if (txContext == null) {
+                    tx = txServiceClient.checkpoint(tx);
+                } else {
+                    txContext.checkpoint();
+                    tx = txContext.getCurrentTransaction();
+                }
+            } catch (TransactionFailureException failure) {
+                throw new SQLException(failure);
+            }
+        }
+
+        // NOTE(review): like the original, this expects a running transaction.
+        currentTransaction().setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+    }
+
+    /**
+     * Waits up to {@link #DDL_FENCE_TIMEOUT_MS} ms on a visibility fence keyed by the
+     * table name.
+     *
+     * @param dataTable table whose name is used as the fence key
+     * @throws SQLException INTERRUPTED_EXCEPTION if the wait is interrupted (the
+     *                      interrupt flag is restored), TX_UNABLE_TO_GET_WRITE_FENCE if
+     *                      the wait times out or Tephra fails
+     */
+    @Override
+    public void commitDDLFence(PTable dataTable) throws SQLException,
+            InterruptedException, TimeoutException {
+        byte[] key = dataTable.getName().getBytes();
+        try {
+            FenceWait fenceWait = VisibilityFence.prepareWait(key, txServiceClient);
+            fenceWait.await(DDL_FENCE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
+                .setRootCause(interrupted)
+                .build().buildException();
+        } catch (TimeoutException | TransactionFailureException failure) {
+            // Keep the cause: it was previously dropped, hiding why the fence failed.
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE)
+                .setSchemaName(dataTable.getSchemaName().getString())
+                .setTableName(dataTable.getTableName().getString())
+                .setRootCause(failure)
+                .build().buildException();
+        }
+    }
+
+    /**
+     * Registers a visibility fence for the table's logical name and, when it differs,
+     * another one for its physical name.
+     *
+     * @param table table to fence
+     */
+    @Override
+    public void markDMLFence(PTable table) {
+        byte[] logicalKey = table.getName().getBytes();
+        register(VisibilityFence.create(logicalKey));
+
+        byte[] physicalKey = table.getPhysicalName().getBytes();
+        if (Bytes.compareTo(physicalKey, logicalKey) != 0) {
+            register(VisibilityFence.create(physicalKey));
+        }
+    }
+
+    /**
+     * Registers every {@link TransactionAware} collected by {@code ctx} with this context.
+     *
+     * @param ctx context to take the transaction awares from; must be a
+     *            {@link TephraTransactionContext}
+     */
+    @Override
+    public void join(PhoenixTransactionContext ctx) {
+        assert(ctx instanceof TephraTransactionContext);
+        TephraTransactionContext tephraContext = (TephraTransactionContext) ctx;
+
+        if (txContext != null) {
+            for (TransactionAware txAware : tephraContext.getAwares()) {
+                txContext.addTransactionAware(txAware);
+            }
+        } else {
+            txAwares.addAll(tephraContext.getAwares());
+        }
+    }
+
+    /**
+     * @return true if the Tephra context (or, without one, this instance) currently
+     *         holds a transaction
+     */
+    @Override
+    public boolean isTransactionRunning() {
+        return currentTransaction() != null;
+    }
+
+    /** Drops the held transaction and the locally collected transaction awares. */
+    @Override
+    public void reset() {
+        tx = null;
+        txAwares.clear();
+    }
+
+    /**
+     * @return the id of the current transaction, or {@link HConstants#LATEST_TIMESTAMP}
+     *         when no transaction is running
+     */
+    @Override
+    public long getTransactionId() {
+        Transaction current = currentTransaction();
+        return (current == null) ? HConstants.LATEST_TIMESTAMP : current.getTransactionId();
+    }
+
+    /**
+     * @return the read pointer of the current transaction, or {@code -1} when no
+     *         transaction is running
+     */
+    @Override
+    public long getReadPointer() {
+        Transaction current = currentTransaction();
+        return (current == null) ? -1 : current.getReadPointer();
+    }
+
+   /**
+    * TephraTransactionContext specific functions
+    */
+
+    Transaction getTransaction() {
+        return this.tx;
+    }
+
+    TransactionContext getContext() {
+        return this.txContext;
+    }
+
+    List<TransactionAware> getAwares() {
+        return txAwares;
+    }
+
+    /**
+     * Registers {@code txAware} with the Tephra context, or with the local list when a
+     * transaction is held without a context. Without either, the call is ignored.
+     */
+    void addTransactionAware(TransactionAware txAware) {
+        if (this.txContext != null) {
+            txContext.addTransactionAware(txAware);
+        } else if (this.tx != null) {
+            txAwares.add(txAware);
+        }
+    }
+
+    // For testing
+    public long getWritePointer() {
+        Transaction current = currentTransaction();
+        return (current == null) ? HConstants.LATEST_TIMESTAMP : current.getWritePointer();
+    }
+
+    // For testing
+    public VisibilityLevel getVisibilityLevel() {
+        Transaction current = currentTransaction();
+        return (current == null) ? null : current.getVisibilityLevel();
+    }
+
+    /**
+     * Returns the transaction of the Tephra context when there is one, otherwise the
+     * locally held transaction; {@code null} when no transaction is running.
+     */
+    private Transaction currentTransaction() {
+        return (txContext != null) ? txContext.getCurrentTransaction() : tx;
+    }
+
+    /** Adds {@code txAware} to the Tephra context, or to the local list without one. */
+    private void register(TransactionAware txAware) {
+        if (txContext == null) {
+            txAwares.add(txAware);
+        } else {
+            txContext.addTransactionAware(txAware);
+        }
+    }
+
+    /** Fails with NULL_TRANSACTION_CONTEXT when this instance has no Tephra context. */
+    private void requireContext() throws SQLException {
+        if (txContext == null) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException();
+        }
+    }
+
+    /** Wraps a Tephra failure into a TRANSACTION_FAILED exception, keeping the cause. */
+    private static SQLException transactionFailed(TransactionFailureException failure) {
+        return new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED)
+            .setMessage(failure.getMessage())
+            .setRootCause(failure)
+            .build().buildException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
new file mode 100644
index 0000000..50ea600
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
@@ -0,0 +1,303 @@
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.tephra.hbase.TransactionAwareHTable;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
+public class TephraTransactionTable implements PhoenixTransactionalTable {
+
+    private TransactionAwareHTable transactionAwareHTable;
+
+    private TephraTransactionContext tephraTransactionContext;
+
+    public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) {
+
+        assert(ctx instanceof TephraTransactionContext);
+
+        tephraTransactionContext = (TephraTransactionContext) ctx;
+
+        transactionAwareHTable = new TransactionAwareHTable(hTable);
+
+        tephraTransactionContext.addTransactionAware(transactionAwareHTable);
+    }
+
+    @Override
+    public Result get(Get get) throws IOException {
+        return transactionAwareHTable.get(get);
+    }
+
+    @Override
+    public void put(Put put) throws IOException {
+        transactionAwareHTable.put(put);
+    }
+
+    @Override
+    public void delete(Delete delete) throws IOException {
+        transactionAwareHTable.delete(delete);
+    }
+
+    @Override
+    public ResultScanner getScanner(Scan scan) throws IOException {
+        return transactionAwareHTable.getScanner(scan);
+    }
+
+    @Override
+    public byte[] getTableName() {
+        return transactionAwareHTable.getTableName();
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+        return transactionAwareHTable.getConfiguration();
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor() throws IOException {
+        return transactionAwareHTable.getTableDescriptor();
+    }
+
+    @Override
+    public boolean exists(Get get) throws IOException {
+        return transactionAwareHTable.exists(get);
+    }
+
+    @Override
+    public Result[] get(List<Get> gets) throws IOException {
+        return transactionAwareHTable.get(gets);
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family) throws IOException {
+        return transactionAwareHTable.getScanner(family);
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family, byte[] qualifier)
+            throws IOException {
+        return transactionAwareHTable.getScanner(family, qualifier);
+    }
+
+    @Override
+    public void put(List<Put> puts) throws IOException {
+        transactionAwareHTable.put(puts);
+    }
+
+    @Override
+    public void delete(List<Delete> deletes) throws IOException {
+        transactionAwareHTable.delete(deletes);
+    }
+
+    @Override
+    public void setAutoFlush(boolean autoFlush) {
+        transactionAwareHTable.setAutoFlush(autoFlush);
+    }
+
+    @Override
+    public boolean isAutoFlush() {
+        return transactionAwareHTable.isAutoFlush();
+    }
+
+    @Override
+    public long getWriteBufferSize() {
+        return transactionAwareHTable.getWriteBufferSize();
+    }
+
+    @Override
+    public void setWriteBufferSize(long writeBufferSize) throws IOException {
+        transactionAwareHTable.setWriteBufferSize(writeBufferSize);
+    }
+
+    @Override
+    public void flushCommits() throws IOException {
+        transactionAwareHTable.flushCommits();
+    }
+
+    @Override
+    public void close() throws IOException {
+        transactionAwareHTable.close();
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family,
+            byte[] qualifier, long amount, boolean writeToWAL)
+            throws IOException {
+        return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
+    }
+
+    @Override
+    public Boolean[] exists(List<Get> gets) throws IOException {
+        return transactionAwareHTable.exists(gets);
+    }
+
+    @Override
+    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+        transactionAwareHTable.setAutoFlush(autoFlush, clearBufferOnFail);
+    }
+
+    @Override
+    public void setAutoFlushTo(boolean autoFlush) {
+        transactionAwareHTable.setAutoFlush(autoFlush);
+    }
+
+    @Override
+    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
+        return transactionAwareHTable.getRowOrBefore(row, family);
+    }
+
+    @Override
+    public TableName getName() {
+        return transactionAwareHTable.getName();
+    }
+
+    @Override
+    public boolean[] existsAll(List<Get> gets) throws IOException {
+        return transactionAwareHTable.existsAll(gets);
+    }
+
+    @Override
+    public void batch(List<? extends Row> actions, Object[] results)
+            throws IOException, InterruptedException {
+        transactionAwareHTable.batch(actions, results);
+    }
+
+    @Override
+    public Object[] batch(List<? extends Row> actions) throws IOException,
+            InterruptedException {
+        return transactionAwareHTable.batch(actions);
+    }
+
+    @Override
+    public <R> void batchCallback(List<? extends Row> actions,
+            Object[] results, Callback<R> callback) throws IOException,
+            InterruptedException {
+        transactionAwareHTable.batchCallback(actions, results, callback);
+    }
+
+    @Override
+    public <R> Object[] batchCallback(List<? extends Row> actions,
+            Callback<R> callback) throws IOException, InterruptedException {
+        return transactionAwareHTable.batchCallback(actions, callback);
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+            byte[] value, Put put) throws IOException {
+        return transactionAwareHTable.checkAndPut(row, family, qualifier, value, put);
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, Put put) throws IOException {
+        return transactionAwareHTable.checkAndPut(row, family, qualifier, compareOp, value, put);
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+            byte[] value, Delete delete) throws IOException {
+        return transactionAwareHTable.checkAndDelete(row, family, qualifier, value, delete);
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, Delete delete)
+            throws IOException {
+        return transactionAwareHTable.checkAndDelete(row, family, qualifier, compareOp, value, delete);
+    }
+
+    @Override
+    public void mutateRow(RowMutations rm) throws IOException {
+        transactionAwareHTable.mutateRow(rm);
+    }
+
+    @Override
+    public Result append(Append append) throws IOException {
+        return transactionAwareHTable.append(append);
+    }
+
+    @Override
+    public Result increment(Increment increment) throws IOException {
+        return transactionAwareHTable.increment(increment);
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family,
+            byte[] qualifier, long amount) throws IOException {
+        return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount);
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family,
+            byte[] qualifier, long amount, Durability durability)
+            throws IOException {
+        return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, durability);
+    }
+
+    @Override
+    public CoprocessorRpcChannel coprocessorService(byte[] row) {
+        return transactionAwareHTable.coprocessorService(row);
+    }
+
+    @Override
+    public <T extends Service, R> Map<byte[], R> coprocessorService(
+            Class<T> service, byte[] startKey, byte[] endKey,
+            Call<T, R> callable) throws ServiceException, Throwable {
+        return transactionAwareHTable.coprocessorService(service, startKey, endKey, callable);
+    }
+
+    @Override
+    public <T extends Service, R> void coprocessorService(Class<T> service,
+            byte[] startKey, byte[] endKey, Call<T, R> callable,
+            Callback<R> callback) throws ServiceException, Throwable {
+        transactionAwareHTable.coprocessorService(service, startKey, endKey, callable, callback);
+    }
+
+    @Override
+    public <R extends Message> Map<byte[], R> batchCoprocessorService(
+            MethodDescriptor methodDescriptor, Message request,
+            byte[] startKey, byte[] endKey, R responsePrototype)
+            throws ServiceException, Throwable {
+        return transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype);
+    }
+
+    @Override
+    public <R extends Message> void batchCoprocessorService(
+            MethodDescriptor methodDescriptor, Message request,
+            byte[] startKey, byte[] endKey, R responsePrototype,
+            Callback<R> callback) throws ServiceException, Throwable {
+        transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback);
+    }
+
+    @Override
+    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, RowMutations mutation)
+            throws IOException {
+        return transactionAwareHTable.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
+    }
+
+}