Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/04/09 09:08:15 UTC

[1/2] phoenix git commit: PHOENIX-1830 Transactional mutable secondary indexes

Repository: phoenix
Updated Branches:
  refs/heads/txn 0ad5c5675 -> e1521ab05


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
new file mode 100644
index 0000000..dd71a58
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import co.cask.tephra.Transaction;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.hbase.index.MultiMutation;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.IndexUpdate;
+import org.apache.phoenix.hbase.index.covered.TableState;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.trace.TracingUtils;
+import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.cloudera.htrace.Span;
+import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Do all the work of managing index updates for a transactional table from a single coprocessor. Since the transaction
+ * manager essentially time-orders writes through conflict detection, the logic to maintain a secondary index is quite a
+ * bit simpler than in the non-transactional case. For example, there's no need to muck with the WAL, as failure scenarios
+ * are handled by aborting the transaction.
+ */
+public class PhoenixTransactionalIndexer extends BaseRegionObserver {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixTransactionalIndexer.class);
+
+    private PhoenixIndexCodec codec;
+    private IndexWriter writer;
+    private boolean stopped;
+
+    @Override
+    public void start(CoprocessorEnvironment e) throws IOException {
+        final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e;
+        String serverName = env.getRegionServerServices().getServerName().getServerName();
+        codec = new PhoenixIndexCodec();
+        codec.initialize(env);
+
+        // setup the actual index writer
+        this.writer = new IndexWriter(env, serverName + "-tx-index-writer");
+    }
+
+    @Override
+    public void stop(CoprocessorEnvironment e) throws IOException {
+        if (this.stopped) { return; }
+        this.stopped = true;
+        String msg = "TxIndexer is being stopped";
+        this.writer.stop(msg);
+    }
+
+    @Override
+    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+            MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+
+        Mutation m = miniBatchOp.getOperation(0);
+        if (!codec.isEnabled(m)) {
+            super.preBatchMutate(c, miniBatchOp);
+            return;
+        }
+
+        Collection<Pair<Mutation, byte[]>> indexUpdates = null;
+        // get the current span, or just use a null-span to avoid a bunch of if statements
+        try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
+            Span current = scope.getSpan();
+            if (current == null) {
+                current = NullSpan.INSTANCE;
+            }
+
+            // get the index updates for all elements in this batch
+            indexUpdates = getIndexUpdates(c.getEnvironment(), miniBatchOp);
+
+            current.addTimelineAnnotation("Built index updates, doing preStep");
+            TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
+
+            // write the index updates to the index table(s), if there are any
+            if (!indexUpdates.isEmpty()) {
+                this.writer.write(indexUpdates);
+            }
+        } catch (Throwable t) {
+            LOG.error("Failed to update index with entries: " + indexUpdates, t);
+            IndexManagementUtil.rethrowIndexingException(t);
+        }
+    }
+
+    private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+        // Collect the set of mutable ColumnReferences so that we can first
+        // run a scan to get the current state. We'll need this to delete
+        // the existing index rows.
+        Map<String,byte[]> updateAttributes = miniBatchOp.getOperation(0).getAttributesMap();
+        PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(env,updateAttributes);
+        Transaction tx = indexMetaData.getTransaction();
+        assert(tx != null);
+        List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
+        Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10);
+        for (IndexMaintainer indexMaintainer : indexMaintainers) {
+            if (!indexMaintainer.isImmutableRows()) {
+                mutableColumns.addAll(indexMaintainer.getAllColumns());
+            }
+        }
+        ResultScanner scanner = null;
+        TransactionAwareHTable txTable = null;
+        
+        // Collect up all mutations in batch
+        Map<ImmutableBytesPtr, MultiMutation> mutations =
+                new HashMap<ImmutableBytesPtr, MultiMutation>();
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            Mutation m = miniBatchOp.getOperation(i);
+            // add the mutation to the batch set
+            ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+            MultiMutation stored = mutations.get(row);
+            // we haven't seen this row before, so add it
+            if (stored == null) {
+                stored = new MultiMutation(row);
+                mutations.put(row, stored);
+            }
+            stored.addAll(m);
+        }
+        
+        Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
+        try {
+            if (!mutableColumns.isEmpty()) {
+                List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size());
+                for (ImmutableBytesPtr ptr : mutations.keySet()) {
+                    keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
+                }
+                Scan scan = new Scan();
+                ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
+                scanRanges.initializeScan(scan);
+                scan.setFilter(scanRanges.getSkipScanFilter());
+                TableName tableName = env.getRegion().getRegionInfo().getTable();
+                HTableInterface htable = env.getTable(tableName);
+                txTable = new TransactionAwareHTable(htable);
+                txTable.startTx(tx);
+                scanner = txTable.getScanner(scan);
+            }
+            if (scanner == null) {
+                // No mutable indexes, so no current state is required: generate the
+                // upserts directly from the pending mutations (the TxTableState
+                // constructor already applies the mutation).
+                for (Mutation m : mutations.values()) {
+                    TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m);
+                    Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
+                    for (IndexUpdate update : updates) {
+                        indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(), update.getTableName()));
+                    }
+                }
+            } else {
+                // Delete the index rows derived from the current row state, then
+                // generate upserts from the state with the pending mutation applied.
+                Result result;
+                while ((result = scanner.next()) != null) {
+                    TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), result);
+                    Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
+                    for (IndexUpdate delete : deletes) {
+                        indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(), delete.getTableName()));
+                    }
+                    Mutation m = mutations.get(new ImmutableBytesPtr(result.getRow()));
+                    state.applyMutation(m);
+                    Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
+                    for (IndexUpdate update : updates) {
+                        indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(), update.getTableName()));
+                    }
+                }
+            }
+        } finally {
+            // Close the scanner and table only after the results have been fully
+            // consumed; closing the table before iterating the scanner would
+            // invalidate it.
+            if (scanner != null) scanner.close();
+            if (txTable != null) txTable.close();
+        }
+        return indexUpdates;
+    }
+
+
+    private static class TxTableState implements TableState {
+        private final byte[] rowKey;
+        private final long currentTimestamp;
+        private final RegionCoprocessorEnvironment env;
+        private final Map<String, byte[]> attributes;
+        private final List<Cell> pendingUpdates;
+        private final Set<ColumnReference> indexedColumns;
+        private final Map<ColumnReference, ImmutableBytesWritable> valueMap;
+        
+        private TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, byte[] rowKey) {
+            this.env = env;
+            this.currentTimestamp = currentTimestamp;
+            this.indexedColumns = indexedColumns;
+            this.attributes = attributes;
+            this.rowKey = rowKey;
+            int estimatedSize = indexedColumns.size();
+            this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize);
+            this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize);
+        }
+        
+        public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation m) {
+            this(env, indexedColumns, attributes, currentTimestamp, m.getRow());
+            applyMutation(m);
+        }
+        
+        public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Result r) {
+            this(env, indexedColumns, attributes, currentTimestamp, r.getRow());
+
+            for (ColumnReference ref : indexedColumns) {
+                Cell cell = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
+                if (cell != null) {
+                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                    ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+                    valueMap.put(ref, ptr);
+                }
+            }
+        }
+        
+        @Override
+        public RegionCoprocessorEnvironment getEnvironment() {
+            return env;
+        }
+
+        @Override
+        public long getCurrentTimestamp() {
+            return currentTimestamp;
+        }
+
+        @Override
+        public Map<String, byte[]> getUpdateAttributes() {
+            return attributes;
+        }
+
+        @Override
+        public byte[] getCurrentRowKey() {
+            return rowKey;
+        }
+
+        @Override
+        public List<? extends IndexedColumnGroup> getIndexColumnHints() {
+            return Collections.emptyList();
+        }
+
+        public void applyMutation(Mutation m) {
+            if (m instanceof Delete) {
+                valueMap.clear();
+            } else {
+                CellScanner scanner = m.cellScanner();
+                try {
+                    while (scanner.advance()) {
+                        Cell cell = scanner.current();
+                        if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
+                            ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+                            valueMap.remove(ref);
+                        } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
+                            for (ColumnReference ref : indexedColumns) {
+                                if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) {
+                                    valueMap.remove(ref);
+                                }
+                            }
+                        } else {
+                            ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+                            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                            ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+                            valueMap.put(ref, ptr);
+                        }
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException(e); // Impossible
+                }
+            }
+        }
+        
+        @Override
+        public Collection<Cell> getPendingUpdate() {
+            return pendingUpdates;
+        }
+
+        @Override
+        public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
+                throws IOException {
+            // TODO: creating these objects over and over again is wasteful
+            ColumnTracker tracker = new ColumnTracker(indexedColumns);
+            ValueGetter getter = new ValueGetter() {
+
+                @Override
+                public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException {
+                    return valueMap.get(ref);
+                }
+
+                @Override
+                public byte[] getRowKey() {
+                    return rowKey;
+                }
+                
+            };
+            Pair<ValueGetter, IndexUpdate> pair = new Pair<ValueGetter, IndexUpdate>(getter, new IndexUpdate(tracker));
+            return pair;
+        }
+    }
+}
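
For orientation, the coprocessor above is exercised whenever rows of a transactional table carrying a mutable index are mutated. The following client-side sketch is illustrative only: the JDBC URL and table/index names are placeholders, and TRANSACTIONAL=true assumes the table property referenced via PhoenixDatabaseMetaData.TRANSACTIONAL accepts true the same way the system tables below pin it to false.

import java.sql.Connection;
import java.sql.DriverManager;

public class TxIndexSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder connect string
        Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
        conn.createStatement().execute(
            "CREATE TABLE T (K VARCHAR PRIMARY KEY, V VARCHAR) TRANSACTIONAL=true");
        conn.createStatement().execute("CREATE INDEX I ON T (V)");
        conn.createStatement().execute("UPSERT INTO T VALUES ('a', '1')");
        conn.commit(); // index row keyed on '1' is written under the same transaction
        // A mutable update: PhoenixTransactionalIndexer scans the current row state,
        // emits a delete for the index row keyed on '1', then an upsert keyed on '2'.
        conn.createStatement().execute("UPSERT INTO T VALUES ('a', '2')");
        conn.commit();
        conn.close();
    }
}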

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java
deleted file mode 100644
index c471df6..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.index;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.phoenix.hbase.index.covered.TxIndexBuilder;
-import org.apache.phoenix.hbase.index.write.IndexWriter;
-import org.apache.phoenix.util.IndexUtil;
-
-public class PhoenixTxIndexBuilder extends TxIndexBuilder {
-    @Override
-    public void setup(RegionCoprocessorEnvironment env) throws IOException {
-        super.setup(env);
-        Configuration conf = env.getConfiguration();
-        // Install failure policy that just re-throws exception instead of killing RS
-        // or disabling the index
-        conf.set(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY, PhoenixTxIndexFailurePolicy.class.getName());
-    }
-
-    @Override
-    public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-        // The entire purpose of this method impl is to get the existing rows for the
-        // table rows being indexed into the block cache, as the index maintenance code
-        // does a point scan per row.
-        // TODO: provide a means for the transactional case to just return the Scanner
-        // for when this is executed as it seems like that would be more efficient.
-        IndexUtil.loadMutatingRowsIntoBlockCache(this.env.getRegion(), getCodec(), miniBatchOp, useRawScanToPrimeBlockCache());
-    }
-
-    private PhoenixIndexCodec getCodec() {
-        return (PhoenixIndexCodec)this.codec;
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java
deleted file mode 100644
index fa70cc9..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
- * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
- * governing permissions and limitations under the License.
- */
-package org.apache.phoenix.index;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
-import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
-
-import com.google.common.collect.Multimap;
-
-public class PhoenixTxIndexFailurePolicy implements IndexFailurePolicy {
-    private Stoppable parent;
-
-    @Override
-    public void stop(String why) {
-        if (parent != null) {
-            parent.stop(why);
-        }
-    }
-
-    @Override
-    public boolean isStopped() {
-        return parent == null ? false : parent.isStopped();
-    }
-
-    @Override
-    public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
-        this.parent = parent;
-    }
-
-    @Override
-    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause)
-            throws IOException {
-        if (cause instanceof IOException) {
-            throw (IOException)cause;
-        } else if (cause instanceof RuntimeException) { throw (RuntimeException)cause; }
-        throw new IOException(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index dae7939..20ab602 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -112,7 +112,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.PhoenixIndexBuilder;
 import org.apache.phoenix.index.PhoenixIndexCodec;
-import org.apache.phoenix.index.PhoenixTxIndexBuilder;
+import org.apache.phoenix.index.PhoenixTransactionalIndexer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
@@ -705,11 +705,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             // all-or-none mutate class which break when this coprocessor is installed (PHOENIX-1318).
             if ((tableType != PTableType.INDEX && tableType != PTableType.VIEW)
                     && !SchemaUtil.isMetaTable(tableName)
-                    && !SchemaUtil.isStatsTable(tableName)
-                    && !descriptor.hasCoprocessor(Indexer.class.getName())) {
-                Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
-                opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
-                Indexer.enableIndexing(descriptor, isTransactional ? PhoenixTxIndexBuilder.class : PhoenixIndexBuilder.class, opts, priority);
+                    && !SchemaUtil.isStatsTable(tableName)) {
+                if (isTransactional) {
+                    if (!descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
+                        descriptor.addCoprocessor(PhoenixTransactionalIndexer.class.getName(), null, priority, null);
+                    }
+                } else {
+                    if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
+                        Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
+                        opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+                        Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+                    }
+                }
             }
             if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {
                 descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(),
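
Since indexing is now wired differently per table type, the installed coprocessor reveals which path a physical table uses. A sketch under stated assumptions (HBase 0.98-era admin API; "MY_TABLE" is a placeholder):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CoprocessorCheckSketch {
    public static void main(String[] args) throws Exception {
        HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
        HTableDescriptor desc = admin.getTableDescriptor(TableName.valueOf("MY_TABLE"));
        // Transactional tables get PhoenixTransactionalIndexer; non-transactional
        // tables keep the existing Indexer + NonTxIndexBuilder combination.
        boolean tx = desc.hasCoprocessor("org.apache.phoenix.index.PhoenixTransactionalIndexer");
        boolean nonTx = desc.hasCoprocessor("org.apache.phoenix.hbase.index.Indexer");
        System.out.println(tx ? "transactional indexing" : nonTx ? "non-transactional indexing" : "no indexing");
        admin.close();
    }
}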

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index b317d77..8c2e990 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -100,6 +100,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.schema.MetaDataSplitPolicy;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
@@ -238,7 +239,8 @@ public interface QueryConstants {
             HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
             HColumnDescriptor.KEEP_DELETED_CELLS + "="  + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" +
             // Install split policy to prevent a tenant's metadata from being split across regions.
-            HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
+            HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" + 
+            PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
 
     public static final String CREATE_STATS_TABLE_METADATA =
             "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_STATS_TABLE + "\"(\n" +
@@ -259,7 +261,8 @@ public interface QueryConstants {
             HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_STAT_DATA_VERSIONS + ",\n" +
             HColumnDescriptor.KEEP_DELETED_CELLS + "="  + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" +
             // Install split policy to prevent a physical table's stats from being split across regions.
-            HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
+            HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" + 
+            PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
 
     public static final String CREATE_SEQUENCE_METADATA =
             "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + TYPE_SEQUENCE + "\"(\n" +
@@ -277,5 +280,6 @@ public interface QueryConstants {
             LIMIT_REACHED_FLAG + " BOOLEAN \n" +
             " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" +
             HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
-            HColumnDescriptor.KEEP_DELETED_CELLS + "="  + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + "\n";
+            HColumnDescriptor.KEEP_DELETED_CELLS + "="  + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" +
+            PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
 }
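
The appended property keeps the system tables non-transactional even when a deployment flips the default, as the new TxGlobalMutableIndexIT does via DEFAULT_TRANSACTIONAL_ATTRIB. A one-line sketch of what the concatenation evaluates to, assuming PhoenixDatabaseMetaData.TRANSACTIONAL resolves to the property name TRANSACTIONAL:

// Assumed constant value; concatenation yields the DDL property "TRANSACTIONAL=false".
String ddlTail = "TRANSACTIONAL" + "=" + Boolean.FALSE;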

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
index 265fc78..ee06179 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
@@ -17,12 +17,23 @@
  */
 package org.apache.phoenix.trace;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
-import org.apache.commons.configuration.Configuration;
+import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION;
+import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION;
+import static org.apache.phoenix.metrics.MetricInfo.END;
+import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+import static org.apache.phoenix.metrics.MetricInfo.PARENT;
+import static org.apache.phoenix.metrics.MetricInfo.SPAN;
+import static org.apache.phoenix.metrics.MetricInfo.START;
+import static org.apache.phoenix.metrics.MetricInfo.TAG;
+import static org.apache.phoenix.metrics.MetricInfo.TRACE;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.commons.configuration.SubsetConfiguration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,20 +42,16 @@ import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsSink;
 import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.phoenix.metrics.*;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.metrics.Metrics;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.QueryUtil;
 
-import javax.annotation.Nullable;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.*;
-
-import static org.apache.phoenix.metrics.MetricInfo.*;
-import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 
 /**
  * Write the metrics to a phoenix table.
@@ -169,7 +176,9 @@ public class PhoenixMetricsSink implements MetricsSink {
                         TAG_COUNT + " smallint, " +
                         ANNOTATION_COUNT + " smallint" +
                         "  CONSTRAINT pk PRIMARY KEY (" + TRACE.columnName + ", "
-                        + PARENT.columnName + ", " + SPAN.columnName + "))\n";
+                        + PARENT.columnName + ", " + SPAN.columnName + "))\n" +
+                        PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
+;
         PreparedStatement stmt = conn.prepareStatement(ddl);
         stmt.execute();
         this.table = table;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 1fcf16a..45729a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -24,8 +24,6 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -43,15 +41,12 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.IndexStatementRewriter;
-import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -65,16 +60,13 @@ import org.apache.phoenix.expression.visitor.RowKeyExpressionVisitor;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -644,58 +636,4 @@ public class IndexUtil {
         return col.getExpressionStr() == null ? IndexUtil.getCaseSensitiveDataColumnFullName(col.getName().getString())
                 : col.getExpressionStr();
     }
-    
-    /*
-     * The entire purpose of this method is to get the existing rows for the table rows being indexed into the
-     * block cache, as the index maintenance code does a point scan per row. Though for the transactional
-     * case we may be loading more than we need, since we're not applying the transaction filters, that
-     * should still be ok.
-     */
-    public static void loadMutatingRowsIntoBlockCache(HRegion region, PhoenixIndexCodec codec, MiniBatchOperationInProgress<Mutation> miniBatchOp, boolean useRawScan) 
-    throws IOException {
-        List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
-        Map<ImmutableBytesWritable, IndexMaintainer> maintainers =
-                new HashMap<ImmutableBytesWritable, IndexMaintainer>();
-        ImmutableBytesWritable indexTableName = new ImmutableBytesWritable();
-        for (int i = 0; i < miniBatchOp.size(); i++) {
-            Mutation m = miniBatchOp.getOperation(i);
-            keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
-            List<IndexMaintainer> indexMaintainers = codec.getIndexMetaData(m.getAttributesMap()).getIndexMaintainers();
-            
-            for(IndexMaintainer indexMaintainer: indexMaintainers) {
-                if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue;
-                indexTableName.set(indexMaintainer.getIndexTableName());
-                if (maintainers.get(indexTableName) != null) continue;
-                maintainers.put(indexTableName, indexMaintainer);
-            }
-            
-        }
-        if (maintainers.isEmpty()) return;
-        Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
-        scan.setRaw(useRawScan);
-        ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
-        scanRanges.initializeScan(scan);
-        scan.setFilter(scanRanges.getSkipScanFilter());
-        RegionScanner scanner = region.getScanner(scan);
-        // Run through the scanner using internal nextRaw method
-        region.startRegionOperation();
-        try {
-            synchronized (scanner) {
-                boolean hasMore;
-                do {
-                    List<Cell> results = Lists.newArrayList();
-                    // Results are potentially returned even when the return value of s.next is
-                    // false since this is an indication of whether or not there are more values
-                    // after the ones returned
-                    hasMore = scanner.nextRaw(results);
-                } while (hasMore);
-            }
-        } finally {
-            try {
-                scanner.close();
-            } finally {
-                region.closeRegionOperation();
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
index 7e73a81..0575b3f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
@@ -16,7 +16,7 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.phoenix.index.BaseIndexCodec;
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
 
 /**
  * An {@link IndexCodec} for testing that allow you to specify the index updates/deletes, regardless of the current
@@ -41,12 +41,12 @@ public class CoveredIndexCodecForTesting extends BaseIndexCodec {
     }
 
     @Override
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
         return this.deletes;
     }
 
     @Override
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) {
         return this.updates;
     }
 
@@ -59,7 +59,4 @@ public class CoveredIndexCodecForTesting extends BaseIndexCodec {
     public boolean isEnabled(Mutation m) {
         return true;
     }
-
-    @Override
-    public void setContext(TableState state, Mutation mutation) throws IOException {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
index f0c1483..e4654ea 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.LocalTableState;
@@ -180,14 +181,14 @@ public class TestCoveredColumnIndexCodec {
 
     // check the codec for deletes it should send
     LocalTableState state = new LocalTableState(env, table, p);
-    Iterable<IndexUpdate> updates = codec.getIndexDeletes(state);
+    Iterable<IndexUpdate> updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA);
     assertFalse("Found index updates without any existing kvs in table!", updates.iterator().next()
         .isValid());
 
     // get the updates with the pending update
     state.setCurrentTimestamp(1);
     state.addPendingUpdates(kvs);
-    updates = codec.getIndexUpserts(state);
+    updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA);
     assertTrue("Didn't find index updates for pending primary table update!", updates.iterator()
         .hasNext());
     for (IndexUpdate update : updates) {
@@ -210,7 +211,7 @@ public class TestCoveredColumnIndexCodec {
     state = new LocalTableState(env, table, d);
     state.setCurrentTimestamp(2);
     // check the cleanup of the current table, after the puts (mocking a 'next' update)
-    updates = codec.getIndexDeletes(state);
+    updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA);
     for (IndexUpdate update : updates) {
       assertTrue("Didn't have any index cleanup, even though there is current state",
         update.isValid());
@@ -240,7 +241,7 @@ public class TestCoveredColumnIndexCodec {
     state.setCurrentTimestamp(d.getTimeStamp());
     // now we shouldn't see anything when getting the index update
     state.addPendingUpdates(d.getFamilyCellMap().get(FAMILY));
-    Iterable<IndexUpdate> updates = codec.getIndexUpserts(state);
+    Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA);
     for (IndexUpdate update : updates) {
       assertFalse("Had some index updates, though it should have been covered by the delete",
         update.isValid());


[2/2] phoenix git commit: PHOENIX-1830 Transactional mutable secondary indexes

Posted by ja...@apache.org.
PHOENIX-1830 Transactional mutable secondary indexes


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e1521ab0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e1521ab0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e1521ab0

Branch: refs/heads/txn
Commit: e1521ab059b51a81738e5b9fd8390034dd1b9f1c
Parents: 0ad5c56
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Apr 9 00:08:11 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Apr 9 00:08:11 2015 -0700

----------------------------------------------------------------------
 .../end2end/index/TxGlobalMutableIndexIT.java   |  41 +++
 .../EndToEndCoveredColumnsIndexBuilderIT.java   |   8 +-
 .../covered/example/FailWithoutRetriesIT.java   |  12 +-
 .../org/apache/phoenix/hbase/index/Indexer.java |  54 ---
 .../phoenix/hbase/index/MultiMutation.java      |  86 +++++
 .../hbase/index/builder/BaseIndexBuilder.java   |  23 +-
 .../hbase/index/builder/BaseIndexCodec.java     |  47 +++
 .../hbase/index/builder/IndexBuildManager.java  |  18 +-
 .../hbase/index/builder/IndexBuilder.java       |  19 +-
 .../phoenix/hbase/index/covered/IndexCodec.java |  23 +-
 .../hbase/index/covered/IndexMetaData.java      |  22 ++
 .../hbase/index/covered/LocalTableState.java    |  10 +-
 .../hbase/index/covered/NonTxIndexBuilder.java  |  65 ++--
 .../phoenix/hbase/index/covered/TableState.java |   2 -
 .../hbase/index/covered/TxIndexBuilder.java     | 247 --------------
 .../example/CoveredColumnIndexCodec.java        |  10 +-
 .../covered/example/CoveredColumnIndexer.java   |   6 +-
 .../apache/phoenix/index/BaseIndexCodec.java    |  59 ----
 .../phoenix/index/PhoenixIndexBuilder.java      |  83 ++++-
 .../apache/phoenix/index/PhoenixIndexCodec.java | 160 ++++-----
 .../phoenix/index/PhoenixIndexMetaData.java     |  97 ++++++
 .../index/PhoenixTransactionalIndexer.java      | 339 +++++++++++++++++++
 .../phoenix/index/PhoenixTxIndexBuilder.java    |  53 ---
 .../index/PhoenixTxIndexFailurePolicy.java      |  50 ---
 .../query/ConnectionQueryServicesImpl.java      |  19 +-
 .../apache/phoenix/query/QueryConstants.java    |  10 +-
 .../phoenix/trace/PhoenixMetricsSink.java       |  41 ++-
 .../java/org/apache/phoenix/util/IndexUtil.java |  62 ----
 .../covered/CoveredIndexCodecForTesting.java    |   9 +-
 .../example/TestCoveredColumnIndexCodec.java    |   9 +-
 30 files changed, 909 insertions(+), 775 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
new file mode 100644
index 0000000..2d22b0f
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import java.util.Map;
+
+import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+
+import com.google.common.collect.Maps;
+
+public class TxGlobalMutableIndexIT extends GlobalMutableIndexIT {
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseMutableIndexIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Keep the index mutate batch size threshold low to force the server cache to be used
+        props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2));
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
index fa85f00..1cdd508 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
@@ -172,15 +172,15 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
     private Queue<TableStateVerifier> verifiers = new ArrayDeque<TableStateVerifier>();
 
     @Override
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
       verify(state);
-      return super.getIndexDeletes(state);
+      return super.getIndexDeletes(state, context);
     }
 
     @Override
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) {
       verify(state);
-      return super.getIndexUpserts(state);
+      return super.getIndexUpserts(state, context);
     }
 
     private void verify(TableState state) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
index 281ad63..6367945 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -32,10 +31,11 @@ import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.hbase.index.IndexTestingUtils;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.TableName;
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.TableState;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-import org.apache.phoenix.index.BaseIndexCodec;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -63,18 +63,14 @@ public class FailWithoutRetriesIT {
     public static class FailingTestCodec extends BaseIndexCodec {
 
         @Override
-        public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
+        public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException {
             throw new RuntimeException("Intentionally failing deletes for " + FailWithoutRetriesIT.class.getName());
         }
 
         @Override
-        public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
+        public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException {
             throw new RuntimeException("Intentionally failing upserts for " + FailWithoutRetriesIT.class.getName());
         }
-
-        @Override
-        public void setContext(TableState state, Mutation mutation) throws IOException {}
-
     }
 
     @BeforeClass
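
The recurring change in the test codecs is that IndexCodec's build methods now take an IndexMetaData context. A minimal no-op codec against the new signatures might look like the sketch below (the context argument is simply ignored here; package names are taken from the imports above):

import java.io.IOException;
import java.util.Collections;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.TableState;

public class NoOpIndexCodec extends BaseIndexCodec {

    @Override
    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException {
        return Collections.emptyList(); // nothing to delete
    }

    @Override
    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException {
        return Collections.emptyList(); // nothing to upsert
    }

    @Override
    public boolean isEnabled(Mutation m) {
        return false; // opt out of index maintenance entirely
    }
}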

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index a4fc96b..d79ffb5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -25,12 +25,10 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -282,58 +280,6 @@ public class Indexer extends BaseRegionObserver {
       }
   }
 
-  private class MultiMutation extends Mutation {
-
-    private ImmutableBytesPtr rowKey;
-
-    public MultiMutation(ImmutableBytesPtr rowkey) {
-      this.rowKey = rowkey;
-    }
-
-    /**
-     * @param stored
-     */
-    public void addAll(Mutation stored) {
-      // add all the kvs
-      for (Entry<byte[], List<Cell>> kvs : stored.getFamilyCellMap().entrySet()) {
-        byte[] family = kvs.getKey();
-        List<Cell> list = getKeyValueList(family, kvs.getValue().size());
-        list.addAll(kvs.getValue());
-        familyMap.put(family, list);
-      }
-
-      // add all the attributes, not overriding already stored ones
-      for (Entry<String, byte[]> attrib : stored.getAttributesMap().entrySet()) {
-        if (this.getAttribute(attrib.getKey()) == null) {
-          this.setAttribute(attrib.getKey(), attrib.getValue());
-        }
-      }
-    }
-
-    private List<Cell> getKeyValueList(byte[] family, int hint) {
-      List<Cell> list = familyMap.get(family);
-      if (list == null) {
-        list = new ArrayList<Cell>(hint);
-      }
-      return list;
-    }
-
-    @Override
-    public byte[] getRow(){
-      return this.rowKey.copyBytesIfNecessary();
-    }
-
-    @Override
-    public int hashCode() {
-      return this.rowKey.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      return o == null ? false : o.hashCode() == this.hashCode();
-    }
-  }
-
   /**
    * Add the index updates to the WAL, or write to the index table, if the WAL has been disabled
    * @return <tt>true</tt> if the WAL has been updated.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/MultiMutation.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/MultiMutation.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/MultiMutation.java
new file mode 100644
index 0000000..f6381c4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/MultiMutation.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+public class MultiMutation extends Mutation {
+
+    private ImmutableBytesPtr rowKey;
+
+    public MultiMutation(ImmutableBytesPtr rowkey) {
+      this.rowKey = rowkey;
+    }
+
+    /**
+     * Add all of the given mutation's cells and attributes to this mutation, without
+     * overriding attributes that are already set.
+     * @param stored mutation to merge into this multi-mutation
+     */
+    public void addAll(Mutation stored) {
+      // add all the kvs
+      for (Entry<byte[], List<Cell>> kvs : stored.getFamilyCellMap().entrySet()) {
+        byte[] family = kvs.getKey();
+        List<Cell> list = getKeyValueList(family, kvs.getValue().size());
+        list.addAll(kvs.getValue());
+        familyMap.put(family, list);
+      }
+
+      // add all the attributes, not overriding already stored ones
+      for (Entry<String, byte[]> attrib : stored.getAttributesMap().entrySet()) {
+        if (this.getAttribute(attrib.getKey()) == null) {
+          this.setAttribute(attrib.getKey(), attrib.getValue());
+        }
+      }
+    }
+
+    private List<Cell> getKeyValueList(byte[] family, int hint) {
+      List<Cell> list = familyMap.get(family);
+      if (list == null) {
+        list = new ArrayList<Cell>(hint);
+      }
+      return list;
+    }
+
+    @Override
+    public byte[] getRow(){
+      return this.rowKey.copyBytesIfNecessary();
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((rowKey == null) ? 0 : rowKey.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null) return false;
+        if (getClass() != obj.getClass()) return false;
+        MultiMutation other = (MultiMutation)obj;
+        return rowKey.equals(other.rowKey);
+    }
+
+  }
\ No newline at end of file

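For context, a minimal sketch of how the extracted MultiMutation can be used to coalesce
several updates to the same row before index maintenance; the table, family, and qualifier
names below are hypothetical:

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.MultiMutation;
    import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;

    public class MultiMutationExample {
        public static MultiMutation coalesce(byte[] row) {
            // Two independent updates to the same row, e.g. from the same batch
            Put p1 = new Put(row);
            p1.add(Bytes.toBytes("fam"), Bytes.toBytes("q1"), Bytes.toBytes("v1"));
            Put p2 = new Put(row);
            p2.add(Bytes.toBytes("fam"), Bytes.toBytes("q2"), Bytes.toBytes("v2"));

            // Coalesce both updates so index building sees one consolidated row state
            MultiMutation merged = new MultiMutation(new ImmutableBytesPtr(row));
            merged.addAll(p1);
            merged.addAll(p2); // attributes already set via p1 are not overridden
            return merged;
        }
    }
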
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index dfb9ad4..4e329e9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
 import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 
@@ -40,8 +41,6 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
     protected RegionCoprocessorEnvironment env;
     protected IndexCodec codec;
 
-    abstract protected boolean useRawScanToPrimeBlockCache();
-    
     @Override
     public void extendBaseIndexBuilderInstead() {}
 
@@ -65,9 +64,14 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
     }
 
     @Override
-    public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+    public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData context) throws IOException {
         // noop
     }
+    
+    @Override
+    public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+        return IndexMetaData.NULL_INDEX_META_DATA;
+    }
 
     @Override
     public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
@@ -98,22 +102,11 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
     }
 
     @Override
-    public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered)
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered, IndexMetaData context)
             throws IOException {
         throw new UnsupportedOperationException();
     }
 
-    /**
-     * {@inheritDoc}
-     * <p>
-     * By default, assumes that all mutations should <b>not be batched</b>. That is to say, each mutation always applies
-     * to different rows, even if they are in the same batch, or are independent updates.
-     */
-    @Override
-    public byte[] getBatchId(Mutation m) {
-        return this.codec.getBatchId(m);
-    }
-
     @Override
     public void stop(String why) {
         LOG.debug("Stopping because: " + why);

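The upshot of the BaseIndexBuilder changes: per-batch state now travels through an explicit
IndexMetaData object rather than implicit codec context, and the default getIndexMetaData
returns IndexMetaData.NULL_INDEX_META_DATA. A sketch of a subclass supplying its own
metadata (the BatchStartMetaData type is invented purely for illustration):

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
    import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
    import org.apache.phoenix.hbase.index.covered.IndexMetaData;

    public abstract class TimestampingIndexBuilder extends BaseIndexBuilder {

        // Invented metadata type: records when the batch started.
        static class BatchStartMetaData implements IndexMetaData {
            final long batchStartTime = System.currentTimeMillis();
        }

        @Override
        public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp)
                throws IOException {
            // Computed once per batch; IndexBuildManager hands the same instance back
            // into batchStarted(...) and every getIndexUpdate(mutation, context) call.
            return new BatchStartMetaData();
        }
    }
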
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
new file mode 100644
index 0000000..1ce4e2e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.builder;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.covered.IndexCodec;
+
+/**
+ * Base {@link IndexCodec} implementation supplying default behavior for the methods that
+ * subclasses rarely need to customize.
+ */
+public abstract class BaseIndexCodec implements IndexCodec {
+
+  @Override
+  public void initialize(RegionCoprocessorEnvironment env) throws IOException {
+    // noop
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * By default, the codec is always enabled. Subclasses should override this method if they want
+   * to decide whether to index on a per-mutation basis.
+   * @throws IOException on failure
+   */
+  @Override
+  public boolean isEnabled(Mutation m) throws IOException {
+    return true;
+  }
+}
\ No newline at end of file

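With BaseIndexCodec relocated under hbase.index.builder, a codec subclass only needs to
override isEnabled when it wants per-mutation control. A sketch assuming a hypothetical
SKIP_INDEX mutation attribute (the attribute name is invented for illustration):

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;

    public abstract class SkippableIndexCodec extends BaseIndexCodec {
        private static final String SKIP_INDEX = "SKIP_INDEX"; // hypothetical attribute

        @Override
        public boolean isEnabled(Mutation m) throws IOException {
            // Indexing stays on by default; skip it when the client tagged the mutation
            return m.getAttribute(SKIP_INDEX) == null;
        }
    }
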
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index d5fd34d..ae2125e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
 import org.apache.phoenix.hbase.index.parallel.Task;
 import org.apache.phoenix.hbase.index.parallel.TaskBatch;
@@ -115,7 +116,8 @@ public class IndexBuildManager implements Stoppable {
       MiniBatchOperationInProgress<Mutation> miniBatchOp,
       Collection<? extends Mutation> mutations) throws Throwable {
     // notify the delegate that we have started processing a batch
-    this.delegate.batchStarted(miniBatchOp);
+    final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp);
+    this.delegate.batchStarted(miniBatchOp, indexMetaData);
 
     // parallelize each mutation into its own task
     // each task is cancelable via two mechanisms: (1) underlying HRegion is closing (which would
@@ -129,7 +131,7 @@ public class IndexBuildManager implements Stoppable {
 
         @Override
         public Collection<Pair<Mutation, byte[]>> call() throws IOException {
-          return delegate.getIndexUpdate(m);
+          return delegate.getIndexUpdate(m, indexMetaData);
         }
 
       });
@@ -156,28 +158,24 @@ public class IndexBuildManager implements Stoppable {
   }
 
   public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
-      Collection<KeyValue> filtered) throws IOException {
+      Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException {
     // this is run async, so we can take our time here
-    return delegate.getIndexUpdateForFilteredRows(filtered);
+    return delegate.getIndexUpdateForFilteredRows(filtered, indexMetaData);
   }
 
   public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
     delegate.batchCompleted(miniBatchOp);
   }
 
-  public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp)
+  public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData indexMetaData)
       throws IOException {
-    delegate.batchStarted(miniBatchOp);
+    delegate.batchStarted(miniBatchOp, indexMetaData);
   }
 
   public boolean isEnabled(Mutation m) throws IOException {
     return delegate.isEnabled(m);
   }
 
-  public byte[] getBatchId(Mutation m) {
-    return delegate.getBatchId(m);
-  }
-
   @Override
   public void stop(String why) {
     if (stopped) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index 194fdcc..36aba77 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 
 /**
  * Interface to build updates ({@link Mutation}s) to the index tables, based on the primary table
@@ -64,6 +65,7 @@ public interface IndexBuilder extends Stoppable {
    * Implementers must ensure that this method is thread-safe - it could (and probably will) be
    * called concurrently for different mutations, which may or may not be part of the same batch.
    * @param mutation update to the primary table to be indexed.
+   * @param context per-batch index metadata, as returned by {@link #getIndexMetaData}
    * @return a Map of the mutations to make -> target index table name
    * @throws IOException on failure
    */
@@ -76,7 +78,7 @@ public interface IndexBuilder extends Stoppable {
   Noop Failure mode
   */
   
-  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException;
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context) throws IOException;
 
     /**
      * Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal flush or compaction
@@ -94,12 +96,13 @@ public interface IndexBuilder extends Stoppable {
      *  
      * @param filtered {@link KeyValue}s that previously existed, but won't be included
      * in further output from HBase.
+     * @param context per-batch index metadata, as returned by {@link #getIndexMetaData}
      * 
      * @return a {@link Map} of the mutations to make -> target index table name
      * @throws IOException on failure
      */
   public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
-      Collection<KeyValue> filtered)
+      Collection<KeyValue> filtered, IndexMetaData context)
       throws IOException;
 
   /**
@@ -115,10 +118,13 @@ public interface IndexBuilder extends Stoppable {
    * <i>after</i> the {@link #getIndexUpdate} methods. Therefore, you will likely need an attribute
    * on your {@link Put}/{@link Delete} to indicate it is a batch operation.
    * @param miniBatchOp the full batch operation to be written
+   * @param context per-batch index metadata, as returned by {@link #getIndexMetaData}
  * @throws IOException 
    */
-  public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException;
+  public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData context) throws IOException;
 
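+  /**
+   * Build the per-batch index metadata to thread through this batch's index maintenance. The
+   * result is passed back into {@link #batchStarted} and into each {@link #getIndexUpdate} call
+   * for the batch.
+   * @param miniBatchOp the full batch operation for which index updates will be built
+   * @return the metadata for this batch, or {@link IndexMetaData#NULL_INDEX_META_DATA} if none
+   * @throws IOException on failure
+   */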
+  public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException;
+  
   /**
    * This allows the codec to dynamically change whether or not indexing should take place for a
   * table. If it doesn't take place, we can save a lot of time on the regular Put path. By making
@@ -133,11 +139,4 @@ public interface IndexBuilder extends Stoppable {
  * @throws IOException 
    */
   public boolean isEnabled(Mutation m) throws IOException;
-
-  /**
-   * @param m mutation that has been received by the indexer and is waiting to be indexed
-   * @return the ID of batch to which the Mutation belongs, or <tt>null</tt> if the mutation is not
-   *         part of a batch.
-   */
-  public byte[] getBatchId(Mutation m);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
index e3ef831..93de11e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
@@ -14,7 +14,7 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.phoenix.index.BaseIndexCodec;
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
 
 /**
  * Codec for creating index updates from the current state of a table.
@@ -49,10 +49,11 @@ public interface IndexCodec {
      * @param state
      *            the current state of the table that needs to be cleaned up. Generally, you only care about the latest
      *            column values, for each column you are indexing for each index table.
+     * @param context per-batch index metadata for the batch being processed
      * @return the pairs of (deletes, index table name) that should be applied.
      * @throws IOException
      */
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException;
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException;
 
     // table state has the pending update already applied, before calling
     // get the new index entries
@@ -68,10 +69,11 @@ public interface IndexCodec {
      * @param state
      *            the current state of the table that needs to an index update Generally, you only care about the latest
      *            column values, for each column you are indexing for each index table.
+     * @param context per-batch index metadata for the batch being processed
      * @return the pairs of (updates,index table name) that should be applied.
      * @throws IOException
      */
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException;
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException;
 
     /**
      * This allows the codec to dynamically change whether or not indexing should take place for a table. If it doesn't
@@ -88,19 +90,4 @@ public interface IndexCodec {
      * @throws IOException
      */
     public boolean isEnabled(Mutation m) throws IOException;
-
-    /**
-     * Get the batch identifier of the given mutation. Generally, updates to the table will take place in a batch of
-     * updates; if we know that the mutation is part of a batch, we can build the state much more intelligently.
-     * <p>
-     * <b>If you have batches that have multiple updates to the same row state, you must specify a batch id for each
-     * batch. Otherwise, we cannot guarantee index correctness</b>
-     * 
-     * @param m
-     *            mutation that may or may not be part of the batch
-     * @return <tt>null</tt> if the mutation is not part of a batch or an id for the batch.
-     */
-    public byte[] getBatchId(Mutation m);
-
-    public void setContext(TableState state, Mutation mutation) throws IOException;
 }
\ No newline at end of file

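With setContext and getBatchId removed, codecs now receive the batch-level metadata
explicitly on every getIndexDeletes/getIndexUpserts call. A sketch of a codec consuming
the new parameter (the class and its behavior are illustrative only; real codecs downcast
the context, e.g. to PhoenixIndexMetaData as done elsewhere in this patch):

    import java.io.IOException;
    import java.util.Collections;

    import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
    import org.apache.phoenix.hbase.index.covered.IndexMetaData;
    import org.apache.phoenix.hbase.index.covered.IndexUpdate;
    import org.apache.phoenix.hbase.index.covered.TableState;

    public abstract class ContextAwareCodec extends BaseIndexCodec {
        @Override
        public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context)
                throws IOException {
            if (context == IndexMetaData.NULL_INDEX_META_DATA) {
                // No batch metadata was supplied; nothing to index in this sketch.
                return Collections.<IndexUpdate>emptyList();
            }
            // A real codec would downcast context here and build one update per
            // index maintainer from the current row state.
            return Collections.<IndexUpdate>emptyList();
        }
    }
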
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
new file mode 100644
index 0000000..ee25a40
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+public interface IndexMetaData {
+    public static final IndexMetaData NULL_INDEX_META_DATA = new IndexMetaData() {};
+}

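IndexMetaData is deliberately an empty marker interface: each builder defines its own
implementation and downcasts where needed, as PhoenixIndexBuilder does with
PhoenixIndexMetaData later in this patch. A sketch of such an implementation, with an
invented field standing in for whatever per-batch state a builder needs:

    import org.apache.phoenix.hbase.index.covered.IndexMetaData;

    public class ExampleIndexMetaData implements IndexMetaData {
        private final long clientTimestamp; // invented per-batch state

        public ExampleIndexMetaData(long clientTimestamp) {
            this.clientTimestamp = clientTimestamp;
        }

        public long getClientTimestamp() {
            return clientTimestamp;
        }
    }
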
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index f47a71a..2da5771 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -36,8 +36,6 @@ import org.apache.phoenix.hbase.index.scanner.Scanner;
 import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 
-import com.google.common.collect.Maps;
-
 /**
  * Manage the state of the HRegion's view of the table, for the single row.
  * <p>
@@ -58,7 +56,6 @@ public class LocalTableState implements TableState {
     private List<Cell> kvs = new ArrayList<Cell>();
     private List<? extends IndexedColumnGroup> hints;
     private CoveredColumns columnSet;
-    private final Map<String,Object> context = Maps.newHashMap();
 
     public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) {
         this.env = environment;
@@ -132,7 +129,7 @@ public class LocalTableState implements TableState {
      * state for any of the columns you are indexing.
      * <p>
      * <i>NOTE:</i> This method should <b>not</b> be used during
-     * {@link IndexCodec#getIndexDeletes(TableState)} as the pending update will not yet have been
+     * {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData)} as the pending update will not yet have been
      * applied - you are merely attempting to cleanup the current state and therefore do <i>not</i>
      * need to track the indexed columns.
      * <p>
@@ -275,9 +272,4 @@ public class LocalTableState implements TableState {
         ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey());
         return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond());
     }
-
-    @Override
-    public Map<String, Object> getContext() {
-        return context;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 4ba3671..11e7d1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -62,14 +62,13 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
     }
 
     @Override
-    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData) throws IOException {
     	// create a state manager, so we can manage each batch
         LocalTableState state = new LocalTableState(env, localTable, mutation);
-    	codec.setContext(state, mutation);
         // build the index updates for each group
         IndexUpdateManager manager = new IndexUpdateManager();
 
-        batchMutationAndAddUpdates(manager, state, mutation);
+        batchMutationAndAddUpdates(manager, state, mutation, indexMetaData);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Found index updates for Mutation: " + mutation + "\n" + manager);
@@ -84,16 +83,17 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
      * don't all have the timestamp, so we need to manage everything in batches based on timestamp.
      * <p>
      * Adds all the updates in the {@link Mutation} to the state, as a side-effect.
-     * 
-     * @param updateMap
-     *            index updates into which to add new updates. Modified as a side-effect.
      * @param state
      *            current state of the row for the mutation.
      * @param m
      *            mutation to batch
+     * @param indexMetaData
+     *            per-batch index metadata for this mutation's batch
+     * @param manager
+     *            index updates into which to add new updates. Modified as a side-effect.
+     * 
      * @throws IOException
      */
-    private void batchMutationAndAddUpdates(IndexUpdateManager manager, LocalTableState state, Mutation m) throws IOException {
+    private void batchMutationAndAddUpdates(IndexUpdateManager manager, LocalTableState state, Mutation m, IndexMetaData indexMetaData) throws IOException {
         // split the mutation into timestamp-based batches
         Collection<Batch> batches = createTimestampBatchesFromMutation(m);
 
@@ -106,7 +106,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
             * group will see that as the current state, which can cause a delete and a put to be created for
              * the next group.
              */
-            if (addMutationsForBatch(manager, batch, state, cleanupCurrentState)) {
+            if (addMutationsForBatch(manager, batch, state, cleanupCurrentState, indexMetaData)) {
                 cleanupCurrentState = false;
             }
         }
@@ -197,12 +197,13 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
      *            <tt>true</tt> if we should should attempt to cleanup the current state of the table, in the event of a
      *            'back in time' batch. <tt>false</tt> indicates we should not attempt the cleanup, e.g. an earlier
      *            batch already did the cleanup.
+     * @param indexMetaData per-batch index metadata for this mutation's batch
      * @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put), <tt>false</tt>
      *         otherwise
      * @throws IOException
      */
     private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch, LocalTableState state,
-            boolean requireCurrentStateCleanup) throws IOException {
+            boolean requireCurrentStateCleanup, IndexMetaData indexMetaData) throws IOException {
 
         // need a temporary manager for the current batch. It should resolve any conflicts for the
         // current batch. Essentially, we can get the case where a batch doesn't change the current
@@ -214,18 +215,18 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
         // determine if we need to make any cleanup given the pending update.
         long batchTs = batch.getTimestamp();
         state.setPendingUpdates(batch.getKvs());
-        addCleanupForCurrentBatch(updateMap, batchTs, state);
+        addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData);
 
         // A.2 do a single pass first for the updates to the current state
         state.applyPendingUpdates();
-        long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap);
+        long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap, indexMetaData);
         // if all the updates are the latest thing in the index, we are done - don't go and fix history
         if (ColumnTracker.isNewestTime(minTs)) { return false; }
 
         // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the
         // index. after this, we have the correct view of the index, from the batch up to the index
         while (!ColumnTracker.isNewestTime(minTs)) {
-            minTs = addUpdateForGivenTimestamp(minTs, state, updateMap);
+            minTs = addUpdateForGivenTimestamp(minTs, state, updateMap, indexMetaData);
         }
 
         // B. only cleanup the current state if we need to - its a huge waste of effort otherwise.
@@ -240,7 +241,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
             // cleanup the pending batch. If anything in the correct history is covered by Deletes used to
             // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both
             // because the update may have a different set of columns or value based on the update).
-            cleanupIndexStateFromBatchOnward(updateMap, batchTs, state);
+            cleanupIndexStateFromBatchOnward(updateMap, batchTs, state, indexMetaData);
 
             // have to roll the state forward again, so the current state is correct
             state.applyPendingUpdates();
@@ -249,18 +250,18 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
         return false;
     }
 
-    private long addUpdateForGivenTimestamp(long ts, LocalTableState state, IndexUpdateManager updateMap)
+    private long addUpdateForGivenTimestamp(long ts, LocalTableState state, IndexUpdateManager updateMap, IndexMetaData indexMetaData)
             throws IOException {
         state.setCurrentTimestamp(ts);
-        ts = addCurrentStateMutationsForBatch(updateMap, state);
+        ts = addCurrentStateMutationsForBatch(updateMap, state, indexMetaData);
         return ts;
     }
 
-    private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs, LocalTableState state)
+    private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs, LocalTableState state, IndexMetaData indexMetaData)
             throws IOException {
         // get the cleanup for the current state
         state.setCurrentTimestamp(batchTs);
-        addDeleteUpdatesToMap(updateMap, state, batchTs);
+        addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData);
         // ignore any index tracking from the delete
         state.resetTrackedColumns();
     }
@@ -271,19 +272,20 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
      * 
      * @param updateMap
      *            to update with index mutations
-     * @param batch
-     *            to apply to the current state
      * @param state
      *            current state of the table
+     * @param indexMetaData per-batch index metadata for this mutation's batch
+     * @param batch
+     *            to apply to the current state
      * @return the minimum timestamp across all index columns requested. If {@link ColumnTracker#isNewestTime(long)}
      *         returns <tt>true</tt> on the returned timestamp, we know that this <i>was not a back-in-time update</i>.
      * @throws IOException
      */
-    private long addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state)
+    private long addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state, IndexMetaData indexMetaData)
             throws IOException {
 
         // get the index updates for this current batch
-        Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state);
+        Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state, indexMetaData);
         state.resetTrackedColumns();
 
         /*
@@ -346,13 +348,14 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
      * @param state
      *            current state of the primary table. Should already be set up to the correct state from which we want to
      *            cleanup.
+     * @param indexMetaData per-batch index metadata for this mutation's batch
      * @throws IOException
      */
-    private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap, long batchTs, LocalTableState state)
+    private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap, long batchTs, LocalTableState state, IndexMetaData indexMetaData)
             throws IOException {
         // get the cleanup for the current state
         state.setCurrentTimestamp(batchTs);
-        addDeleteUpdatesToMap(updateMap, state, batchTs);
+        addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData);
         Set<ColumnTracker> trackers = state.getTrackedColumns();
         long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
         for (ColumnTracker tracker : trackers) {
@@ -363,21 +366,22 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
         state.resetTrackedColumns();
         if (!ColumnTracker.isNewestTime(minTs)) {
             state.setHints(Lists.newArrayList(trackers));
-            cleanupIndexStateFromBatchOnward(updateMap, minTs, state);
+            cleanupIndexStateFromBatchOnward(updateMap, minTs, state, indexMetaData);
         }
     }
 
     /**
-     * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState)} and then add them to the
+     * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData)} and then add them to the
      * update map.
      * <p>
     * Expects the {@link LocalTableState} to already be correctly set up (correct timestamp, updates applied, etc).
+     * @param indexMetaData per-batch index metadata for this mutation's batch
      * 
      * @throws IOException
      */
-    protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts)
+    protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts, IndexMetaData indexMetaData)
             throws IOException {
-        Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state);
+        Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData);
         if (cleanup != null) {
             for (IndexUpdate d : cleanup) {
                 if (!d.isValid()) {
@@ -392,14 +396,9 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
     }
 
     @Override
-    public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered)
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered, IndexMetaData indexMetaData)
             throws IOException {
         // TODO Implement IndexBuilder.getIndexUpdateForFilteredRows
         return null;
     }
-
-    @Override
-    protected boolean useRawScanToPrimeBlockCache() {
-        return false;
-    }
 }
\ No newline at end of file

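The javadoc above captures the heart of NonTxIndexBuilder: split each mutation into
timestamp-ordered batches, roll the row state forward batch by batch, and fix index
history where a batch arrives 'back in time'. A standalone sketch of the batching step
(a simplification of createTimestampBatchesFromMutation; a plain map stands in for
Phoenix's Batch type):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Mutation;

    public class TimestampBatching {
        public static Map<Long, List<Cell>> batchByTimestamp(Mutation m) {
            // TreeMap keeps batches ordered oldest-first, matching how the
            // builder rolls the row state forward through time.
            Map<Long, List<Cell>> batches = new TreeMap<Long, List<Cell>>();
            for (List<Cell> cells : m.getFamilyCellMap().values()) {
                for (Cell cell : cells) {
                    List<Cell> batch = batches.get(cell.getTimestamp());
                    if (batch == null) {
                        batch = new ArrayList<Cell>();
                        batches.put(cell.getTimestamp(), batch);
                    }
                    batch.add(cell);
                }
            }
            return batches;
        }
    }
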
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
index b8b2f19..d8b215c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
@@ -80,6 +80,4 @@ public interface TableState {
    * @return the keyvalues in the pending update to the table.
    */
   Collection<Cell> getPendingUpdate();
-  
-  Map<String,Object> getContext();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java
deleted file mode 100644
index d90edc1..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.covered;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import co.cask.tephra.Transaction;
-import co.cask.tephra.hbase98.TransactionAwareHTable;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.hbase.index.ValueGetter;
-import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
-import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class TxIndexBuilder extends BaseIndexBuilder {
-    private static final Log LOG = LogFactory.getLog(TxIndexBuilder.class);
-    public static final String TRANSACTION = "TRANSACTION";
-    private TransactionAwareHTable txTable;
-    
-    @Override
-    public void setup(RegionCoprocessorEnvironment env) throws IOException {
-        super.setup(env);
-        HTableInterface htable = env.getTable(env.getRegion().getRegionInfo().getTable());
-        this.txTable = new TransactionAwareHTable(htable); // TODO: close?
-    }
-
-    @Override
-    public void stop(String why) {
-        try {
-            if (this.txTable != null) txTable.close();
-        } catch (IOException e) {
-            LOG.warn("Unable to close txTable", e);
-        } finally {
-            super.stop(why);
-        }
-    }
-
-    @Override
-    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
-        // get the index updates for this current batch
-        TxTableState state = new TxTableState(mutation);
-        codec.setContext(state, mutation);
-        Transaction tx = (Transaction)state.getContext().get(TRANSACTION);
-        state.setCurrentTransaction(tx);
-        Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayListWithExpectedSize(2);
-        Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state);
-        for (IndexUpdate delete : deletes) {
-            indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
-        }
-        state.addPendingUpdates(mutation);
-        // TODO: state may need to maintain the old state with the new state super imposed
-        // An alternate easier way would be to calculate the state after the data mutations
-        // have been applied.
-        Iterable<IndexUpdate> updates = codec.getIndexUpserts(state);
-        for (IndexUpdate update : updates) {
-            indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
-        }
-        return indexUpdates;
-    }
-
-    private class TxTableState implements TableState {
-        private Put put;
-        private Map<String, byte[]> attributes;
-        private List<Cell> pendingUpdates = Lists.newArrayList();
-        private Transaction transaction;
-        private final Map<String,Object> context = Maps.newHashMap();
-        
-        public TxTableState(Mutation m) {
-            this.put = new Put(m.getRow());
-            this.attributes = m.getAttributesMap();
-        }
-        
-        @Override
-        public RegionCoprocessorEnvironment getEnvironment() {
-            return env;
-        }
-
-        @Override
-        public long getCurrentTimestamp() {
-            return transaction.getReadPointer();
-        }
-
-        @Override
-        public Map<String, byte[]> getUpdateAttributes() {
-            return attributes;
-        }
-
-        @Override
-        public byte[] getCurrentRowKey() {
-            return put.getRow();
-        }
-
-        @Override
-        public List<? extends IndexedColumnGroup> getIndexColumnHints() {
-            return Collections.emptyList();
-        }
-
-        public void addPendingUpdate(Cell cell) throws IOException {
-            put.add(cell);
-        }
-        
-        public void addPendingUpdates(Mutation m) throws IOException {
-            if (m instanceof Delete) {
-                put.getFamilyCellMap().clear();
-            } else {
-                CellScanner scanner = m.cellScanner();
-                while (scanner.advance()) {
-                    Cell cell = scanner.current();
-                    if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
-                        byte[] family = CellUtil.cloneFamily(cell);
-                        byte[] qualifier = CellUtil.cloneQualifier(cell);
-                        put.add(family, qualifier, HConstants.EMPTY_BYTE_ARRAY);
-                    } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
-                        byte[] family = CellUtil.cloneFamily(cell);
-                        put.getFamilyCellMap().remove(family);
-                    } else {
-                        put.add(cell);
-                    }
-                }
-            }
-        }
-        
-        @Override
-        public Collection<Cell> getPendingUpdate() {
-            return pendingUpdates;
-        }
-
-        public void setCurrentTransaction(Transaction tx) {
-            this.transaction = tx;
-        }
-
-        @Override
-        public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
-                throws IOException {
-            ColumnTracker tracker = new ColumnTracker(indexedColumns);
-            IndexUpdate indexUpdate = new IndexUpdate(tracker);
-            final byte[] rowKey = getCurrentRowKey();
-            if (!pendingUpdates.isEmpty()) {
-                final Map<ColumnReference, ImmutableBytesPtr> valueMap = Maps.newHashMapWithExpectedSize(pendingUpdates
-                        .size());
-                for (Cell kv : pendingUpdates) {
-                    // create new pointers to each part of the kv
-                    ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
-                    valueMap.put(new ColumnReference(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()), value);
-                }
-                ValueGetter getter = new ValueGetter() {
-                    @Override
-                    public ImmutableBytesWritable getLatestValue(ColumnReference ref) {
-                        // TODO: from IndexMaintainer we return null if ref is empty key value. Needed?
-                        return valueMap.get(ref);
-                    }
-                    @Override
-                    public byte[] getRowKey() {
-                        return rowKey;
-                    }
-                };
-                return new Pair<ValueGetter, IndexUpdate>(getter, indexUpdate);
-            }
-            // Establish initial state of table by finding the old values
-            // We'll apply the Mutation to this next
-            Get get = new Get(rowKey);
-            get.setMaxVersions();
-            for (ColumnReference ref : indexedColumns) {
-                get.addColumn(ref.getFamily(), ref.getQualifier());
-            }
-            txTable.startTx(transaction);
-            final Result result = txTable.get(get);
-            ValueGetter getter = new ValueGetter() {
-
-                @Override
-                public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException {
-                    Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
-                    if (cell == null) {
-                        return null;
-                    }
-                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                    ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-                    return ptr;
-                }
-
-                @Override
-                public byte[] getRowKey() {
-                    return rowKey;
-                }
-                
-            };
-            for (ColumnReference ref : indexedColumns) {
-                Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
-                if (cell != null) {
-                    addPendingUpdate(cell);
-                }
-            }
-            return new Pair<ValueGetter, IndexUpdate>(getter, indexUpdate);
-        }
-
-        @Override
-        public Map<String, Object> getContext() {
-            return context;
-        }
-        
-    }
-
-    @Override
-    protected boolean useRawScanToPrimeBlockCache() {
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index d4bd460..59bc8de 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -24,11 +24,12 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.LocalTableState;
 import org.apache.phoenix.hbase.index.covered.TableState;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
-import org.apache.phoenix.index.BaseIndexCodec;
 
 import com.google.common.collect.Lists;
 
@@ -59,7 +60,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
     }
 
     @Override
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) {
         List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
         for (ColumnGroup group : groups) {
             IndexUpdate update = getIndexUpdateForGroup(group, state);
@@ -113,7 +114,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
     }
 
     @Override
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
         List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
         for (ColumnGroup group : groups) {
             deletes.add(getDeleteForGroup(group, state));
@@ -360,7 +361,4 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
         // simple check for the moment.
         return groups.size() > 0;
     }
-
-    @Override
-    public void setContext(TableState state, Mutation mutation) throws IOException {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
index de8f752..60698c7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.covered.Batch;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.LocalTableState;
 import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
@@ -118,8 +119,7 @@ public class CoveredColumnIndexer extends NonTxIndexBuilder {
 
   @Override
   public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
-      Collection<KeyValue> filtered) throws IOException {
-
+      Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException {
     // stores all the return values
     IndexUpdateManager updateMap = new IndexUpdateManager();
     // batch the updates by row to make life easier and ordered
@@ -146,7 +146,7 @@ public class CoveredColumnIndexer extends NonTxIndexBuilder {
       for (Batch entry : timeBatch) {
         //just set the timestamp on the table - it already has all the future state
         state.setCurrentTimestamp(entry.getTimestamp());
-        this.addDeleteUpdatesToMap(updateMap, state, entry.getTimestamp());
+        this.addDeleteUpdatesToMap(updateMap, state, entry.getTimestamp(), indexMetaData);
       }
     }
     return updateMap.toMap();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java
deleted file mode 100644
index 1c45cd3..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.index;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
-import org.apache.phoenix.hbase.index.covered.IndexCodec;
-
-/**
- *
- */
-public abstract class BaseIndexCodec implements IndexCodec {
-
-  @Override
-  public void initialize(RegionCoprocessorEnvironment env) throws IOException {
-    // noop
-  }
-
-  /**
-   * {@inheritDoc}
-   * <p>
-   * By default, the codec is always enabled. Subclasses should override this method if they want do
-   * decide to index on a per-mutation basis.
- * @throws IOException 
-   */
-  @Override
-  public boolean isEnabled(Mutation m) throws IOException {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   * <p>
-   * Assumes each mutation is not in a batch. Subclasses that have different batching behavior
-   * should override this.
-   */
-  @Override
-  public byte[] getBatchId(Mutation m) {
-    return null;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 1e28766..09a9f90 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -18,14 +18,32 @@
 package org.apache.phoenix.index;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
-import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.Lists;
 
 /**
  * Index builder for covered-columns index that ties into phoenix for faster use.
@@ -33,6 +51,15 @@ import org.apache.phoenix.util.IndexUtil;
 public class PhoenixIndexBuilder extends NonTxIndexBuilder {
 
     @Override
+    public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+        return new PhoenixIndexMetaData(env, miniBatchOp.getOperation(0).getAttributesMap());
+    }
+
+    protected PhoenixIndexCodec getCodec() {
+        return (PhoenixIndexCodec)codec;
+    }
+
+    @Override
     public void setup(RegionCoprocessorEnvironment env) throws IOException {
         super.setup(env);
         Configuration conf = env.getConfiguration();
@@ -43,16 +70,54 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
     }
 
     @Override
-    public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+    public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData context) throws IOException {
         // The entire purpose of this method impl is to get the existing rows for the
         // table rows being indexed into the block cache, as the index maintenance code
         // does a point scan per row.
-        // TODO: provide a means for the transactional case to just return the Scanner
-        // for when this is executed as it seems like that would be more efficient.
-        IndexUtil.loadMutatingRowsIntoBlockCache(this.env.getRegion(), getCodec(), miniBatchOp, useRawScanToPrimeBlockCache());
-    }
-
-    private PhoenixIndexCodec getCodec() {
-        return (PhoenixIndexCodec)this.codec;
+        List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
+        List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
+        Map<ImmutableBytesWritable, IndexMaintainer> maintainers =
+                new HashMap<ImmutableBytesWritable, IndexMaintainer>();
+        ImmutableBytesWritable indexTableName = new ImmutableBytesWritable();
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            Mutation m = miniBatchOp.getOperation(i);
+            keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
+            
+            for (IndexMaintainer indexMaintainer : indexMaintainers) {
+                if (indexMaintainer.isImmutableRows()) continue;
+                indexTableName.set(indexMaintainer.getIndexTableName());
+                if (maintainers.get(indexTableName) != null) continue;
+                maintainers.put(indexTableName, indexMaintainer);
+            }
+            
+        }
+        if (maintainers.isEmpty()) return;
+        Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
+        scan.setRaw(true);
+        ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
+        scanRanges.initializeScan(scan);
+        scan.setFilter(scanRanges.getSkipScanFilter());
+        HRegion region = env.getRegion();
+        RegionScanner scanner = region.getScanner(scan);
+        // Run through the scanner using internal nextRaw method
+        region.startRegionOperation();
+        try {
+            synchronized (scanner) {
+                boolean hasMore;
+                do {
+                    List<Cell> results = Lists.newArrayList();
+                    // Results may be returned even when nextRaw returns false; the
+                    // boolean only indicates whether more rows remain after those
+                    // returned. The cells themselves are discarded, since the scan
+                    // exists solely to pull the rows into the block cache.
+                    hasMore = scanner.nextRaw(results);
+                } while (hasMore);
+            }
+        } finally {
+            try {
+                scanner.close();
+            } finally {
+                region.closeRegionOperation();
+            }
+        }
     }
 }
\ No newline at end of file

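The batchStarted() implementation above follows one pattern: build a single raw
skip scan over every mutated row key, then drain it so that the per-row point
scans issued during index maintenance are served from the block cache. Below is
a minimal standalone sketch of the draining step; BlockCachePrimer and prime()
are hypothetical names for illustration only, and the Scan is assumed to
already carry the skip-scan filter built from ScanRanges as in the patch.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.regionserver.HRegion;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;

    // Hypothetical helper mirroring the priming loop in batchStarted().
    final class BlockCachePrimer {
        static void prime(HRegion region, Scan scan) throws IOException {
            RegionScanner scanner = region.getScanner(scan);
            region.startRegionOperation();
            try {
                boolean hasMore;
                do {
                    List<Cell> results = new ArrayList<Cell>();
                    // The cells are discarded; the scan exists only to fault the
                    // underlying blocks into the region server's block cache.
                    hasMore = scanner.nextRaw(results);
                } while (hasMore);
            } finally {
                try {
                    scanner.close();
                } finally {
                    region.closeRegionOperation();
                }
            }
        }
    }

Draining via nextRaw() between startRegionOperation() and closeRegionOperation()
mirrors the patch: the raw scan keeps delete markers visible, and the region
operation guard protects the scan against a concurrent region close.
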
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 571c559..1fe9931 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -10,53 +10,41 @@
 package org.apache.phoenix.index;
 
 import java.io.IOException;
-import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import co.cask.tephra.Transaction;
-
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.cache.GlobalCache;
-import org.apache.phoenix.cache.IndexMetaDataCache;
-import org.apache.phoenix.cache.ServerCacheClient;
-import org.apache.phoenix.cache.TenantCache;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.TableState;
-import org.apache.phoenix.hbase.index.covered.TxIndexBuilder;
-import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
-import org.apache.phoenix.hbase.index.scanner.Scanner;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ServerUtil;
-import org.apache.phoenix.util.TransactionUtil;
 
 import com.google.common.collect.Lists;
 
 /**
  * Phoenix-based {@link IndexCodec}. Manages all the logic of how to clean up an index (
- * {@link #getIndexDeletes(TableState)}) as well as what the new index state should be (
- * {@link #getIndexUpserts(TableState)}).
+ * {@link #getIndexDeletes(TableState, IndexMetaData)}) as well as what the new index state should be (
+ * {@link #getIndexUpserts(TableState, IndexMetaData)}).
  */
 public class PhoenixIndexCodec extends BaseIndexCodec {
     public static final String INDEX_MD = "IdxMD";
     public static final String INDEX_UUID = "IdxUUID";
     public static final String INDEX_MAINTAINERS = "IndexMaintainers";
+    private static final KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE;
 
     private RegionCoprocessorEnvironment env;
-    private KeyValueBuilder kvBuilder = GenericKeyValueBuilder.INSTANCE;;
 
     @Override
     public void initialize(RegionCoprocessorEnvironment env) throws IOException {
@@ -71,67 +59,78 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
         return true;
     }
 
-    public IndexMetaDataCache getIndexMetaData(Map<String, byte[]> attributes) throws IOException {
-        if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
-        byte[] uuid = attributes.get(INDEX_UUID);
-        if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
-        byte[] md = attributes.get(INDEX_MD);
-        byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
-        if (md != null) {
-            final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md);
-            final Transaction txn = TransactionUtil.decodeTxnState(txState);
-            return new IndexMetaDataCache() {
-
-                @Override
-                public void close() throws IOException {}
-
-                @Override
-                public List<IndexMaintainer> getIndexMaintainers() {
-                    return indexMaintainers;
-                }
-
-                @Override
-                public Transaction getTransaction() {
-                    return txn;
-                }
-
-            };
-        } else {
-            byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
-            ImmutableBytesWritable tenantId = tenantIdBytes == null ? null : new ImmutableBytesWritable(tenantIdBytes);
-            TenantCache cache = GlobalCache.getTenantCache(env, tenantId);
-            IndexMetaDataCache indexCache = (IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid));
-            if (indexCache == null) {
-                String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion();
-                SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND)
-                        .setMessage(msg).build().buildException();
-                ServerUtil.throwIOException("Index update failed", e); // will not return
+    @Override
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException {
+        List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
+        // Guard against an empty maintainer list before peeking at the first entry;
+        // when the pending update deletes the entire row there is nothing to upsert.
+        if (indexMaintainers.isEmpty() || indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) {
+            return Collections.emptyList();
+        }
+        // TODO: confirm that this special case isn't needed
+        // (as state should match this with the above call, since there are no mutable columns)
+        /*
+        if (maintainer.isImmutableRows()) {
+            indexUpdate = new IndexUpdate(new ColumnTracker(maintainer.getAllColumns()));
+            indexUpdate.setTable(maintainer.getIndexTableName());
+            valueGetter = maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate());
+        }
+        */
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        ptr.set(state.getCurrentRowKey());
+        List<IndexUpdate> indexUpdates = Lists.newArrayList();
+        for (IndexMaintainer maintainer : indexMaintainers) {
+            if (maintainer.isLocalIndex()) { // TODO: remove this check once the assert below is verified to pass
+                assert(Bytes.compareTo(maintainer.getIndexTableName(), MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName())) == 0);
             }
-            return indexCache;
+            Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+            ValueGetter valueGetter = statePair.getFirst();
+            IndexUpdate indexUpdate = statePair.getSecond();
+            indexUpdate.setTable(maintainer.getIndexTableName());
+            Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env
+                    .getRegion().getStartKey(), env.getRegion().getEndKey());
+            indexUpdate.setUpdate(put);
+            indexUpdates.add(indexUpdate);
         }
+        return indexUpdates;
+    }
 
+    @Override
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException {
+        List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        ptr.set(state.getCurrentRowKey());
+        List<IndexUpdate> indexUpdates = Lists.newArrayList();
+        for (IndexMaintainer maintainer : indexMaintainers) {
+            if (maintainer.isLocalIndex()) { // TODO: remove this check once the assert below is verified to pass
+                assert(Bytes.compareTo(maintainer.getIndexTableName(), MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName())) == 0);
+            }
+            if (maintainer.isImmutableRows()) {
+                continue;
+            }
+            Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+            ValueGetter valueGetter = statePair.getFirst();
+            IndexUpdate indexUpdate = statePair.getSecond();
+            indexUpdate.setTable(maintainer.getIndexTableName());
+            Delete delete = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(),
+                    state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey());
+            indexUpdate.setUpdate(delete);
+            indexUpdates.add(indexUpdate);
+        }
+        return indexUpdates;
     }
 
+    /*
     @Override
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
-        return getIndexUpdates(state, true);
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, BatchContext context) throws IOException {
+        return getIndexUpdates(state, context, true);
     }
 
     @Override
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
-        return getIndexUpdates(state, false);
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, BatchContext context) throws IOException {
+        return getIndexUpdates(state, context, false);
     }
 
-    /**
-     * @param state
-     * @param upsert
-     *            prepare index upserts if it's true otherwise prepare index deletes.
-     * @return
-     * @throws IOException
-     */
-    private Iterable<IndexUpdate> getIndexUpdates(TableState state, boolean upsert) throws IOException {
-        @SuppressWarnings("unchecked")
-        List<IndexMaintainer> indexMaintainers = (List<IndexMaintainer>)state.getContext().get(INDEX_MAINTAINERS);
+    private Iterable<IndexUpdate> getIndexUpdates(TableState state, BatchContext context, boolean upsert) throws IOException {
+        List<IndexMaintainer> indexMaintainers = ((PhoenixBatchContext)context).getIndexMetaData().getIndexMaintainers();
         if (indexMaintainers.isEmpty()) { return Collections.emptyList(); }
         List<IndexUpdate> indexUpdates = Lists.newArrayList();
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
@@ -165,10 +164,10 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
             }
             Mutation mutation = null;
             if (upsert) {
-                mutation = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state.getCurrentTimestamp(), env
+                mutation = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env
                         .getRegion().getStartKey(), env.getRegion().getEndKey());
             } else {
-                mutation = maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state.getPendingUpdate(),
+                mutation = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(),
                         state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey());
             }
             indexUpdate.setUpdate(mutation);
@@ -180,25 +179,10 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
         }
         return indexUpdates;
     }
+    */
 
     @Override
     public boolean isEnabled(Mutation m) throws IOException {
         return hasIndexMaintainers(m.getAttributesMap());
     }
-
-    @Override
-    public byte[] getBatchId(Mutation m) {
-        Map<String, byte[]> attributes = m.getAttributesMap();
-        return attributes.get(INDEX_UUID);
-    }
-
-    @Override
-    public void setContext(TableState state, Mutation mutation) throws IOException {
-        IndexMetaDataCache indexCache = getIndexMetaData(state.getUpdateAttributes());
-        List<IndexMaintainer> indexMaintainers = indexCache.getIndexMaintainers();
-        Map<String,Object> context = state.getContext();
-        context.clear();
-        context.put(INDEX_MAINTAINERS, indexMaintainers);
-        context.put(TxIndexBuilder.TRANSACTION, indexCache.getTransaction());
-    }
 }
\ No newline at end of file

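The rewritten codec is now stateless with respect to the batch: both entry
points take the IndexMetaData resolved once in batchStarted() rather than the
per-mutation setContext() call that was removed. A usage sketch follows,
assuming IndexUpdate's isValid() accessor from the covered-index framework
(everything else comes from the signatures above); CodecDriverSketch and
collect() are hypothetical names, not part of the patch.

    import java.io.IOException;
    import java.util.List;

    import org.apache.phoenix.hbase.index.covered.IndexMetaData;
    import org.apache.phoenix.hbase.index.covered.IndexUpdate;
    import org.apache.phoenix.hbase.index.covered.TableState;

    import com.google.common.collect.Lists;

    // Hypothetical sketch of how the two codec entry points compose: deletes
    // for the old row state are gathered before upserts for the new state.
    final class CodecDriverSketch {
        static List<IndexUpdate> collect(PhoenixIndexCodec codec, TableState state,
                IndexMetaData context) throws IOException {
            List<IndexUpdate> updates = Lists.newArrayList();
            for (IndexUpdate delete : codec.getIndexDeletes(state, context)) {
                if (delete.isValid()) updates.add(delete);
            }
            for (IndexUpdate upsert : codec.getIndexUpserts(state, context)) {
                if (upsert.isValid()) updates.add(upsert);
            }
            return updates;
        }
    }

The isValid() checks keep updates whose table or mutation was never set out of
the write path; whether the real caller does exactly this is not shown in this
patch.
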
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
new file mode 100644
index 0000000..26c1c12
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import co.cask.tephra.Transaction;
+
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.IndexMetaDataCache;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TransactionUtil;
+
+public class PhoenixIndexMetaData implements IndexMetaData {
+    private final IndexMetaDataCache indexMetaDataCache;
+    
+    private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException {
+        if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
+        byte[] uuid = attributes.get(PhoenixIndexCodec.INDEX_UUID);
+        if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
+        byte[] md = attributes.get(PhoenixIndexCodec.INDEX_MD);
+        byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
+        if (md != null) {
+            final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md);
+            final Transaction txn = TransactionUtil.decodeTxnState(txState);
+            return new IndexMetaDataCache() {
+
+                @Override
+                public void close() throws IOException {}
+
+                @Override
+                public List<IndexMaintainer> getIndexMaintainers() {
+                    return indexMaintainers;
+                }
+
+                @Override
+                public Transaction getTransaction() {
+                    return txn;
+                }
+
+            };
+        } else {
+            byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
+            ImmutableBytesWritable tenantId = tenantIdBytes == null ? null : new ImmutableBytesWritable(tenantIdBytes);
+            TenantCache cache = GlobalCache.getTenantCache(env, tenantId);
+            IndexMetaDataCache indexCache = (IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid));
+            if (indexCache == null) {
+                String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion();
+                SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND)
+                        .setMessage(msg).build().buildException();
+                ServerUtil.throwIOException("Index update failed", e); // will not return
+            }
+            return indexCache;
+        }
+
+    }
+
+    public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String,byte[]> attributes) throws IOException {
+        indexMetaDataCache = getIndexMetaData(env, attributes);
+    }
+    
+    public Transaction getTransaction() {
+        return indexMetaDataCache.getTransaction();
+    }
+    
+    public List<IndexMaintainer> getIndexMaintainers() {
+        return indexMetaDataCache.getIndexMaintainers();
+    }
+}
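
PhoenixIndexMetaData resolves the maintainers from one of two places: inline
from the INDEX_MD mutation attribute (deserializing the maintainers plus any
transaction state directly), or from the server-side tenant cache keyed by
INDEX_UUID when only the UUID was shipped. A sketch of how a coprocessor hook
would use it, matching the getIndexMetaData() override in PhoenixIndexBuilder
above; resolve() is a hypothetical name, and the batch is assumed non-empty
since every mutation in a Phoenix batch carries the same index metadata
attributes.

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;

    import co.cask.tephra.Transaction;

    final class MetaDataResolutionSketch {
        static PhoenixIndexMetaData resolve(RegionCoprocessorEnvironment env,
                MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
            // Attributes are identical across a Phoenix batch, so the first mutation
            // suffices -- the same assumption getIndexMetaData() makes above.
            Mutation first = miniBatchOp.getOperation(0);
            PhoenixIndexMetaData metaData = new PhoenixIndexMetaData(env, first.getAttributesMap());
            // Everything the index writers need hangs off this one object: the
            // maintainers for each index and the (possibly null) transaction state.
            List<IndexMaintainer> maintainers = metaData.getIndexMaintainers();
            Transaction txn = metaData.getTransaction();
            return metaData;
        }
    }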