You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/04/17 10:28:11 UTC

phoenix git commit: Transactions over mutable indexes mostly working

Repository: phoenix
Updated Branches:
  refs/heads/txn 324b566f4 -> 1baa1b6b0


Transactions over mutable indexes mostly working


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1baa1b6b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1baa1b6b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1baa1b6b

Branch: refs/heads/txn
Commit: 1baa1b6b0c85ed06d4892648975d7e51998d75a3
Parents: 324b566
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Apr 17 01:28:12 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Apr 17 01:28:12 2015 -0700

----------------------------------------------------------------------
 .../apache/phoenix/compile/DeleteCompiler.java  |   2 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |   4 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   |   2 +-
 .../apache/phoenix/execute/MutationState.java   | 140 +++++++++++++++++--
 .../apache/phoenix/index/PhoenixIndexCodec.java |   8 ++
 .../index/PhoenixTransactionalIndexer.java      |  68 ++++-----
 .../phoenix/iterate/TableResultIterator.java    |   8 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  90 +++---------
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   6 +-
 .../query/ConnectionQueryServicesImpl.java      |   3 +-
 10 files changed, 199 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index a0369d5..0778f75 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -494,7 +494,7 @@ public class DeleteCompiler {
                         ImmutableBytesWritable ptr = context.getTempPtr();
                         PTable table = tableRef.getTable();
                         table.getIndexMaintainers(ptr, context.getConnection());
-                        byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY;
+                        byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getMutationState().getTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY;
                         ServerCache cache = null;
                         try {
                             if (ptr.getLength() > 0) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index e72b634..67d289e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -163,7 +163,7 @@ public class UpsertCompiler {
                 if (isAutoCommit && rowCount % batchSize == 0) {
                     MutationState state = new MutationState(tableRef, mutation, 0, maxSize, connection);
                     connection.getMutationState().join(state);
-                    connection.commit();
+                    connection.getMutationState().send();
                     mutation.clear();
                 }
             }
@@ -610,7 +610,7 @@ public class UpsertCompiler {
                             ImmutableBytesWritable ptr = context.getTempPtr();
                             PTable table = tableRef.getTable();
                             table.getIndexMaintainers(ptr, context.getConnection());
-                            byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY;
+                            byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getMutationState().getTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY;
 
                             ServerCache cache = null;
                             try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 387f23d..0a3035c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -269,7 +269,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr));
         if (dataTable.isTransactional()) {
             PhoenixConnection conn = context.getConnection();
-            scan.setAttribute(BaseScannerRegionObserver.TX_STATE, TransactionUtil.encodeTxnState(conn.getTransactionContext().getCurrentTransaction()));
+            scan.setAttribute(BaseScannerRegionObserver.TX_STATE, TransactionUtil.encodeTxnState(conn.getMutationState().getTransaction()));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 59bc6ce..a0cf8d2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -28,6 +28,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import co.cask.tephra.Transaction;
+import co.cask.tephra.TransactionAware;
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.TransactionSystemClient;
 import co.cask.tephra.hbase98.TransactionAwareHTable;
 
 import org.apache.hadoop.hbase.HConstants;
@@ -88,17 +93,39 @@ public class MutationState implements SQLCloseable {
     //   rows - map from rowkey to columns
     //      columns - map from column to value
     private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing?
+    private final Transaction tx;
+    private final List<TransactionAware> txAwares;
+    private final TransactionContext txContext;
+    
     private long sizeOffset;
     private int numRows = 0;
+    private boolean txStarted = false;
     
     public MutationState(int maxSize, PhoenixConnection connection) {
-        this(maxSize,connection,0);
+        this(maxSize,connection,null);
+    }
+    
+    public MutationState(int maxSize, PhoenixConnection connection, Transaction tx) {
+        this(maxSize,connection, tx, 0);
     }
     
     public MutationState(int maxSize, PhoenixConnection connection, long sizeOffset) {
+        this(maxSize, connection, null, sizeOffset);
+    }
+    
+    public MutationState(int maxSize, PhoenixConnection connection, Transaction tx, long sizeOffset) {
         this.maxSize = maxSize;
         this.connection = connection;
         this.sizeOffset = sizeOffset;
+        this.tx = tx;
+        if (tx == null) {
+          this.txAwares = Collections.emptyList();
+          TransactionSystemClient txServiceClient = this.connection.getQueryServices().getTransactionSystemClient();
+          this.txContext = new TransactionContext(txServiceClient);
+        } else {
+            txAwares = Lists.newArrayList();
+            txContext = null;
+        }
     }
     
     public MutationState(TableRef table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
@@ -107,13 +134,19 @@ public class MutationState implements SQLCloseable {
         this.mutations.put(table, mutations);
         this.sizeOffset = sizeOffset;
         this.numRows = mutations.size();
+        this.txAwares = Lists.newArrayList();
+        this.txContext = null;
+        this.tx = connection.getMutationState().getTransaction();
         throwIfTooBig();
     }
     
-    private MutationState(List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries, long sizeOffset, long maxSize, PhoenixConnection connection) {
-        this.maxSize = maxSize;
-        this.connection = connection;
-        this.sizeOffset = sizeOffset;
+    private MutationState(MutationState state, List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries) {
+        this.maxSize = state.maxSize;
+        this.connection = state.connection;
+        this.sizeOffset = state.sizeOffset;
+        this.tx = state.tx;
+        this.txAwares = state.txAwares;
+        this.txContext = state.txContext;
         for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : entries) {
             numRows += entry.getValue().size();
             this.mutations.put(entry.getKey(), entry.getValue());
@@ -121,6 +154,35 @@ public class MutationState implements SQLCloseable {
         throwIfTooBig();
     }
     
+    private void addTxParticipant(TransactionAware txAware) throws SQLException {
+        if (txContext == null) {
+            txAwares.add(txAware);
+            assert(tx != null);
+            txAware.startTx(tx);
+        } else {
+            txContext.addTransactionAware(txAware);
+        }
+    }
+    
+    public Transaction getTransaction() {
+        return tx != null ? tx : txContext != null ? txContext.getCurrentTransaction() : null;
+    }
+    
+    public void startTransaction() throws SQLException {
+        if (txContext == null) {
+            throw new SQLException("No transaction context"); // TODO: error code
+        }
+        
+        try {
+            if (!txStarted) {
+                txContext.start();
+                txStarted = true;
+            }
+        } catch (TransactionFailureException e) {
+            throw new SQLException(e); // TODO: error code
+        }
+    }
+    
     private void throwIfTooBig() {
         if (numRows > maxSize) {
             // TODO: throw SQLException ?
@@ -141,6 +203,15 @@ public class MutationState implements SQLCloseable {
         if (this == newMutation) { // Doesn't make sense
             return;
         }
+        // TODO: handle the case where both this state and newMutation have a txContext, as that's really an error.
+        // Really, it's an error whenever newMutation's txContext is not null.
+        if (txContext != null) {
+            for (TransactionAware txAware : txAwares) {
+                txContext.addTransactionAware(txAware);
+            }
+        } else {
+            txAwares.addAll(newMutation.txAwares);
+        }
         this.sizeOffset += newMutation.sizeOffset;
         // Merge newMutation with this one, keeping state from newMutation for any overlaps
         for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : newMutation.mutations.entrySet()) {
@@ -394,7 +465,10 @@ public class MutationState implements SQLCloseable {
                     if (hasIndexMaintainers && isDataTable) {
                         byte[] attribValue = null;
                         byte[] uuidValue;
-                        byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY;
+                        byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
+                        if (table.isTransactional()) {
+                            txState = TransactionUtil.encodeTxnState(getTransaction());
+                        }
                         if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength() + txState.length)) {
                             IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
                             cache = client.addIndexMetadataCache(mutations, tempPtr, txState);
@@ -421,15 +495,17 @@ public class MutationState implements SQLCloseable {
                             }
                         }
                     }
-                    
+                
                     SQLException sqlE = null;
                     HTableInterface hTable = connection.getQueryServices().getTable(htableName);
                     try {
-                        // Don't add immutable indexes (those are the only ones that would participate
-                        // during a commit), as we don't need conflict detection for these.
-                        if (table.isTransactional() && isDataTable) {
+                        if (table.isTransactional()) {
                             TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable);
-                            connection.addTxParticipant(txnAware);
+                            // Don't add immutable indexes (those are the only ones that would participate
+                            // during a commit), as we don't need conflict detection for these.
+                            if (isDataTable) {
+                                addTxParticipant(txnAware);
+                            }
                             hTable = txnAware;
                         }
                         logMutationSize(hTable, mutations, connection);
@@ -465,7 +541,7 @@ public class MutationState implements SQLCloseable {
                         }
                         // Throw to client with both what was committed so far and what is left to be committed.
                         // That way, client can either undo what was done or try again with what was not done.
-                        sqlE = new CommitException(e, this, new MutationState(committedList, this.sizeOffset, this.maxSize, this.connection));
+                        sqlE = new CommitException(e, this, new MutationState(this, committedList));
                     } finally {
                         try {
                             if (cache != null) {
@@ -508,4 +584,44 @@ public class MutationState implements SQLCloseable {
     @Override
     public void close() throws SQLException {
     }
+
+    public void rollback() throws SQLException {
+        clear();
+        txAwares.clear();
+        if (txContext != null) {
+            try {
+                if (txStarted) {
+                    txContext.abort();
+                }
+            } catch (TransactionFailureException e) {
+                throw new SQLException(e); // TODO: error code
+            } finally {
+                txStarted = false;
+            }
+        }
+    }
+    
+    public void commit() throws SQLException {
+        try {
+            send();
+        } finally {
+            txAwares.clear();
+            if (txContext != null) {
+                try {
+                    if (txStarted) {
+                        txContext.finish();
+                    }
+                } catch (TransactionFailureException e) {
+                    try {
+                        txContext.abort(e);
+                        throw TransactionUtil.getSQLException(e);
+                    } catch (TransactionFailureException e1) {
+                        throw TransactionUtil.getSQLException(e);
+                    }
+                } finally {
+                    txStarted = false;
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 1fe9931..36b849d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -87,6 +87,10 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
             indexUpdate.setTable(maintainer.getIndexTableName());
             Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env
                     .getRegion().getStartKey(), env.getRegion().getEndKey());
+            if (put == null) {
+                throw new IllegalStateException("Null put for " + env.getRegion().getRegionInfo().getTable().getNameAsString() 
+                        + ": " + Bytes.toStringBinary(ptr.get(), ptr.getOffset(), ptr.getLength()));
+            }
             indexUpdate.setUpdate(put);
             indexUpdates.add(indexUpdate);
         }
@@ -112,6 +116,10 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
             indexUpdate.setTable(maintainer.getIndexTableName());
             Delete delete = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(),
                     state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey());
+            if (delete == null) {
+                throw new IllegalStateException("Null put for " + env.getRegion().getRegionInfo().getTable().getNameAsString() 
+                        + ": " + Bytes.toStringBinary(ptr.get(), ptr.getOffset(), ptr.getLength()));
+            }
             indexUpdate.setUpdate(delete);
             indexUpdates.add(indexUpdate);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index f6e6806..adba507 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -187,13 +187,13 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             if (scanner != null) {
                 Result result;
                 while ((result = scanner.next()) != null) {
-                    TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), result);
+                    Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
+                    TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m, result);
                     Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
                     for (IndexUpdate delete : deletes) {
                         indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
                     }
-                    Mutation m = mutations.get(new ImmutableBytesPtr(result.getRow()));
-                    state.applyMutation(m);
+                    state.applyMutation();
                     Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
                     for (IndexUpdate update : updates) {
                         indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
@@ -202,7 +202,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             }
             for (Mutation m : mutations.values()) {
                 TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m);
-                state.applyMutation(m);
+                state.applyMutation();
                 Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
                 for (IndexUpdate update : updates) {
                     indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
@@ -217,7 +217,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
 
 
     private static class TxTableState implements TableState {
-        private final byte[] rowKey;
+        private final Mutation mutation;
         private final long currentTimestamp;
         private final RegionCoprocessorEnvironment env;
         private final Map<String, byte[]> attributes;
@@ -225,24 +225,28 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         private final Set<ColumnReference> indexedColumns;
         private final Map<ColumnReference, ImmutableBytesWritable> valueMap;
         
-        private TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, byte[] rowKey) {
+        private TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation mutation) {
             this.env = env;
             this.currentTimestamp = currentTimestamp;
             this.indexedColumns = indexedColumns;
             this.attributes = attributes;
-            this.rowKey = rowKey;
+            this.mutation = mutation;
             int estimatedSize = indexedColumns.size();
             this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize);
             this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize);
+            try {
+                CellScanner scanner = mutation.cellScanner();
+                while (scanner.advance()) {
+                    Cell cell = scanner.current();
+                    pendingUpdates.add(cell);
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e); // Impossible
+            }
         }
         
-        public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation m) {
-            this(env, indexedColumns, attributes, currentTimestamp, m.getRow());
-            applyMutation(m);
-        }
-        
-        public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Result r) {
-            this(env, indexedColumns, attributes, currentTimestamp, r.getRow());
+        public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation m, Result r) {
+            this(env, indexedColumns, attributes, currentTimestamp, m);
 
             for (ColumnReference ref : indexedColumns) {
                 Cell cell = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
@@ -271,7 +275,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
 
         @Override
         public byte[] getCurrentRowKey() {
-            return rowKey;
+            return mutation.getRow();
         }
 
         @Override
@@ -279,32 +283,28 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             return Collections.emptyList();
         }
 
-        public void applyMutation(Mutation m) {
-            if (m instanceof Delete) {
+        public void applyMutation() {
+            if (mutation instanceof Delete) {
                 valueMap.clear();
             } else {
-                CellScanner scanner = m.cellScanner();
-                try {
-                    while (scanner.advance()) {
-                        Cell cell = scanner.current();
-                        if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
-                            ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
-                            valueMap.remove(ref);
-                        } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
-                            for (ColumnReference ref : indexedColumns) {
-                                if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) {
-                                    valueMap.remove(ref);
-                                }
+                for (Cell cell : pendingUpdates) {
+                    if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
+                        ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+                        valueMap.remove(ref);
+                    } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
+                        for (ColumnReference ref : indexedColumns) {
+                            if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) {
+                                valueMap.remove(ref);
                             }
-                        } else {
-                            ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+                        }
+                    } else {
+                        ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+                        if (indexedColumns.contains(ref)) {
                             ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                             ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                             valueMap.put(ref, ptr);
                         }
                     }
-                } catch (IOException e) {
-                    throw new RuntimeException(e); // Impossible
                 }
             }
         }
@@ -328,7 +328,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
 
                 @Override
                 public byte[] getRowKey() {
-                    return rowKey;
+                    return mutation.getRow();
                 }
                 
             };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 7f5d527..9cece1c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
 
+import co.cask.tephra.Transaction;
 import co.cask.tephra.hbase98.TransactionAwareHTable;
 
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -87,9 +88,10 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
         PTable table = tableRef.getTable();
         HTableInterface htable = context.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
         if (table.isTransactional()) {
-            TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(htable);
-            context.getConnection().addTxParticipant(txnAware);
-            htable = txnAware;
+            TransactionAwareHTable txAware = TransactionUtil.getTransactionAwareHTable(htable);
+            Transaction tx = context.getConnection().getMutationState().getTransaction(); 
+            txAware.startTx(tx);
+            htable = txAware;
         }
         this.htable = htable;
         if (creationMode == ScannerCreation.IMMEDIATE) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 240a599..d513362 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -54,10 +54,7 @@ import java.util.concurrent.Executor;
 
 import javax.annotation.Nullable;
 
-import co.cask.tephra.TransactionAware;
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.Transaction;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.call.CallRunner;
@@ -97,7 +94,6 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
-import org.apache.phoenix.util.TransactionUtil;
 import org.cloudera.htrace.Sampler;
 import org.cloudera.htrace.TraceScope;
 
@@ -139,7 +135,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     private final String timestampPattern;
     private TraceScope traceScope = null;
     
-    private TransactionContext txContext;
     private boolean isClosed = false;
     private Sampler<?> sampler;
     private boolean readOnly = false;
@@ -156,23 +151,22 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     }
 
     public PhoenixConnection(PhoenixConnection connection) throws SQLException {
-        this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache());
+        this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache(), connection.getMutationState().getTransaction());
         this.isAutoCommit = connection.isAutoCommit;
         this.sampler = connection.sampler;
     }
     
-    public PhoenixConnection(PhoenixConnection connection, long scn) throws SQLException {
-        this(connection.getQueryServices(), connection, scn);
-        this.sampler = connection.sampler;
-    }
-    
     public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException {
-        this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache());
+        this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache(), connection.getMutationState().getTransaction());
         this.isAutoCommit = connection.isAutoCommit;
         this.sampler = connection.sampler;
     }
     
     public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException {
+        this(services, url, info, metaData, null);
+    }
+    
+    public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, Transaction txn) throws SQLException {
         this.url = url;
         // Copy so client cannot change
         this.info = info == null ? new Properties() : PropertiesUtil.deepCopy(info);
@@ -242,7 +236,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
             }
             
         });
-        this.mutationState = new MutationState(maxSize, this);
+        this.mutationState = new MutationState(maxSize, this, txn);
         this.services.addConnection(this);
 
         // setup tracing, if its enabled
@@ -250,10 +244,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
         this.customTracingAnnotations = getImmutableCustomTracingAnnotations();
     }
     
-    public TransactionContext getTransactionContext() {
-        return txContext;
-    }
-    
     private ImmutableMap<String, String> getImmutableCustomTracingAnnotations() {
     	Builder<String, String> result = ImmutableMap.builder();
     	result.putAll(JDBCUtil.getAnnotations(url, info));
@@ -437,60 +427,15 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
         }
     }
 
-    public void startTransaction() throws SQLException {
-        if (txContext == null) {
-            boolean success = false;
-            try {
-                TransactionSystemClient txServiceClient = this.getQueryServices().getTransactionSystemClient();
-                this.txContext = new TransactionContext(txServiceClient);
-                txContext.start();
-                success = true;
-            } catch (TransactionFailureException e) {
-                throw new SQLException(e); // TODO: error code
-            } finally {
-                if (!success) endTransaction();
-            }
-        }
-    }
-    
-    public void addTxParticipant(TransactionAware txnAware) throws SQLException {
-    	if (!isTransactionStarted()) {
-    		startTransaction();
-    	}
-        txContext.addTransactionAware(txnAware);
-    }
-    
-    private boolean isTransactionStarted() {
-        return txContext != null;
-    }
-    
-    private void endTransaction() {
-        txContext = null;
-    }
-    
     @Override
     public void commit() throws SQLException {
         CallRunner.run(new CallRunner.CallableThrowable<Void, SQLException>() {
             @Override
             public Void call() throws SQLException {
-                mutationState.send();
-                if (isTransactionStarted()) {
-					try {
-						txContext.finish();
-					} catch (TransactionFailureException e) {
-						try {
-						    txContext.abort(e);
-							throw TransactionUtil.getSQLException(e);
-						} catch (TransactionFailureException e1) {
-                            throw TransactionUtil.getSQLException(e);
-						}
-					} finally {
-					    endTransaction();
-					}
-                }
+                mutationState.commit();
                 return null;
             }
-        }, Tracing.withTracing(this, "sending mutations"));
+        }, Tracing.withTracing(this, "committing"));
     }
 
     @Override
@@ -690,16 +635,13 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
 
     @Override
     public void rollback() throws SQLException {
-        mutationState.clear();
-        if (isTransactionStarted()) {
-            try {
-                txContext.abort();
-            } catch (TransactionFailureException e) {
-                throw new SQLException(e); // TODO: error code
-            } finally {
-                endTransaction();
+        CallRunner.run(new CallRunner.CallableThrowable<Void, SQLException>() {
+            @Override
+            public Void call() throws SQLException {
+                mutationState.rollback();
+                return null;
             }
-        }
+        }, Tracing.withTracing(this, "rolling back"));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 3ccc772..fb295d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -234,9 +234,9 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                     final long startTime = System.currentTimeMillis();
                     try {
                         QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE);
+                        startTransaction(plan);
                         plan = connection.getQueryServices().getOptimizer().optimize(
                                 PhoenixStatement.this, plan);
-                        startTransaction(plan);
                          // this will create its own trace internally, so we don't wrap this
                          // whole thing in tracing
                         ResultIterator resultIterator = plan.iterator();
@@ -279,10 +279,10 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
         }
     }
     
-    private void startTransaction(StatementPlan plan) throws SQLException {
+    public void startTransaction(StatementPlan plan) throws SQLException {
         for (TableRef ref : plan.getContext().getResolver().getTables()) {
             if (ref.getTable().isTransactional()) {
-                connection.startTransaction();
+                connection.getMutationState().startTransaction();
                 break;
             }
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 37a28fb..01cf8b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -137,7 +137,6 @@ import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableProperty;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.PTableStats;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.schema.types.PBoolean;
@@ -2070,7 +2069,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     
     @Override
     public MutationState updateData(MutationPlan plan) throws SQLException {
-        TableRef currentTable = plan.getContext().getCurrentTable();
+        plan.getContext().getStatement().startTransaction(plan);
         return plan.execute();
     }