You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/03/25 19:57:46 UTC

phoenix git commit: Push transaction state to server for secondary indexing

Repository: phoenix
Updated Branches:
  refs/heads/txn 0d44dd806 -> 826ebf5ce


Push transaction state to server for secondary indexing


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/826ebf5c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/826ebf5c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/826ebf5c

Branch: refs/heads/txn
Commit: 826ebf5ce741342ed1c758e594ca01bc9cb8e036
Parents: 0d44dd8
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Mar 25 11:54:44 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Mar 25 11:55:11 2015 -0700

----------------------------------------------------------------------
 .../phoenix/cache/IndexMetaDataCache.java       |  22 ++++
 .../apache/phoenix/cache/ServerCacheClient.java |   3 +-
 .../org/apache/phoenix/cache/TenantCache.java   |   2 +-
 .../apache/phoenix/cache/TenantCacheImpl.java   |  13 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |   8 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |   8 +-
 .../coprocessor/BaseScannerRegionObserver.java  |  14 ++-
 .../phoenix/coprocessor/ScanRegionObserver.java |  11 +-
 .../coprocessor/ServerCachingEndpointImpl.java  |   8 +-
 .../coprocessor/ServerCachingProtocol.java      |   5 +-
 .../generated/ServerCachingProtos.java          | 120 +++++++++++++++++--
 .../apache/phoenix/execute/BaseQueryPlan.java   |  17 ++-
 .../apache/phoenix/execute/MutationState.java   |   5 +-
 .../hbase/index/builder/IndexBuilder.java       |   9 ++
 .../hbase/index/scanner/ScannerBuilder.java     |   5 +-
 .../phoenix/index/IndexMetaDataCacheClient.java |  10 +-
 .../index/IndexMetaDataCacheFactory.java        |  18 ++-
 .../phoenix/index/PhoenixIndexBuilder.java      |   2 +-
 .../apache/phoenix/index/PhoenixIndexCodec.java |  53 ++++++--
 .../apache/phoenix/jdbc/PhoenixConnection.java  |   4 +
 .../apache/phoenix/join/HashCacheClient.java    |   3 +-
 .../apache/phoenix/join/HashCacheFactory.java   |   2 +-
 .../apache/phoenix/util/TransactionUtil.java    |   8 +-
 .../src/main/ServerCachingService.proto         |   1 +
 24 files changed, 278 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
index 48949f1..359e08f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
@@ -19,10 +19,32 @@
 package org.apache.phoenix.cache;
 
 import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 
+import co.cask.tephra.Transaction;
+
 import org.apache.phoenix.index.IndexMaintainer;
 
 public interface IndexMetaDataCache extends Closeable {
+    public static final IndexMetaDataCache EMPTY_INDEX_META_DATA_CACHE = new IndexMetaDataCache() {
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public List<IndexMaintainer> getIndexMaintainers() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public Transaction getTransaction() {
+            return null;
+        }
+        
+    };
     public List<IndexMaintainer> getIndexMaintainers();
+    public Transaction getTransaction();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 1233e1c..76bedbc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -142,7 +142,7 @@ public class ServerCacheClient {
 
     }
     
-    public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException {
+    public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState, final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException {
         ConnectionQueryServices services = connection.getQueryServices();
         MemoryChunk chunk = services.getMemoryManager().allocate(cachePtr.getLength());
         List<Closeable> closeables = new ArrayList<Closeable>();
@@ -201,6 +201,7 @@ public class ServerCacheClient {
                                                     ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory.newBuilder();
                                                     svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName());
                                                     builder.setCacheFactory(svrCacheFactoryBuider.build());
+                                                    builder.setTxState(HBaseZeroCopyByteString.wrap(txState));
                                                     instance.addServerCache(controller, builder.build(), rpcCallback);
                                                     if(controller.getFailedOn() != null) {
                                                         throw controller.getFailedOn();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
index b968a9b..c7cd58f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
@@ -37,6 +37,6 @@ import org.apache.phoenix.memory.MemoryManager;
 public interface TenantCache {
     MemoryManager getMemoryManager();
     Closeable getServerCache(ImmutableBytesPtr cacheId);
-    Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException;
+    Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException;
     void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException;
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
index 9005fa8..6ef7a6f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
@@ -23,14 +23,17 @@ import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-import com.google.common.cache.*;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.util.Closeables;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
 /**
  * 
  * Cache per tenant on server side.  Tracks memory usage for each
@@ -80,11 +83,11 @@ public class TenantCacheImpl implements TenantCache {
     }
     
     @Override
-    public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException {
-        MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength());
+    public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException {
+        MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength() + txState.length);
         boolean success = false;
         try {
-            Closeable element = cacheFactory.newCache(cachePtr, chunk);
+            Closeable element = cacheFactory.newCache(cachePtr, txState, chunk);
             getServerCaches().put(cacheId, element);
             success = true;
             return element;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 273024c..a0369d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -75,8 +75,10 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.TransactionUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -490,12 +492,14 @@ public class DeleteCompiler {
                     public MutationState execute() throws SQLException {
                         // TODO: share this block of code with UPSERT SELECT
                         ImmutableBytesWritable ptr = context.getTempPtr();
-                        tableRef.getTable().getIndexMaintainers(ptr, context.getConnection());
+                        PTable table = tableRef.getTable();
+                        table.getIndexMaintainers(ptr, context.getConnection());
+                        byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY;
                         ServerCache cache = null;
                         try {
                             if (ptr.getLength() > 0) {
                                 IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
-                                cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
+                                cache = client.addIndexMetadataCache(context.getScanRanges(), ptr, txState);
                                 byte[] uuidValue = cache.getId();
                                 context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 5fd1602..e72b634 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -89,6 +89,7 @@ import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TransactionUtil;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -607,12 +608,15 @@ public class UpsertCompiler {
                         @Override
                         public MutationState execute() throws SQLException {
                             ImmutableBytesWritable ptr = context.getTempPtr();
-                            tableRef.getTable().getIndexMaintainers(ptr, context.getConnection());
+                            PTable table = tableRef.getTable();
+                            table.getIndexMaintainers(ptr, context.getConnection());
+                            byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY;
+
                             ServerCache cache = null;
                             try {
                                 if (ptr.getLength() > 0) {
                                     IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
-                                    cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
+                                    cache = client.addIndexMetadataCache(context.getScanRanges(), ptr, txState);
                                     byte[] uuidValue = cache.getId();
                                     scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 25ac408..c4eb4f7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 
+import co.cask.tephra.Transaction;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -85,6 +87,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey";
     public static final String REVERSE_SCAN = "_ReverseScan";
     public static final String ANALYZE_TABLE = "_ANALYZETABLE";
+    public static final String TX_STATE = "_TxState";
     /**
      * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
      * are used to augment log lines emitted by Phoenix. See https://issues.apache.org/jira/browse/PHOENIX-1198.
@@ -222,7 +225,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             final byte[][] viewConstants, final TupleProjector projector,
             final ImmutableBytesWritable ptr) {
         return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector,
-                dataRegion, indexMaintainer, viewConstants, null, null, projector, ptr);
+                dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr);
     }
     
     /**
@@ -230,13 +233,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
      * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query
      * for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do
      * the same from a custom filter.
-     * @param arrayFuncRefs
      * @param arrayKVRefs
+     * @param arrayFuncRefs
      * @param offset starting position in the rowkey.
      * @param scan
      * @param tupleProjector
      * @param dataRegion
      * @param indexMaintainer
+     * @param tx transaction passed in by the caller; may be null (the overload without a tx argument passes null)
      * @param viewConstants
      */
     protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
@@ -244,9 +248,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             final Expression[] arrayFuncRefs, final int offset, final Scan scan,
             final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
             final HRegion dataRegion, final IndexMaintainer indexMaintainer,
-            final byte[][] viewConstants, final KeyValueSchema kvSchema, 
-            final ValueBitSet kvSchemaBitSet, final TupleProjector projector,
-            final ImmutableBytesWritable ptr) {
+            Transaction tx, final byte[][] viewConstants, 
+            final KeyValueSchema kvSchema, final ValueBitSet kvSchemaBitSet,
+            final TupleProjector projector, final ImmutableBytesWritable ptr) {
         return new RegionScanner() {
 
             @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index ddde407..04275b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -26,7 +26,7 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.Set;
 
-import com.google.common.collect.Sets;
+import co.cask.tephra.Transaction;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -58,8 +58,10 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TransactionUtil;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 
 /**
@@ -190,6 +192,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         HRegion dataRegion = null;
         IndexMaintainer indexMaintainer = null;
         byte[][] viewConstants = null;
+        Transaction tx = null;
         ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
         if (dataColumns != null) {
             tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
@@ -198,14 +201,16 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
             List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
             indexMaintainer = indexMaintainers.get(0);
             viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
+            byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
+            tx = TransactionUtil.decodeTxnState(txState);
         }
         
         final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
         innerScanner =
                 getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan,
-                    dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants,
-                    kvSchema, kvSchemaBitSet, j == null ? p : null, ptr);
+                    dataColumns, tupleProjector, dataRegion, indexMaintainer, tx,
+                    viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr);
 
         final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan);
         if (j != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
index 2f31b08..9f3bdb4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -26,18 +26,17 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
 import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheRequest;
 import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheResponse;
 import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheRequest;
 import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheResponse;
 import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachingService;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.util.ByteUtil;
 
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
@@ -66,6 +65,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C
     ImmutableBytesWritable cachePtr =
         org.apache.phoenix.protobuf.ProtobufUtil
             .toImmutableBytesWritable(request.getCachePtr());
+    byte[] txState = request.hasTxState() ? request.getTxState().toByteArray() : ByteUtil.EMPTY_BYTE_ARRAY;
 
     try {
       @SuppressWarnings("unchecked")
@@ -73,7 +73,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C
           (Class<ServerCacheFactory>) Class.forName(request.getCacheFactory().getClassName());
       ServerCacheFactory cacheFactory = serverCacheFactoryClass.newInstance();
       tenantCache.addServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()),
-        cachePtr, cacheFactory);
+        cachePtr, txState, cacheFactory);
     } catch (Throwable e) {
       ProtobufUtil.setControllerException(controller, new IOException(e));
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
index 4fdfe99..b201c8e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
@@ -36,18 +36,19 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
  */
 public interface ServerCachingProtocol {
     public static interface ServerCacheFactory extends Writable {
-        public Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk chunk) throws SQLException;
+        public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk) throws SQLException;
     }
     /**
      * Add the cache to the region server cache.  
      * @param tenantId the tenantId or null if not applicable
      * @param cacheId unique identifier of the cache
      * @param cachePtr pointer to the byte array of the cache
+     * @param txState encoded transaction state, or an empty byte array when there is none
      * @param cacheFactory factory that converts from byte array to object representation on the server side
      * @return true on success and otherwise throws
      * @throws SQLException 
      */
-    public boolean addServerCache(byte[] tenantId, byte[] cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException;
+    public boolean addServerCache(byte[] tenantId, byte[] cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException;
     /**
      * Remove the cache from the region server cache.  Called upon completion of
      * the operation when cache is no longer needed.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
index 69db21b..5ee1dfb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
@@ -699,6 +699,16 @@ public final class ServerCachingProtos {
      * <code>required .ServerCacheFactory cacheFactory = 4;</code>
      */
     org.apache.phoenix.coprocessor.generated.ServerCacheFactoryProtos.ServerCacheFactoryOrBuilder getCacheFactoryOrBuilder();
+
+    // optional bytes txState = 5;
+    /**
+     * <code>optional bytes txState = 5;</code>
+     */
+    boolean hasTxState();
+    /**
+     * <code>optional bytes txState = 5;</code>
+     */
+    com.google.protobuf.ByteString getTxState();
   }
   /**
    * Protobuf type {@code AddServerCacheRequest}
@@ -787,6 +797,11 @@ public final class ServerCachingProtos {
               bitField0_ |= 0x00000008;
               break;
             }
+            case 42: {
+              bitField0_ |= 0x00000010;
+              txState_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -903,11 +918,28 @@ public final class ServerCachingProtos {
       return cacheFactory_;
     }
 
+    // optional bytes txState = 5;
+    public static final int TXSTATE_FIELD_NUMBER = 5;
+    private com.google.protobuf.ByteString txState_;
+    /**
+     * <code>optional bytes txState = 5;</code>
+     */
+    public boolean hasTxState() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional bytes txState = 5;</code>
+     */
+    public com.google.protobuf.ByteString getTxState() {
+      return txState_;
+    }
+
     private void initFields() {
       tenantId_ = com.google.protobuf.ByteString.EMPTY;
       cacheId_ = com.google.protobuf.ByteString.EMPTY;
       cachePtr_ = org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ImmutableBytesWritable.getDefaultInstance();
       cacheFactory_ = org.apache.phoenix.coprocessor.generated.ServerCacheFactoryProtos.ServerCacheFactory.getDefaultInstance();
+      txState_ = com.google.protobuf.ByteString.EMPTY;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -953,6 +985,9 @@ public final class ServerCachingProtos {
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         output.writeMessage(4, cacheFactory_);
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBytes(5, txState_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -978,6 +1013,10 @@ public final class ServerCachingProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(4, cacheFactory_);
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(5, txState_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1021,6 +1060,11 @@ public final class ServerCachingProtos {
         result = result && getCacheFactory()
             .equals(other.getCacheFactory());
       }
+      result = result && (hasTxState() == other.hasTxState());
+      if (hasTxState()) {
+        result = result && getTxState()
+            .equals(other.getTxState());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1050,6 +1094,10 @@ public final class ServerCachingProtos {
         hash = (37 * hash) + CACHEFACTORY_FIELD_NUMBER;
         hash = (53 * hash) + getCacheFactory().hashCode();
       }
+      if (hasTxState()) {
+        hash = (37 * hash) + TXSTATE_FIELD_NUMBER;
+        hash = (53 * hash) + getTxState().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1177,6 +1225,8 @@ public final class ServerCachingProtos {
           cacheFactoryBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000008);
+        txState_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
 
@@ -1229,6 +1279,10 @@ public final class ServerCachingProtos {
         } else {
           result.cacheFactory_ = cacheFactoryBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.txState_ = txState_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1257,6 +1311,9 @@ public final class ServerCachingProtos {
         if (other.hasCacheFactory()) {
           mergeCacheFactory(other.getCacheFactory());
         }
+        if (other.hasTxState()) {
+          setTxState(other.getTxState());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1610,6 +1667,42 @@ public final class ServerCachingProtos {
         return cacheFactoryBuilder_;
       }
 
+      // optional bytes txState = 5;
+      private com.google.protobuf.ByteString txState_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes txState = 5;</code>
+       */
+      public boolean hasTxState() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      /**
+       * <code>optional bytes txState = 5;</code>
+       */
+      public com.google.protobuf.ByteString getTxState() {
+        return txState_;
+      }
+      /**
+       * <code>optional bytes txState = 5;</code>
+       */
+      public Builder setTxState(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000010;
+        txState_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes txState = 5;</code>
+       */
+      public Builder clearTxState() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        txState_ = getDefaultInstance().getTxState();
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:AddServerCacheRequest)
     }
 
@@ -3383,20 +3476,21 @@ public final class ServerCachingProtos {
       "\n\032ServerCachingService.proto\032\030ServerCach" +
       "eFactory.proto\"K\n\026ImmutableBytesWritable" +
       "\022\021\n\tbyteArray\030\001 \002(\014\022\016\n\006offset\030\002 \002(\005\022\016\n\006l" +
-      "ength\030\003 \002(\005\"\220\001\n\025AddServerCacheRequest\022\020\n" +
+      "ength\030\003 \002(\005\"\241\001\n\025AddServerCacheRequest\022\020\n" +
       "\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\022)\n\010cach" +
       "ePtr\030\003 \002(\0132\027.ImmutableBytesWritable\022)\n\014c" +
-      "acheFactory\030\004 \002(\0132\023.ServerCacheFactory\"(" +
-      "\n\026AddServerCacheResponse\022\016\n\006return\030\001 \002(\010" +
-      "\"=\n\030RemoveServerCacheRequest\022\020\n\010tenantId" +
-      "\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\"+\n\031RemoveServerC",
-      "acheResponse\022\016\n\006return\030\001 \002(\0102\245\001\n\024ServerC" +
-      "achingService\022A\n\016addServerCache\022\026.AddSer" +
-      "verCacheRequest\032\027.AddServerCacheResponse" +
-      "\022J\n\021removeServerCache\022\031.RemoveServerCach" +
-      "eRequest\032\032.RemoveServerCacheResponseBG\n(" +
-      "org.apache.phoenix.coprocessor.generated" +
-      "B\023ServerCachingProtosH\001\210\001\001\240\001\001"
+      "acheFactory\030\004 \002(\0132\023.ServerCacheFactory\022\017" +
+      "\n\007txState\030\005 \001(\014\"(\n\026AddServerCacheRespons" +
+      "e\022\016\n\006return\030\001 \002(\010\"=\n\030RemoveServerCacheRe" +
+      "quest\022\020\n\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014",
+      "\"+\n\031RemoveServerCacheResponse\022\016\n\006return\030" +
+      "\001 \002(\0102\245\001\n\024ServerCachingService\022A\n\016addSer" +
+      "verCache\022\026.AddServerCacheRequest\032\027.AddSe" +
+      "rverCacheResponse\022J\n\021removeServerCache\022\031" +
+      ".RemoveServerCacheRequest\032\032.RemoveServer" +
+      "CacheResponseBG\n(org.apache.phoenix.copr" +
+      "ocessor.generatedB\023ServerCachingProtosH\001" +
+      "\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3414,7 +3508,7 @@ public final class ServerCachingProtos {
           internal_static_AddServerCacheRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_AddServerCacheRequest_descriptor,
-              new java.lang.String[] { "TenantId", "CacheId", "CachePtr", "CacheFactory", });
+              new java.lang.String[] { "TenantId", "CacheId", "CachePtr", "CacheFactory", "TxState", });
           internal_static_AddServerCacheResponse_descriptor =
             getDescriptor().getMessageTypes().get(2);
           internal_static_AddServerCacheResponse_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 94233c8..d96e339 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -66,6 +66,7 @@ import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.TransactionUtil;
 import org.cloudera.htrace.TraceScope;
 
 import com.google.common.collect.Lists;
@@ -205,13 +206,15 @@ public abstract class BaseQueryPlan implements QueryPlan {
                 KeyValueSchema schema = ProjectedColumnExpression.buildSchema(dataColumns);
                 // Set key value schema of the data columns.
                 serializeSchemaIntoScan(scan, schema);
-                String parentSchema = context.getCurrentTable().getTable().getParentSchemaName().getString();
-                String parentTable = context.getCurrentTable().getTable().getParentTableName().getString();
+                PTable parentTable = context.getCurrentTable().getTable();
+                String parentSchemaName = parentTable.getParentSchemaName().getString();
+                String parentTableName = parentTable.getParentTableName().getString();
                 final ParseNodeFactory FACTORY = new ParseNodeFactory();
+                // TODO: is it necessary to re-resolve the table?
                 TableRef dataTableRef =
                         FromCompiler.getResolver(
-                            FACTORY.namedTable(null, TableName.create(parentSchema, parentTable)),
-                            context.getConnection()).resolveTable(parentSchema, parentTable);
+                            FACTORY.namedTable(null, TableName.create(parentSchemaName, parentTableName)),
+                            context.getConnection()).resolveTable(parentSchemaName, parentTableName);
                 PTable dataTable = dataTableRef.getTable();
                 // Set index maintainer of the local index.
                 serializeIndexMaintainerIntoScan(scan, dataTable);
@@ -248,7 +251,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         return (scope.getSpan() != null) ? new TracingIterator(scope, iterator) : iterator;
     }
 
-    private void serializeIndexMaintainerIntoScan(Scan scan, PTable dataTable) {
+    private void serializeIndexMaintainerIntoScan(Scan scan, PTable dataTable) throws SQLException {
         PName name = context.getCurrentTable().getTable().getName();
         List<PTable> indexes = Lists.newArrayListWithExpectedSize(1);
         for (PTable index : dataTable.getIndexes()) {
@@ -260,6 +263,10 @@ public abstract class BaseQueryPlan implements QueryPlan {
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         IndexMaintainer.serialize(dataTable, ptr, indexes, context.getConnection());
         scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+        if (dataTable.isTransactional()) {
+            PhoenixConnection conn = context.getConnection();
+            scan.setAttribute(BaseScannerRegionObserver.TX_STATE, TransactionUtil.encodeTxnState(conn.getTransactionContext().getCurrentTransaction()));
+        }
     }
 
     private void serializeViewConstantsIntoScan(Scan scan, PTable dataTable) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index edaab9e..8915534 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -394,9 +394,10 @@ public class MutationState implements SQLCloseable {
                     if (hasIndexMaintainers && isDataTable) {
                         byte[] attribValue = null;
                         byte[] uuidValue;
-                        if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) {
+                        byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY;
+                        if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength() + txState.length)) {
                             IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
-                            cache = client.addIndexMetadataCache(mutations, tempPtr);
+                            cache = client.addIndexMetadataCache(mutations, tempPtr, txState);
                             child.addTimelineAnnotation("Updated index metadata cache");
                             uuidValue = cache.getId();
                             // If we haven't retried yet, retry for this case only, as it's possible that

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index 1c9782e..b91a52a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -68,6 +68,15 @@ public interface IndexBuilder extends Stoppable {
    * @return a Map of the mutations to make -> target index table name
    * @throws IOException on failure
    */
+  /* TODO:
+  Create BaseIndexBuilder with everything except getIndexUpdate(). 
+  Derive two concrete classes: NonTxIndexBuilder and TxIndexBuilder.
+  NonTxIndexBuilder will be current impl of this method.
+  TxIndexBuilder will use a scan with skipScan over TxAwareHBase to find the latest values.
+  Conditionally don't do WALEdit stuff for txnal table (ensure Phoenix/HBase tolerates index WAL edit info not being there)
+  Noop Failure mode
+  */
+  
   public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
index 575779a..32e4d84 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.collect.Lists;
 import org.apache.phoenix.hbase.index.covered.KeyValueStore;
 import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter;
 import org.apache.phoenix.hbase.index.covered.filter.ColumnTrackingNextLargestTimestampFilter;
@@ -41,6 +39,8 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
+import com.google.common.collect.Lists;
+
 /**
  *
  */
@@ -57,6 +57,7 @@ public class ScannerBuilder {
 
   public Scanner buildIndexedColumnScanner(Collection<? extends ColumnReference> indexedColumns, ColumnTracker tracker, long ts) {
 
+    // TODO: This needs to use some form of the filter that Tephra has when transactional
     Filter columnFilters = getColumnFilters(indexedColumns);
     FilterList filters = new FilterList(Lists.newArrayList(columnFilters));
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
index 70ddc86..c1135bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
@@ -67,30 +67,32 @@ public class IndexMetaDataCacheClient {
     
     /**
      * Send the index metadata cahce to all region servers for regions that will handle the mutations.
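+     * @param txState serialized transaction state sent to the region servers alongside the index metadata (an empty array when there is no transaction)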
      * @return client-side {@link ServerCache} representing the added index metadata cache
      * @throws SQLException 
      * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
      * size
      */
-    public ServerCache addIndexMetadataCache(List<Mutation> mutations, ImmutableBytesWritable ptr) throws SQLException {
+    public ServerCache addIndexMetadataCache(List<Mutation> mutations, ImmutableBytesWritable ptr, byte[] txState) throws SQLException {
         /**
          * Serialize and compress hashCacheTable
          */
-        return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
+        return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
     }
     
     
     /**
      * Send the index metadata cahce to all region servers for regions that will handle the mutations.
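+     * @param txState serialized transaction state sent to the region servers alongside the index metadata (an empty array when there is no transaction)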
      * @return client-side {@link ServerCache} representing the added index metadata cache
      * @throws SQLException 
      * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
      * size
      */
-    public ServerCache addIndexMetadataCache(ScanRanges ranges, ImmutableBytesWritable ptr) throws SQLException {
+    public ServerCache addIndexMetadataCache(ScanRanges ranges, ImmutableBytesWritable ptr, byte[] txState) throws SQLException {
         /**
          * Serialize and compress hashCacheTable
          */
-        return serverCache.addServerCache(ranges, ptr, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
+        return serverCache.addServerCache(ranges, ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
index 488db44..8b1ee18 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
@@ -24,11 +24,14 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
 
+import co.cask.tephra.Transaction;
+
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.util.TransactionUtil;
 
 public class IndexMetaDataCacheFactory implements ServerCacheFactory {
     public IndexMetaDataCacheFactory() {
@@ -43,10 +46,16 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory {
     }
 
     @Override
-    public Closeable newCache (ImmutableBytesWritable cachePtr, final MemoryChunk chunk) throws SQLException {
+    public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk) throws SQLException {
         // just use the standard keyvalue builder - this doesn't really need to be fast
-        final List<IndexMaintainer> maintainers =
+        final List<IndexMaintainer> maintainers = 
                 IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE);
+        final Transaction txn;
+        try {
+            txn = TransactionUtil.decodeTxnState(txState);
+        } catch (IOException e) {
+            throw new SQLException(e);
+        }
         return new IndexMetaDataCache() {
 
             @Override
@@ -58,6 +67,11 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory {
             public List<IndexMaintainer> getIndexMaintainers() {
                 return maintainers;
             }
+
+            @Override
+            public Transaction getTransaction() {
+                return txn;
+            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index b89c807..eb117bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -58,7 +58,7 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
         for (int i = 0; i < miniBatchOp.size(); i++) {
             Mutation m = miniBatchOp.getOperation(i);
             keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
-            List<IndexMaintainer> indexMaintainers = getCodec().getIndexMaintainers(m.getAttributesMap());
+            List<IndexMaintainer> indexMaintainers = getCodec().getIndexMetaData(m.getAttributesMap()).getIndexMaintainers();
             
             for(IndexMaintainer indexMaintainer: indexMaintainers) {
                 if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 99e26d1..8b507b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -23,10 +23,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import co.cask.tephra.Transaction;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
@@ -34,6 +34,7 @@ import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.ValueGetter;
@@ -50,6 +51,7 @@ import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TransactionUtil;
 
 import com.google.common.collect.Lists;
 
@@ -78,18 +80,47 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
         this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
     }
 
-    List<IndexMaintainer> getIndexMaintainers(Map<String, byte[]> attributes) throws IOException{
+    boolean hasIndexMaintainers(Map<String, byte[]> attributes) {
         if (attributes == null) {
-            return Collections.emptyList();
+            return false;
         }
         byte[] uuid = attributes.get(INDEX_UUID);
         if (uuid == null) {
-            return Collections.emptyList();
+            return false;
+        }
+        return true;
+    }
+    
+    IndexMetaDataCache getIndexMetaData(Map<String, byte[]> attributes) throws IOException{
+        if (attributes == null) {
+            return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE;
+        }
+        byte[] uuid = attributes.get(INDEX_UUID);
+        if (uuid == null) {
+            return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE;
         }
         byte[] md = attributes.get(INDEX_MD);
-        List<IndexMaintainer> indexMaintainers;
+        byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
         if (md != null) {
-            indexMaintainers = IndexMaintainer.deserialize(md);
+            final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md);
+            final Transaction txn = TransactionUtil.decodeTxnState(txState);
+            return new IndexMetaDataCache() {
+
+                @Override
+                public void close() throws IOException {
+                }
+
+                @Override
+                public List<IndexMaintainer> getIndexMaintainers() {
+                    return indexMaintainers;
+                }
+
+                @Override
+                public Transaction getTransaction() {
+                    return txn;
+                }
+                
+            };
         } else {
             byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
             ImmutableBytesWritable tenantId =
@@ -103,10 +134,9 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
                     .setMessage(msg).build().buildException();
                 ServerUtil.throwIOException("Index update failed", e); // will not return
             }
-            indexMaintainers = indexCache.getIndexMaintainers();
+            return indexCache;
         }
     
-        return indexMaintainers;
     }
     
     @Override
@@ -127,7 +157,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
      * @throws IOException
      */
     private Iterable<IndexUpdate> getIndexUpdates(TableState state, boolean upsert) throws IOException {
-        List<IndexMaintainer> indexMaintainers = getIndexMaintainers(state.getUpdateAttributes());
+        IndexMetaDataCache indexCache = getIndexMetaData(state.getUpdateAttributes());
+        List<IndexMaintainer> indexMaintainers = indexCache.getIndexMaintainers();
         if (indexMaintainers.isEmpty()) {
             return Collections.emptyList();
         }
@@ -187,7 +218,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
     
   @Override
   public boolean isEnabled(Mutation m) throws IOException {
-      return !getIndexMaintainers(m.getAttributesMap()).isEmpty();
+      return hasIndexMaintainers(m.getAttributesMap()); // enabled only when the mutation carries an index UUID attribute
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index dffd7c4..a646d96 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -439,12 +439,16 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
 
     public void startTransaction() throws SQLException {
         if (txContext == null) {
+            boolean success = false; // flipped to true only once txContext.start() has returned normally
             try {
                 TransactionSystemClient txServiceClient = this.getQueryServices().getTransactionSystemClient();
                 this.txContext = new TransactionContext(txServiceClient);
                 txContext.start();
+                success = true; // reached only if nothing above threw
             } catch (TransactionFailureException e) {
                 throw new SQLException(e); // TODO: error code
+            } finally { // runs for any exit path, including exceptions other than TransactionFailureException
+                if (!success) endTransaction(); // NOTE(review): presumably discards the half-initialized txContext; endTransaction() is not visible here, confirm
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
index f13b28e..e8b3389 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -39,6 +39,7 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 import org.apache.phoenix.util.TupleUtil;
@@ -80,7 +81,7 @@ public class HashCacheClient  {
          */
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         serialize(ptr, iterator, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues);
-        return serverCache.addServerCache(keyRanges, ptr, new HashCacheFactory(), cacheUsingTableRef);
+        return serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTableRef);
     }
     
     private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index 8cae51a..3072736 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -56,7 +56,7 @@ public class HashCacheFactory implements ServerCacheFactory {
     }
 
     @Override
-    public Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk chunk) throws SQLException {
+    public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk) throws SQLException {
         try {
             // This reads the uncompressed length from the front of the compressed input
             int uncompressedLen = Snappy.getUncompressedLength(cachePtr.get(), cachePtr.getOffset());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 34bba47..6058711 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -45,12 +45,8 @@ public class TransactionUtil {
         }
     }
     
-    public static Transaction decodeTxnState(byte[] txnBytes) throws SQLException {
-        try {
-            return codec.decode(txnBytes);
-        } catch (IOException e) {
-            throw new SQLException(e);
-        }
+    public static Transaction decodeTxnState(byte[] txnBytes) throws IOException {
+        return (txnBytes == null || txnBytes.length == 0) ? null : codec.decode(txnBytes); // null or empty state means "no transaction" (callers pass ByteUtil.EMPTY_BYTE_ARRAY)
     }
 
     public static SQLException getSQLException(TransactionFailureException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-protocol/src/main/ServerCachingService.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/ServerCachingService.proto b/phoenix-protocol/src/main/ServerCachingService.proto
index 63aeef9..dcbaabd 100644
--- a/phoenix-protocol/src/main/ServerCachingService.proto
+++ b/phoenix-protocol/src/main/ServerCachingService.proto
@@ -37,6 +37,7 @@ message AddServerCacheRequest {
   required bytes cacheId  = 2;
   required ImmutableBytesWritable cachePtr = 3;
   required ServerCacheFactory cacheFactory = 4;
+  optional bytes txState = 5;
 }
 
 message AddServerCacheResponse {