Posted to commits@phoenix.apache.org by td...@apache.org on 2015/12/01 05:18:30 UTC

[05/19] phoenix git commit: PHOENIX-1674 Snapshot isolation transaction support through Tephra (James Taylor, Thomas D'Silva)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4a70c701/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index d0ff7bb..910232f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -29,24 +29,31 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.Immutable;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.monitoring.GlobalClientMetrics;
 import org.apache.phoenix.monitoring.MutationMetricQueue;
 import org.apache.phoenix.monitoring.MutationMetricQueue.MutationMetric;
@@ -56,9 +63,9 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.TableRef;
@@ -70,17 +77,28 @@ import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SQLCloseable;
-import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.TraceScope;
+import org.apache.phoenix.util.TransactionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import co.cask.tephra.Transaction;
+import co.cask.tephra.Transaction.VisibilityLevel;
+import co.cask.tephra.TransactionAware;
+import co.cask.tephra.TransactionCodec;
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+
+import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.sun.istack.NotNull;
+import com.google.common.collect.Sets;
 
 /**
  * 
@@ -91,44 +109,217 @@ import com.sun.istack.NotNull;
  */
 public class MutationState implements SQLCloseable {
     private static final Logger logger = LoggerFactory.getLogger(MutationState.class);
-
+    private static final TransactionCodec CODEC = new TransactionCodec();
+    
     private PhoenixConnection connection;
     private final long maxSize;
-    private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
     private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
+    private final List<TransactionAware> txAwares;
+    private final TransactionContext txContext;
+    private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10);
+    
+    private Transaction tx;
     private long sizeOffset;
     private int numRows = 0;
+    private boolean txStarted = false;
+    
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
     
-    MutationState(long maxSize, PhoenixConnection connection,
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations) {
-        this.maxSize = maxSize;
-        this.connection = connection;
-        this.mutations = mutations;
-        boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
-        this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
-                : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
-    }
-
     public MutationState(long maxSize, PhoenixConnection connection) {
-        this(maxSize,connection,0);
+        this(maxSize,connection, null);
+    }
+    
+    public MutationState(MutationState mutationState) {
+        this(mutationState.maxSize, mutationState.connection, mutationState.getTransaction());
     }
     
     public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset) {
-        this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()));
+        this(maxSize, connection, null, sizeOffset);
+    }
+    
+    private MutationState(long maxSize, PhoenixConnection connection, Transaction tx) {
+        this(maxSize,connection, tx, 0);
+    }
+    
+    private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, long sizeOffset) {
+        this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()), tx);
         this.sizeOffset = sizeOffset;
     }
     
+    MutationState(long maxSize, PhoenixConnection connection,
+            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
+            Transaction tx) {
+        this.maxSize = maxSize;
+        this.connection = connection;
+        this.mutations = mutations;
+        boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
+        this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
+                : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
+        this.tx = tx;
+        if (tx == null) {
+            this.txAwares = Collections.emptyList();
+            TransactionSystemClient txServiceClient = this.connection
+                    .getQueryServices().getTransactionSystemClient();
+            this.txContext = new TransactionContext(txServiceClient);
+        } else {
+            txAwares = Lists.newArrayList();
+            txContext = null;
+        }
+    }
+
     public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
-        this(maxSize, connection, sizeOffset);
+        this(maxSize, connection, null, sizeOffset);
         this.mutations.put(table, mutations);
         this.numRows = mutations.size();
+        this.tx = connection.getMutationState().getTransaction();
         throwIfTooBig();
     }
     
+    public boolean checkpointIfNeccessary(MutationPlan plan) throws SQLException {
+        Transaction currentTx = getTransaction();
+        if (getTransaction() == null || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) {
+            return false;
+        }
+        Set<TableRef> sources = plan.getSourceRefs();
+        if (sources.isEmpty()) {
+            return false;
+        }
+        // For a DELETE statement, we're always querying the table being deleted from. This isn't
+        // a problem, but it potentially could be if there are other references to the same table
+        // nested in the DELETE statement (as a subquery or join, for example).
+        TableRef ignoreForExcludeCurrent = plan.getOperation() == Operation.DELETE && sources.size() == 1 ? plan.getTargetRef() : null;
+        boolean excludeCurrent = false;
+        String targetPhysicalName = plan.getTargetRef().getTable().getPhysicalName().getString();
+        for (TableRef source : sources) {
+            if (source.getTable().isTransactional() && !source.equals(ignoreForExcludeCurrent)) {
+                String sourcePhysicalName = source.getTable().getPhysicalName().getString();
+                if (targetPhysicalName.equals(sourcePhysicalName)) {
+                    excludeCurrent = true;
+                    break;
+                }
+            }
+        }
+        // If we're querying the same table we're updating, we must exclude our writes to
+        // it from being visible.
+        if (excludeCurrent) {
+            // If any source tables have uncommitted data prior to last checkpoint,
+            // then we must create a new checkpoint.
+            boolean hasUncommittedData = false;
+            for (TableRef source : sources) {
+                String sourcePhysicalName = source.getTable().getPhysicalName().getString();
+                if (source.getTable().isTransactional() && uncommittedPhysicalNames.contains(sourcePhysicalName)) {
+                    hasUncommittedData = true;
+                    break;
+                }
+            }
+            if (hasUncommittedData) {
+                try {
+                    if (txContext == null) {
+                        currentTx = tx = connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
+                    } else {
+                        txContext.checkpoint();
+                        currentTx = tx = txContext.getCurrentTransaction();
+                    }
+                    // Since we've checkpointed, we can clear out the uncommitted set,
+                    // as a statement run afterwards should see all of this data.
+                    uncommittedPhysicalNames.clear();
+                } catch (TransactionFailureException e) {
+                    throw new SQLException(e);
+                }
+            }
+            // Since we're querying our own table while mutating it, we must not
+            // see our current mutations, otherwise we can get erroneous results (for DELETE)
+            // or get into an infinite loop (for UPSERT SELECT).
+            currentTx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+            return true;
+        }
+        return false;
+    }
+    
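
A minimal sketch of the checkpoint-and-exclude pattern the method above implements, using only Tephra calls that appear in this patch (ctx is an assumed TransactionContext whose TransactionAware tables are already registered):

    ctx.start();                                   // begin the transaction
    // ... buffer writes through a TransactionAwareHTable ...
    ctx.checkpoint();                              // bump the write pointer; earlier writes become readable
    Transaction tx = ctx.getCurrentTransaction();
    tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); // hide writes made at the current pointer
    // ... run the read half of the UPSERT SELECT / DELETE ...
    ctx.finish();                                  // commit everything, checkpointed writes included
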
+    private void addTransactionParticipant(TransactionAware txAware) throws SQLException {
+        if (txContext == null) {
+            txAwares.add(txAware);
+            assert(tx != null);
+            txAware.startTx(tx);
+        } else {
+            txContext.addTransactionAware(txAware);
+        }
+    }
+    
+    // Though MutationState is not thread safe in general, this method should be, as it may
+    // be called by TableResultIterator from multiple threads. Since we do not want to expose
+    // the Transaction outside of MutationState, this seems reasonable, as the member variables
+    // it touches will not change while those threads are running.
+    public HTableInterface getHTable(PTable table) throws SQLException {
+        HTableInterface htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
+        Transaction currentTx;
+        if (table.isTransactional() && (currentTx=getTransaction()) != null) {
+            TransactionAwareHTable txAware = TransactionUtil.getTransactionAwareHTable(htable, table);
+            // Using the cloned MutationState, as we may have started a new transaction already
+            // (if auto commit is true) and we need to use the original one here.
+            txAware.startTx(currentTx);
+            htable = txAware;
+        }
+        return htable;
+    }
+    
+    public PhoenixConnection getConnection() {
+        return connection;
+    }
+    
+    // Kept private as the Transaction may change when checkpointed. Keeping it private ensures
+    // no one holds on to a stale copy.
+    private Transaction getTransaction() {
+        return tx != null ? tx : txContext != null ? txContext.getCurrentTransaction() : null;
+    }
+    
+    public boolean isTransactionStarted() {
+        return getTransaction() != null;
+    }
+    
+    public long getReadPointer() {
+        Transaction tx = getTransaction();
+        return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getReadPointer();
+    }
+    
+    // For testing
+    public long getWritePointer() {
+        Transaction tx = getTransaction();
+        return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getWritePointer();
+    }
+    
+    // For testing
+    public VisibilityLevel getVisibilityLevel() {
+        Transaction tx = getTransaction();
+        return tx == null ? null : tx.getVisibilityLevel();
+    }
+    
+    public boolean startTransaction() throws SQLException {
+        if (txContext == null) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException();
+        }
+        
+        if (connection.getSCN() != null) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET)
+                    .build().buildException();
+        }
+        
+        try {
+            if (!txStarted) {
+                txContext.start();
+                txStarted = true;
+                return true;
+            }
+        } catch (TransactionFailureException e) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED).setRootCause(e).build().buildException();
+        }
+        return false;
+    }
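
For orientation, a hedged sketch of the Tephra client lifecycle that startTransaction() above and commit()/rollback() further down drive, using only calls visible in this patch (txServiceClient and txAware are assumed to be in scope):

    TransactionContext ctx = new TransactionContext(txServiceClient);
    ctx.addTransactionAware(txAware);  // e.g. a TransactionAwareHTable
    try {
        ctx.start();                   // what startTransaction() does once per transaction
        // ... send mutations through txAware ...
        ctx.finish();                  // commit path taken in commit()
    } catch (TransactionFailureException e) {
        ctx.abort(e);                  // rollback path; Tephra undoes persisted changes
    }
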
+
     public static MutationState emptyMutationState(long maxSize, PhoenixConnection connection) {
-        MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap());
+        MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null);
         state.sizeOffset = 0;
         return state;
     }
@@ -154,6 +345,13 @@ public class MutationState implements SQLCloseable {
         if (this == newMutationState) { // Doesn't make sense
             return;
         }
+        if (txContext != null) {
+            for (TransactionAware txAware : newMutationState.txAwares) {
+                txContext.addTransactionAware(txAware);
+            }
+        } else {
+            txAwares.addAll(newMutationState.txAwares);
+        }
         this.sizeOffset += newMutationState.sizeOffset;
         // Merge newMutation with this one, keeping state from newMutation for any overlaps
         for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutationState.mutations.entrySet()) {
@@ -203,7 +401,8 @@ public class MutationState implements SQLCloseable {
         throwIfTooBig();
     }
     
-    private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) {
+
+    private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) {
         RowKeySchema schema = table.getRowKeySchema();
         int rowTimestampColPos = table.getRowTimestampColPos();
         Field rowTimestampField = schema.getField(rowTimestampColPos); 
@@ -224,51 +423,15 @@ public class MutationState implements SQLCloseable {
         return ptr;
     }
     
-    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, long timestamp, boolean includeMutableIndexes) {
+    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, final long timestamp, boolean includeMutableIndexes, final boolean sendAll) {
         final PTable table = tableRef.getTable();
-        boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1;
         final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
                 (table.isImmutableRows() || includeMutableIndexes) ? 
                         IndexMaintainer.nonDisabledIndexIterator(table.getIndexes().iterator()) : 
                         Iterators.<PTable>emptyIterator();
-        final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size());
+        final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size());
         final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
-        Iterator<Map.Entry<ImmutableBytesPtr,RowMutationState>> iterator = values.entrySet().iterator();
-        long timestampToUse = timestamp;
-        while (iterator.hasNext()) {
-            Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry = iterator.next();
-            ImmutableBytesPtr key = rowEntry.getKey();
-            RowMutationState state = rowEntry.getValue();
-            if (tableWithRowTimestampCol) {
-                RowTimestampColInfo rowTsColInfo = state.getRowTimestampColInfo();
-                if (rowTsColInfo.useServerTimestamp()) {
-                    // regenerate the key with this timestamp.
-                    key = getNewRowKeyWithRowTimestamp(key, timestampToUse, table);
-                } else {
-                    if (rowTsColInfo.getTimestamp() != null) {
-                        timestampToUse = rowTsColInfo.getTimestamp();
-                    }
-                }
-            }
-            PRow row = table.newRow(connection.getKeyValueBuilder(), timestampToUse, key);
-            List<Mutation> rowMutations, rowMutationsPertainingToIndex;
-            if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete
-                row.delete();
-                rowMutations = row.toRowMutations();
-                // Row deletes for index tables are processed by running a re-written query
-                // against the index table (as this allows for flexibility in being able to
-                // delete rows).
-                rowMutationsPertainingToIndex = Collections.emptyList();
-            } else {
-                for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().getColumnValues().entrySet()) {
-                    row.setValue(valueEntry.getKey(), valueEntry.getValue());
-                }
-                rowMutations = row.toRowMutations();
-                rowMutationsPertainingToIndex = rowMutations;
-            }
-            mutations.addAll(rowMutations);
-            if (mutationsPertainingToIndex != null) mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex);
-        }
+        generateMutations(tableRef, timestamp, values, mutationList, mutationsPertainingToIndex);
         return new Iterator<Pair<byte[],List<Mutation>>>() {
             boolean isFirst = true;
 
@@ -281,14 +444,24 @@ public class MutationState implements SQLCloseable {
             public Pair<byte[], List<Mutation>> next() {
                 if (isFirst) {
                     isFirst = false;
-                    return new Pair<byte[],List<Mutation>>(table.getPhysicalName().getBytes(),mutations);
+                    return new Pair<byte[],List<Mutation>>(table.getPhysicalName().getBytes(), mutationList);
                 }
                 PTable index = indexes.next();
                 List<Mutation> indexMutations;
                 try {
                     indexMutations =
                             IndexUtil.generateIndexData(table, index, mutationsPertainingToIndex,
-                                tempPtr, connection.getKeyValueBuilder(), connection);
+                                connection.getKeyValueBuilder(), connection);
+                    // We may also have to include delete mutations for immutable tables if we are
+                    // not processing all of the tables in the mutations map.
+                    if (!sendAll) {
+                        TableRef key = new TableRef(index);
+                        Map<ImmutableBytesPtr, RowMutationState> rowToColumnMap = mutations.remove(key);
+                        if (rowToColumnMap != null) {
+                            final List<Mutation> deleteMutations = Lists.newArrayList();
+                            generateMutations(tableRef, timestamp, rowToColumnMap, deleteMutations, null);
+                            indexMutations.addAll(deleteMutations);
+                        }
+                    }
                 } catch (SQLException e) {
                     throw new IllegalDataException(e);
                 }
@@ -302,28 +475,84 @@ public class MutationState implements SQLCloseable {
             
         };
     }
+
+    private void generateMutations(final TableRef tableRef, long timestamp,
+            final Map<ImmutableBytesPtr, RowMutationState> values,
+            final List<Mutation> mutationList, final List<Mutation> mutationsPertainingToIndex) {
+        final PTable table = tableRef.getTable();
+        boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1;
+        Iterator<Map.Entry<ImmutableBytesPtr, RowMutationState>> iterator =
+                values.entrySet().iterator();
+        long timestampToUse = timestamp;
+        while (iterator.hasNext()) {
+            Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next();
+            ImmutableBytesPtr key = rowEntry.getKey();
+            RowMutationState state = rowEntry.getValue();
+            if (tableWithRowTimestampCol) {
+                RowTimestampColInfo rowTsColInfo = state.getRowTimestampColInfo();
+                if (rowTsColInfo.useServerTimestamp()) {
+                    // regenerate the key with this timestamp.
+                    key = getNewRowKeyWithRowTimestamp(key, timestampToUse, table);
+                } else {
+                    if (rowTsColInfo.getTimestamp() != null) {
+                        timestampToUse = rowTsColInfo.getTimestamp();
+                    }
+                }
+            }
+            PRow row = table.newRow(connection.getKeyValueBuilder(), timestampToUse, key);
+            List<Mutation> rowMutations, rowMutationsPertainingToIndex;
+            if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete
+                row.delete();
+                rowMutations = row.toRowMutations();
+                // Row deletes for index tables are processed by running a re-written query
+                // against the index table (as this allows for flexibility in being able to
+                // delete rows).
+                rowMutationsPertainingToIndex = Collections.emptyList();
+            } else {
+                for (Map.Entry<PColumn, byte[]> valueEntry : rowEntry.getValue().getColumnValues()
+                        .entrySet()) {
+                    row.setValue(valueEntry.getKey(), valueEntry.getValue());
+                }
+                rowMutations = row.toRowMutations();
+                rowMutationsPertainingToIndex = rowMutations;
+            }
+            mutationList.addAll(rowMutations);
+            if (mutationsPertainingToIndex != null) {
+                mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex);
+            }
+        }
+    }
     
     /**
      * Get the unsorted list of HBase mutations for the tables with uncommitted data.
      * @return list of HBase mutations for uncommitted data.
      */
+    public Iterator<Pair<byte[],List<Mutation>>> toMutations(Long timestamp) {
+        return toMutations(false, timestamp);
+    }
+    
     public Iterator<Pair<byte[],List<Mutation>>> toMutations() {
-        return toMutations(false);
+        return toMutations(false, null);
     }
     
     public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes) {
+        return toMutations(includeMutableIndexes, null);
+    }
+    
+    public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes, final Long tableTimestamp) {
         final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
         if (!iterator.hasNext()) {
             return Iterators.emptyIterator();
         }
         Long scn = connection.getSCN();
-        final long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
+        final long timestamp = (tableTimestamp != null && tableTimestamp != QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null ? HConstants.LATEST_TIMESTAMP : scn);
         return new Iterator<Pair<byte[],List<Mutation>>>() {
             private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next();
             private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();
                     
             private Iterator<Pair<byte[],List<Mutation>>> init() {
-                return addRowMutations(current.getKey(), current.getValue(), timestamp, includeMutableIndexes);
+                return addRowMutations(current.getKey(), current.getValue(), timestamp, includeMutableIndexes, true);
             }
             
             @Override
@@ -347,58 +576,62 @@ public class MutationState implements SQLCloseable {
         };
     }
         
-    /**
-     * Validates that the meta data is valid against the server meta data if we haven't yet done so.
-     * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data
-     * has changed.
-     * @param connection
-     * @return the server time to use for the upsert
-     * @throws SQLException if the table or any columns no longer exist
-     */
-    private long[] validate() throws SQLException {
-        int i = 0;
-        Long scn = connection.getSCN();
-        MetaDataClient client = new MetaDataClient(connection);
-        long[] timeStamps = new long[this.mutations.size()];
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) {
-            TableRef tableRef = entry.getKey();
-            long serverTimeStamp = tableRef.getTimeStamp();
-            PTable table = tableRef.getTable();
-            // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
-            // so no need to do it again here.
-            if (!connection.getAutoCommit()) {
-                MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
-                long timestamp = result.getMutationTime();
-                if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
-                    serverTimeStamp = timestamp;
-                    if (result.wasUpdated()) {
-                        // TODO: use bitset?
-                        table = result.getTable();
-                        PColumn[] columns = new PColumn[table.getColumns().size()];
-                        for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) {
-                        	RowMutationState valueEntry = rowEntry.getValue();
-                            if (valueEntry != null) {
-                            	Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
-                            	if (colValues != PRow.DELETE_MARKER) {
-	                                for (PColumn column : colValues.keySet()) {
-	                                    columns[column.getPosition()] = column;
-	                                }
-                            	}
-                            }
+    /**
+     * Validates that the meta data is valid against the server meta data if we haven't yet done so.
+     * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data
+     * has changed.
+     * @return the server time to use for the upsert
+     * @throws SQLException if the table or any columns no longer exist
+     */
+    private long[] validateAll() throws SQLException {
+        int i = 0;
+        long[] timeStamps = new long[this.mutations.size()];
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) {
+            TableRef tableRef = entry.getKey();
+            timeStamps[i++] = validate(tableRef, entry.getValue());
+        }
+        return timeStamps;
+    }
+    
+    private long validate(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException {
+        Long scn = connection.getSCN();
+        MetaDataClient client = new MetaDataClient(connection);
+        long serverTimeStamp = tableRef.getTimeStamp();
+        PTable table = tableRef.getTable();
+        // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
+        // so no need to do it again here.
+        if (!connection.getAutoCommit()) {
+            MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
+            long timestamp = result.getMutationTime();
+            if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
+                serverTimeStamp = timestamp;
+                if (result.wasUpdated()) {
+                    // TODO: use bitset?
+                    table = result.getTable();
+                    PColumn[] columns = new PColumn[table.getColumns().size()];
+                    for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
+                    	RowMutationState valueEntry = rowEntry.getValue();
+                        if (valueEntry != null) {
+                        	Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
+                        	if (colValues != PRow.DELETE_MARKER) {
+                                for (PColumn column : colValues.keySet()) {
+                                    columns[column.getPosition()] = column;
+                                }
+                        	}
                         }
-                        for (PColumn column : columns) {
-                            if (column != null) {
-                                table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
-                            }
+                    }
+                    for (PColumn column : columns) {
+                        if (column != null) {
+                            table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
                         }
-                        tableRef.setTable(table);
                     }
+                    tableRef.setTable(table);
                 }
             }
-            timeStamps[i++] = scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn;
         }
-        return timeStamps;
-    }
+        return scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn;
+    }
     
     private static long calculateMutationSize(List<Mutation> mutations) {
         long byteSize = 0;
@@ -411,150 +644,292 @@ public class MutationState implements SQLCloseable {
         return byteSize;
     }
     
+    private boolean hasKeyValueColumn(PTable table, PTable index) {
+        IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
+        return !maintainer.getAllColumns().isEmpty();
+    }
+    
+    private void divideImmutableIndexes(Iterator<PTable> enabledImmutableIndexes, PTable table, List<PTable> rowKeyIndexes, List<PTable> keyValueIndexes) {
+        while (enabledImmutableIndexes.hasNext()) {
+            PTable index = enabledImmutableIndexes.next();
+            if (index.getIndexType() != IndexType.LOCAL) {
+                if (hasKeyValueColumn(table, index)) {
+                    keyValueIndexes.add(index);
+                } else {
+                    rowKeyIndexes.add(index);
+                }
+            }
+        }
+    }
+    
+    private class MetaDataAwareHTable extends DelegateHTable {
+        private final TableRef tableRef;
+        
+        private MetaDataAwareHTable(HTableInterface delegate, TableRef tableRef) {
+            super(delegate);
+            this.tableRef = tableRef;
+        }
+        
+        /**
+         * Called by Tephra when a transaction is aborted. We have this wrapper so that we get an
+         * opportunity to attach our index meta data to the mutations such that we can also undo
+         * the index mutations.
+         */
+        @Override
+        public void delete(List<Delete> deletes) throws IOException {
+            try {
+                PTable table = tableRef.getTable();
+                List<PTable> indexes = table.getIndexes();
+                Iterator<PTable> enabledIndexes = IndexMaintainer.nonDisabledIndexIterator(indexes.iterator());
+                if (enabledIndexes.hasNext()) {
+                    List<PTable> keyValueIndexes = Collections.emptyList();
+                    ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
+                    boolean attachMetaData = table.getIndexMaintainers(indexMetaDataPtr, connection);
+                    if (table.isImmutableRows()) {
+                        List<PTable> rowKeyIndexes = Lists.newArrayListWithExpectedSize(indexes.size());
+                        keyValueIndexes = Lists.newArrayListWithExpectedSize(indexes.size());
+                        divideImmutableIndexes(enabledIndexes, table, rowKeyIndexes, keyValueIndexes);
+                        // Generate index deletes for immutable indexes that only reference row key
+                        // columns and submit directly here.
+                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                        for (PTable index : rowKeyIndexes) {
+                            List<Delete> indexDeletes = IndexUtil.generateDeleteIndexData(table, index, deletes, ptr, connection.getKeyValueBuilder(), connection);
+                            HTableInterface hindex = connection.getQueryServices().getTable(index.getPhysicalName().getBytes());
+                            hindex.delete(indexDeletes);
+                        }
+                    }
+                    
+                    // If we have mutable indexes, local immutable indexes, or global immutable indexes
+                    // that reference key value columns, setup index meta data and attach here. In this
+                    // case updates to the indexes will be generated on the server side.
+                    // An alternative would be to let Tephra track the row keys for the immutable index
+                    // by adding it as a transaction participant (soon we can prevent any conflict
+                    // detection from occurring) with the downside being the additional memory required.
+                    if (!keyValueIndexes.isEmpty()) {
+                        attachMetaData = true;
+                        IndexMaintainer.serializeAdditional(table, indexMetaDataPtr, keyValueIndexes, connection);
+                    }
+                    if (attachMetaData) {
+                        setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr);
+                    }
+                }
+                delegate.delete(deletes);
+            } catch (SQLException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+    
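
The layering used later in send() bears spelling out; a sketch of how the wrappers compose (physicalName as a byte[], tableRef, and table are assumed to be in scope, and DelegateHTable is assumed to forward all HTableInterface calls to its delegate):

    // Innermost: the raw HTable for the physical table.
    HTableInterface hTable = connection.getQueryServices().getTable(physicalName);
    // Middle: attaches index metadata to deletes so a Tephra rollback undoes index rows too.
    hTable = new MetaDataAwareHTable(hTable, tableRef);
    // Outermost: Tephra change tracking, so writes carry the transaction's write pointer.
    TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table);
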
     @SuppressWarnings("deprecation")
-    public void commit() throws SQLException {
+    private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
         int i = 0;
+        long[] serverTimeStamps = null;
+        boolean sendAll = false;
+        if (tableRefIterator == null) {
+            serverTimeStamps = validateAll();
+            tableRefIterator = mutations.keySet().iterator();
+            sendAll = true;
+        }
 
-        PName tenantId = connection.getTenantId();
-        long[] serverTimeStamps = validate();
-        Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
         // add tracing for this operation
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
             Span span = trace.getSpan();
-            while (iterator.hasNext()) {
-                Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next();
-                // at this point we are going through mutations for each table
-
-                Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue();
-                // above is mutations for a table where the first part is the row key and the second part is column values.
-
-                TableRef tableRef = entry.getKey();
-                PTable table = tableRef.getTable();
-                table.getIndexMaintainers(tempPtr, connection);
-                boolean hasIndexMaintainers = tempPtr.getLength() > 0;
-                boolean isDataTable = true;
-                long serverTimestamp = serverTimeStamps[i++];
-                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false);
-                // above returns an iterator of pair where the first  
-                while (mutationsIterator.hasNext()) {
-                    Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
-                    byte[] htableName = pair.getFirst();
-                    List<Mutation> mutations = pair.getSecond();
-
-                    //create a span per target table
-                    //TODO maybe we can be smarter about the table name to string here?
-                    Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
-
-                    int retryCount = 0;
-                    boolean shouldRetry = false;
-                    do {
-                        ServerCache cache = null;
-                        if (hasIndexMaintainers && isDataTable) {
-                            byte[] attribValue = null;
-                            byte[] uuidValue;
-                            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) {
-                                IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
-                                cache = client.addIndexMetadataCache(mutations, tempPtr);
-                                child.addTimelineAnnotation("Updated index metadata cache");
-                                uuidValue = cache.getId();
-                                // If we haven't retried yet, retry for this case only, as it's possible that
-                                // a split will occur after we send the index metadata cache to all known
-                                // region servers.
-                                shouldRetry = true;
-                            } else {
-                                attribValue = ByteUtil.copyKeyBytesIfNecessary(tempPtr);
-                                uuidValue = ServerCacheClient.generateId();
-                            }
-                            // Either set the UUID to be able to access the index metadata from the cache
-                            // or set the index metadata directly on the Mutation
-                            for (Mutation mutation : mutations) {
-                                if (tenantId != null) {
-                                    byte[] tenantIdBytes = ScanUtil.getTenantIdBytes(
-                                        table.getRowKeySchema(),
-                                        table.getBucketNum()!=null,
-                                        tenantId);
-                                    mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes);
-                                }
-                                mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                if (attribValue != null) {
-                                    mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
-                                }
-                            }
-                        }
-
-                        SQLException sqlE = null;
-                        HTableInterface hTable = connection.getQueryServices().getTable(htableName);
-                        try {
-                            long numMutations = mutations.size();
+	        ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
+	        while (tableRefIterator.hasNext()) {
+	        	// at this point we are going through mutations for each table
+	            TableRef tableRef = tableRefIterator.next();
+	            Map<ImmutableBytesPtr, RowMutationState> valuesMap = mutations.get(tableRef);
+	            if (valuesMap == null || valuesMap.isEmpty()) {
+	                continue;
+	            }
+	            PTable table = tableRef.getTable();
+	            // Track tables to which we've sent uncommitted data
+	            if (table.isTransactional()) {
+	                uncommittedPhysicalNames.add(table.getPhysicalName().getString());
+	            }
+	            table.getIndexMaintainers(indexMetaDataPtr, connection);
+	            boolean isDataTable = true;
+	            // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
+	            long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
+	            Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
+	            while (mutationsIterator.hasNext()) {
+	                Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
+	                byte[] htableName = pair.getFirst();
+	                List<Mutation> mutationList = pair.getSecond();
+	                
+	                //create a span per target table
+	                //TODO maybe we can be smarter about the table name to string here?
+	                Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
+	
+	                int retryCount = 0;
+	                boolean shouldRetry = false;
+	                do {
+	                    ServerCache cache = null;
+	                    if (isDataTable) {
+	                        cache = setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr);
+	                    }
+	                
+	                    // If we haven't retried yet, retry for this case only, as it's possible that
+	                    // a split will occur after we send the index metadata cache to all known
+	                    // region servers.
+	                    shouldRetry = cache != null;
+	                    SQLException sqlE = null;
+	                    HTableInterface hTable = connection.getQueryServices().getTable(htableName);
+	                    try {
+	                        if (table.isTransactional()) {
+	                            // If we have indexes, wrap the HTable in a delegate HTable that
+	                            // will attach the necessary index meta data in the event of a
+	                            // rollback
+	                            if (!table.getIndexes().isEmpty()) {
+	                                hTable = new MetaDataAwareHTable(hTable, tableRef);
+	                            }
+	                            TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table);
+	                            // Don't add immutable indexes (those are the only ones that would participate
+	                            // during a commit), as we don't need conflict detection for these.
+	                            if (isDataTable) {
+	                                // Even for immutable, we need to do this so that an abort has the state
+	                                // necessary to generate the rows to delete.
+	                                addTransactionParticipant(txnAware);
+	                            } else {
+	                                txnAware.startTx(getTransaction());
+	                            }
+	                            hTable = txnAware;
+	                        }
+	                        long numMutations = mutationList.size();
                             GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
                             
                             long startTime = System.currentTimeMillis();
-                            child.addTimelineAnnotation("Attempt " + retryCount);
-                            hTable.batch(mutations);
-                            child.stop();
+                            child.addTimelineAnnotation("Attempt " + retryCount);
+	                        hTable.batch(mutationList);
+	                        child.stop();
                             shouldRetry = false;
                             long mutationCommitTime = System.currentTimeMillis() - startTime;
                             GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
                             
-                            long mutationSizeBytes = calculateMutationSize(mutations);
+                            long mutationSizeBytes = calculateMutationSize(mutationList);
                             MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
                             mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
-                        } catch (Exception e) {
-                            SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
-                            if (inferredE != null) {
-                                if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
-                                    // Swallow this exception once, as it's possible that we split after sending the index metadata
-                                    // and one of the region servers doesn't have it. This will cause it to have it the next go around.
-                                    // If it fails again, we don't retry.
-                                    String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
-                                    logger.warn(LogUtil.addCustomAnnotations(msg, connection));
-                                    connection.getQueryServices().clearTableRegionCache(htableName);
+	                    } catch (Exception e) {
+	                        SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
+	                        if (inferredE != null) {
+	                            if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
+	                                // Swallow this exception once, as it's possible that we split after sending the index metadata
+	                                // and one of the region servers doesn't have it. This will cause it to have it the next go around.
+	                                // If it fails again, we don't retry.
+	                                String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
+	                                logger.warn(LogUtil.addCustomAnnotations(msg, connection));
+	                                connection.getQueryServices().clearTableRegionCache(htableName);
+	
+	                                // add a new child span as this one failed
+	                                child.addTimelineAnnotation(msg);
+	                                child.stop();
+	                                child = Tracing.child(span,"Failed batch, attempting retry");
+	
+	                                continue;
+	                            }
+	                            e = inferredE;
+	                        }
+	                        // Throw to the client with both what was committed so far and what is left to be committed.
+	                        // That way, the client can either undo what was done or try again with what was not done.
+	                        sqlE = new CommitException(e, getUncommittedStatementIndexes());
+	                    } finally {
+	                        try {
+	                            if (cache != null) {
+	                                cache.close();
+	                            }
+	                        } finally {
+	                            try {
+	                                hTable.close();
+	                            } 
+	                            catch (IOException e) {
+	                                if (sqlE != null) {
+	                                    sqlE.setNextException(ServerUtil.parseServerException(e));
+	                                } else {
+	                                    sqlE = ServerUtil.parseServerException(e);
+	                                }
+	                            } 
+	                            if (sqlE != null) {
+	                            	// clear pending mutations
+	                            	mutations.clear();
+	                                throw sqlE;
+	                            }
+	                        }
+	                    }
+	                } while (shouldRetry && retryCount++ < 1);
+	                isDataTable = false;
+	            }
+	            if (tableRef.getTable().getType() != PTableType.INDEX) {
+	                numRows -= valuesMap.size();
+	            }
+	            // Remove batches as we process them
+	            if (sendAll) {
+	            	tableRefIterator.remove(); // Iterating through actual map in this case
+	            } else {
+	            	mutations.remove(tableRef);
+	            }
+	        }
+        }
+        // Note that we cannot assume that *all* mutations have been sent, since we've optimized this
+        // now to only send the mutations for the tables we're querying, hence we've removed the
+        // assertions that were here before.
+    }
 
-                                    // add a new child span as this one failed
-                                    child.addTimelineAnnotation(msg);
-                                    child.stop();
-                                    child = Tracing.child(span,"Failed batch, attempting retry");
+    public byte[] encodeTransaction() throws SQLException {
+        try {
+            return CODEC.encode(getTransaction());
+        } catch (IOException e) {
+            throw new SQLException(e);
+        }
+    }
+    
+    public static Transaction decodeTransaction(byte[] txnBytes) throws IOException {
+        return (txnBytes == null || txnBytes.length == 0) ? null : CODEC.decode(txnBytes);
+    }
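
The encoded bytes round-trip through a mutation attribute; a hedged sketch of the matching server-side read (the helper below is illustrative, not part of this patch):

    // Recover the transaction that setMetaDataOnMutations() (below) attaches as TX_STATE.
    static Transaction recoverTransaction(Mutation m) throws IOException {
        byte[] txState = m.getAttribute(BaseScannerRegionObserver.TX_STATE);
        return MutationState.decodeTransaction(txState); // null when nothing was attached
    }
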
 
-                                    continue;
-                                }
-                                e = inferredE;
-                            }
-                            sqlE = new CommitException(e, getUncommittedStatementIndexes());
-                        } finally {
-                            try {
-                                hTable.close();
-                            } catch (IOException e) {
-                                if (sqlE != null) {
-                                    sqlE.setNextException(ServerUtil.parseServerException(e));
-                                } else {
-                                    sqlE = ServerUtil.parseServerException(e);
-                                }
-                            } finally {
-                                try {
-                                    if (cache != null) {
-                                        cache.close();
-                                    }
-                                } finally {
-                                    if (sqlE != null) {
-                                        throw sqlE;
-                                    }
-                                }
-                            }
-                        }
-                    } while (shouldRetry && retryCount++ < 1);
-                    isDataTable = false;
-                }
-                if (tableRef.getTable().getType() != PTableType.INDEX) {
-                    numRows -= entry.getValue().size();
+    private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations,
+            ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
+        PTable table = tableRef.getTable();
+        byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
+        ServerCache cache = null;
+        byte[] attribValue = null;
+        byte[] uuidValue = null;
+        byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
+        if (table.isTransactional()) {
+            txState = encodeTransaction();
+        }
+        boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
+        if (hasIndexMetaData) {
+            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) {
+                IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+                cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
+                uuidValue = cache.getId();
+            } else {
+                attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+                uuidValue = ServerCacheClient.generateId();
+            }
+        } else if (txState.length == 0) {
+            return null;
+        }
+        // Either set the UUID to be able to access the index metadata from the cache
+        // or set the index metadata directly on the Mutation
+        for (Mutation mutation : mutations) {
+            if (tenantId != null) {
+                mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+            }
+            mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+            if (attribValue != null) {
+                mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                if (txState.length > 0) {
+                    mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
                 }
-                iterator.remove(); // Remove batches as we process them
+            } else if (!hasIndexMetaData && txState.length > 0) {
+                mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
             }
         }
-        assert(numRows==0);
-        assert(this.mutations.isEmpty());
+        return cache;
     }
     
-    public void rollback(PhoenixConnection connection) throws SQLException {
+    private void clear() throws SQLException {
         this.mutations.clear();
         numRows = 0;
     }
@@ -572,6 +947,100 @@ public class MutationState implements SQLCloseable {
     @Override
     public void close() throws SQLException {
     }
+
+    private void reset() {
+        txStarted = false;
+        tx = null;
+        uncommittedPhysicalNames.clear();
+    }
+    
+    public void rollback() throws SQLException {
+        clear();
+        txAwares.clear();
+        if (txContext != null) {
+            try {
+                if (txStarted) {
+                    txContext.abort();
+                }
+            } catch (TransactionFailureException e) {
+                throw new SQLException(e); // TODO: error code
+            } finally {
+                reset();
+            }
+        }
+    }
+    
+    public void commit() throws SQLException {
+        boolean sendMutationsFailed = false;
+        try {
+            send();
+        } catch (Throwable t) {
+            sendMutationsFailed = true;
+            throw t;
+        } finally {
+            txAwares.clear();
+            if (txContext != null) {
+                try {
+                    if (txStarted && !sendMutationsFailed) {
+                        txContext.finish();
+                    }
+                } catch (TransactionFailureException e) {
+                    try {
+                        txContext.abort(e);
+                        // abort succeeded: throw the original commit failure exception
+                        throw TransactionUtil.getTransactionFailureException(e);
+                    } catch (TransactionFailureException e1) {
+                        // if the abort itself fails, throw the abort failure exception instead
+                        throw TransactionUtil.getTransactionFailureException(e1);
+                    }
+                } finally {
+                    if (!sendMutationsFailed) {
+                        reset();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Support read-your-own-writes semantics by sending uncommitted data to HBase prior to running a
+     * query. The uncommitted writes are then visible to subsequent reads, but are not actually
+     * committed until commit is called.
+     * @param tableRefs the tables referenced by the query
+     * @return true if uncommitted data was sent for at least one transactional table, false otherwise.
+     * @throws SQLException
+     */
+    public boolean sendUncommitted(Iterator<TableRef> tableRefs) throws SQLException {
+        Transaction currentTx = getTransaction();
+        if (currentTx != null) {
+            // Initialize visibility so that transactions see their own writes.
+            // The checkpoint() method will set it to not see writes if necessary.
+            currentTx.setVisibility(VisibilityLevel.SNAPSHOT);
+        }
+        Iterator<TableRef> filteredTableRefs = Iterators.filter(tableRefs, new Predicate<TableRef>(){
+            @Override
+            public boolean apply(TableRef tableRef) {
+                return tableRef.getTable().isTransactional();
+            }
+        });
+        if (filteredTableRefs.hasNext()) {
+            // FIXME: strip table alias to prevent equality check from failing due to alias mismatch on null alias.
+            // We really should be keying the tables based on the physical table name.
+            List<TableRef> strippedAliases = Lists.newArrayListWithExpectedSize(mutations.keySet().size());
+            while (filteredTableRefs.hasNext()) {
+                TableRef tableRef = filteredTableRefs.next();
+                strippedAliases.add(new TableRef(null, tableRef.getTable(), tableRef.getTimeStamp(), tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols()));
+            }
+            startTransaction();
+            send(strippedAliases.iterator());
+            return true;
+        }
+        return false;
+    }
+        
+    public void send() throws SQLException {
+        send(null);
+    }
     
     public static int[] joinSortedIntArrays(int[] a, int[] b) {
         int[] result = new int[a.length + b.length];

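For readers tracing the new transactional flow: commit() and rollback() above are thin wrappers around Tephra's TransactionContext lifecycle. The following minimal sketch shows that lifecycle in isolation; the construction of txContext and the writes themselves are placeholders, and only the start/finish/abort calls are actual Tephra API.

    import co.cask.tephra.TransactionContext;
    import co.cask.tephra.TransactionFailureException;

    // Minimal sketch of the Tephra lifecycle wrapped by MutationState.
    // 'txContext' is assumed to be built elsewhere from a TransactionSystemClient
    // plus the TransactionAware tables being written.
    void commitOrAbort(TransactionContext txContext) throws TransactionFailureException {
        txContext.start();       // obtain a transaction from the transaction manager
        try {
            // ... perform writes through the TransactionAware tables ...
            txContext.finish();  // check for write conflicts, then make writes visible
        } catch (TransactionFailureException e) {
            txContext.abort(e);  // roll back the change set and invalidate the transaction
            throw e;             // surface the original failure, as commit() does above
        }
    }
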
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4a70c701/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 3098980..19b3e6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -151,7 +151,7 @@ public class ScanPlan extends BaseQueryPlan {
             return spoolingResultIteratorFactory;
         } else {
             return new ChunkedResultIterator.ChunkedResultIteratorFactory(
-                    spoolingResultIteratorFactory, table);
+                    spoolingResultIteratorFactory, context.getConnection().getMutationState(), table);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4a70c701/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index dea82a8..ab7534a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -89,6 +89,8 @@ import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 
+import co.cask.tephra.TxConstants;
+
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
@@ -874,7 +876,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             }
         	else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()
        			// Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have been transformed by the TransactionProcessor
-        			) {
+        			|| (CellUtil.matchingQualifier(kv, TxConstants.FAMILY_DELETE_QUALIFIER) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) {
         	    nDeleteCF++;
         	}
         }

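Context for the hunk above: Tephra does not issue real HBase Deletes against transactional tables, since deletes must remain rollback-able and carry the transaction's write pointer. Its TransactionProcessor coprocessor instead rewrites a row delete as a Put of an empty value at a reserved family-delete qualifier, which is what the new condition detects. A sketch of that check, factored out on its own:

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HConstants;

    import co.cask.tephra.TxConstants;

    // A transactional row delete arrives as a Put of an empty value at the
    // family-delete qualifier, not as an HBase Delete, so it is recognized
    // by qualifier and value rather than by the KeyValue type byte.
    static boolean isTransactionalFamilyDelete(Cell kv) {
        return CellUtil.matchingQualifier(kv, TxConstants.FAMILY_DELETE_QUALIFIER)
                && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY);
    }
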
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4a70c701/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
index 70ddc86..05a01b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
@@ -24,7 +24,6 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.ScanRanges;
@@ -59,7 +58,7 @@ public class IndexMetaDataCacheClient {
      * @param mutations the list of mutations that will be sent in a batch to server
      * @param indexMetaDataByteLength length in bytes of the index metadata cache
      */
-    public static boolean useIndexMetadataCache(PhoenixConnection connection, List<Mutation> mutations, int indexMetaDataByteLength) {
+    public static boolean useIndexMetadataCache(PhoenixConnection connection, List<? extends Mutation> mutations, int indexMetaDataByteLength) {
         ReadOnlyProps props = connection.getQueryServices().getProps();
         int threshold = props.getInt(INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD);
         return (indexMetaDataByteLength > ServerCacheClient.UUID_LENGTH && mutations.size() > threshold);
@@ -72,25 +71,26 @@ public class IndexMetaDataCacheClient {
      * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
      * size
      */
-    public ServerCache addIndexMetadataCache(List<Mutation> mutations, ImmutableBytesWritable ptr) throws SQLException {
+    public ServerCache addIndexMetadataCache(List<? extends Mutation> mutations, ImmutableBytesWritable ptr, byte[] txState) throws SQLException {
         /**
          * Serialize and compress hashCacheTable
          */
-        return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
+        return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
     }
     
     
     /**
     * Send the index metadata cache to all region servers for regions that will handle the mutations.
+     * @param txState the encoded transaction state, or an empty byte array if the table is not transactional
      * @return client-side {@link ServerCache} representing the added index metadata cache
      * @throws SQLException 
      * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
      * size
      */
-    public ServerCache addIndexMetadataCache(ScanRanges ranges, ImmutableBytesWritable ptr) throws SQLException {
+    public ServerCache addIndexMetadataCache(ScanRanges ranges, ImmutableBytesWritable ptr, byte[] txState) throws SQLException {
         /**
          * Serialize and compress hashCacheTable
          */
-        return serverCache.addServerCache(ranges, ptr, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
+        return serverCache.addServerCache(ranges, ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
     }
 }

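The widened signatures above let the encoded transaction state ride along with the index metadata under the existing size-based decision: metadata longer than a UUID on a batch above the configured threshold is shipped once to the region servers' cache, otherwise it travels inline on each Mutation. As a fragment, assuming the locals from the setMetaDataOnMutations hunk earlier in this commit are in scope:

    // Beyond the batch-size threshold it is cheaper to ship the metadata to the
    // region servers once and tag each Mutation with a short UUID reference.
    if (IndexMetaDataCacheClient.useIndexMetadataCache(
            connection, mutations, indexMetaDataPtr.getLength() + txState.length)) {
        IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
        cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
        uuidValue = cache.getId();                   // each Mutation carries only the UUID
    } else {
        attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
        uuidValue = ServerCacheClient.generateId();  // metadata travels inline as an attribute
    }
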
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4a70c701/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
index 488db44..7ced2f8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
@@ -24,11 +24,15 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
 
+import co.cask.tephra.Transaction;
+
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.util.TransactionUtil;
 
 public class IndexMetaDataCacheFactory implements ServerCacheFactory {
     public IndexMetaDataCacheFactory() {
@@ -43,10 +47,16 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory {
     }
 
     @Override
-    public Closeable newCache (ImmutableBytesWritable cachePtr, final MemoryChunk chunk) throws SQLException {
+    public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk) throws SQLException {
         // just use the standard keyvalue builder - this doesn't really need to be fast
-        final List<IndexMaintainer> maintainers =
+        final List<IndexMaintainer> maintainers = 
                 IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE);
+        final Transaction txn;
+        try {
+            txn = txState.length!=0 ? MutationState.decodeTransaction(txState) : null;
+        } catch (IOException e) {
+            throw new SQLException(e);
+        }
         return new IndexMetaDataCache() {
 
             @Override
@@ -58,6 +68,11 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory {
             public List<IndexMaintainer> getIndexMaintainers() {
                 return maintainers;
             }
+
+            @Override
+            public Transaction getTransaction() {
+                return txn;
+            }
         };
     }
 }

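newCache() above rehydrates the sending client's transaction from the raw txState bytes via MutationState.decodeTransaction(). The serialization is presumably Tephra's TransactionCodec (an assumption; the wrapper methods below are illustrative, not Phoenix API):

    import java.io.IOException;

    import co.cask.tephra.Transaction;
    import co.cask.tephra.TransactionCodec;

    // Hypothetical wrappers sketching the txState round trip with Tephra's codec.
    static byte[] encodeTxState(Transaction tx) throws IOException {
        return new TransactionCodec().encode(tx);      // snapshot + read/write pointers
    }

    static Transaction decodeTxState(byte[] txState) throws IOException {
        return new TransactionCodec().decode(txState); // rebuilt on the region server
    }
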
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4a70c701/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 2fd168a..806a20a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -75,7 +75,7 @@ import com.google.common.collect.Multimap;
  * 
  * @since 2.1
  */
-public class PhoenixIndexFailurePolicy extends  KillServerOnFailurePolicy {
+public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
     private static final Log LOG = LogFactory.getLog(PhoenixIndexFailurePolicy.class);
     private RegionCoprocessorEnvironment env;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4a70c701/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 3dcc44e..60ae915 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -22,6 +22,8 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 
+import co.cask.tephra.Transaction;
+
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.GlobalCache;
@@ -46,8 +48,10 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         byte[] uuid = attributes.get(PhoenixIndexCodec.INDEX_UUID);
         if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
         byte[] md = attributes.get(PhoenixIndexCodec.INDEX_MD);
+        byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
         if (md != null) {
             final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md);
+            final Transaction txn = MutationState.decodeTransaction(txState);
             return new IndexMetaDataCache() {
 
                 @Override
@@ -58,6 +62,11 @@ public class PhoenixIndexMetaData implements IndexMetaData {
                     return indexMaintainers;
                 }
 
+                @Override
+                public Transaction getTransaction() {
+                    return txn;
+                }
+
             };
         } else {
             byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
@@ -80,6 +89,10 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         this.attributes = attributes;
     }
     
+    public Transaction getTransaction() {
+        return indexMetaDataCache.getTransaction();
+    }
+    
     public List<IndexMaintainer> getIndexMaintainers() {
         return indexMetaDataCache.getIndexMaintainers();
     }
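
Tying the two halves together: the attributes consulted here are the ones stamped onto each Mutation client-side by setMetaDataOnMutations in the MutationState hunk. A condensed, illustrative restatement of that contract (the attribute keys are the real constants; the surrounding method is not):

    import java.util.Map;

    import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
    import org.apache.phoenix.index.PhoenixIndexCodec;

    // INDEX_UUID -> key into the server-side metadata cache
    // INDEX_MD   -> serialized IndexMaintainers, present only for small batches
    // TX_STATE   -> encoded transaction, present only for transactional tables
    static void describeAttributes(Map<String, byte[]> attributes) {
        byte[] uuid = attributes.get(PhoenixIndexCodec.INDEX_UUID);
        byte[] md = attributes.get(PhoenixIndexCodec.INDEX_MD);
        byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
        // md != null: deserialize the maintainers and decode txState directly;
        // md == null: look the metadata up in the GlobalCache by tenant id and uuid.
    }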