You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by je...@apache.org on 2014/02/08 08:43:00 UTC

[11/11] git commit: Port Phoenix to Hbase0.98

Port Phoenix to Hbase0.98


Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/53f7d3ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/53f7d3ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/53f7d3ce

Branch: refs/heads/4.0.0
Commit: 53f7d3ce880f79b18a5a728be13f965e84c52e56
Parents: 214ad9e
Author: Jeffrey Zhong <jz...@JZhongs-MacBook-Pro.local>
Authored: Fri Feb 7 23:41:57 2014 -0800
Committer: Jeffrey Zhong <jz...@JZhongs-MacBook-Pro.local>
Committed: Fri Feb 7 23:43:34 2014 -0800

----------------------------------------------------------------------
 phoenix-core/pom.xml                            |  121 +-
 .../hbase/index/IndexLogRollSynchronizer.java   |   14 +-
 .../org/apache/hadoop/hbase/index/Indexer.java  |   95 +-
 .../hbase/index/builder/BaseIndexBuilder.java   |    4 +-
 .../hbase/index/builder/IndexBuildManager.java  |    6 +-
 .../hbase/index/builder/IndexBuilder.java       |    4 +-
 .../covered/CoveredColumnsIndexBuilder.java     |   21 +-
 .../hbase/index/covered/LocalTableState.java    |   15 +-
 .../hbase/index/covered/data/IndexMemStore.java |   18 +-
 .../hbase/index/covered/data/LocalTable.java    |    5 +-
 .../example/CoveredColumnIndexCodec.java        |    1 -
 .../covered/example/CoveredColumnIndexer.java   |   11 +-
 .../filter/ApplyAndFilterDeletesFilter.java     |   36 +-
 ...olumnTrackingNextLargestTimestampFilter.java |   16 +-
 .../index/covered/filter/FamilyOnlyFilter.java  |    8 +-
 .../covered/filter/MaxTimestampFilter.java      |   22 +-
 .../covered/filter/NewerTimestampFilter.java    |   12 +-
 .../covered/update/IndexUpdateManager.java      |   11 +-
 .../hbase/index/parallel/ThreadPoolManager.java |    1 -
 .../index/scanner/FilteredKeyValueScanner.java  |    2 -
 .../index/table/CoprocessorHTableFactory.java   |    3 +-
 .../hbase/index/util/IndexManagementUtil.java   |    7 +-
 .../hadoop/hbase/index/wal/IndexedKeyValue.java |   73 +-
 .../hadoop/hbase/index/wal/KeyValueCodec.java   |   11 +-
 .../regionserver/wal/IndexedHLogReader.java     |  119 +-
 .../hbase/regionserver/wal/IndexedWALEdit.java  |   91 -
 .../regionserver/wal/IndexedWALEditCodec.java   |   40 +-
 .../apache/phoenix/cache/ServerCacheClient.java |   73 +-
 .../cache/aggcache/SpillableGroupByCache.java   |    8 +-
 .../apache/phoenix/client/ClientKeyValue.java   |  133 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |    5 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |    5 +-
 .../phoenix/coprocessor/BaseRegionScanner.java  |   17 +-
 .../GroupedAggregateRegionObserver.java         |   26 +-
 .../coprocessor/HashJoinRegionScanner.java      |   47 +-
 .../coprocessor/MetaDataEndpointImpl.java       | 1352 ++--
 .../phoenix/coprocessor/MetaDataProtocol.java   |  167 +-
 .../phoenix/coprocessor/ScanRegionObserver.java |   47 +-
 .../coprocessor/SequenceRegionObserver.java     |   86 +-
 .../coprocessor/ServerCachingEndpointImpl.java  |   98 +-
 .../coprocessor/ServerCachingProtocol.java      |    3 +-
 .../UngroupedAggregateRegionObserver.java       |   95 +-
 .../coprocessor/generated/MetaDataProtos.java   | 7135 ++++++++++++++++++
 .../coprocessor/generated/PTableProtos.java     | 5315 +++++++++++++
 .../generated/ServerCacheFactoryProtos.java     |  568 ++
 .../generated/ServerCachingProtos.java          | 3447 +++++++++
 .../apache/phoenix/execute/MutationState.java   |   10 +-
 .../expression/KeyValueColumnExpression.java    |    1 -
 .../DistinctValueWithCountServerAggregator.java |    4 +-
 .../phoenix/filter/BooleanExpressionFilter.java |   11 +-
 .../MultiCFCQKeyValueComparisonFilter.java      |   11 +
 .../filter/MultiCQKeyValueComparisonFilter.java |   12 +
 .../filter/MultiKeyValueComparisonFilter.java   |    8 +-
 .../phoenix/filter/RowKeyComparisonFilter.java  |   15 +-
 .../SingleCFCQKeyValueComparisonFilter.java     |   13 +-
 .../SingleCQKeyValueComparisonFilter.java       |   11 +
 .../filter/SingleKeyValueComparisonFilter.java  |   11 +-
 .../apache/phoenix/filter/SkipScanFilter.java   |   25 +-
 .../apache/phoenix/index/IndexMaintainer.java   |   41 +-
 .../phoenix/index/PhoenixIndexBuilder.java      |   10 +-
 .../index/PhoenixIndexFailurePolicy.java        |   44 +-
 .../iterate/MappedByteBufferSortedQueue.java    |    5 +-
 .../iterate/RegionScannerResultIterator.java    |    6 +-
 .../phoenix/iterate/SpoolingResultIterator.java |    4 +-
 .../java/org/apache/phoenix/job/JobManager.java |   10 +-
 .../apache/phoenix/join/HashCacheFactory.java   |    2 +-
 .../org/apache/phoenix/join/ScanProjector.java  |    5 +-
 .../phoenix/map/reduce/CSVBulkLoader.java       |    3 -
 .../apache/phoenix/optimize/QueryOptimizer.java |    2 +-
 .../apache/phoenix/protobuf/ProtobufUtil.java   |  133 +
 .../query/ConnectionQueryServicesImpl.java      |  260 +-
 .../query/ConnectionlessQueryServicesImpl.java  |   12 +-
 .../phoenix/query/HConnectionFactory.java       |    7 +-
 .../phoenix/query/QueryServicesOptions.java     |    4 +-
 .../apache/phoenix/schema/DelegateColumn.java   |   10 -
 .../apache/phoenix/schema/MetaDataClient.java   |    4 +-
 .../java/org/apache/phoenix/schema/PColumn.java |    4 +-
 .../org/apache/phoenix/schema/PColumnImpl.java  |   98 +-
 .../java/org/apache/phoenix/schema/PTable.java  |    2 +-
 .../org/apache/phoenix/schema/PTableImpl.java   |  286 +-
 .../org/apache/phoenix/schema/Sequence.java     |   73 +-
 .../apache/phoenix/schema/stat/PTableStats.java |    3 +-
 .../phoenix/schema/stat/PTableStatsImpl.java    |   20 +-
 .../schema/tuple/MultiKeyValueTuple.java        |   19 +-
 .../org/apache/phoenix/schema/tuple/Tuple.java  |    6 +-
 .../java/org/apache/phoenix/util/CSVLoader.java |    1 -
 .../java/org/apache/phoenix/util/IndexUtil.java |   22 +-
 .../org/apache/phoenix/util/KeyValueUtil.java   |   39 +-
 .../org/apache/phoenix/util/MetaDataUtil.java   |   23 +-
 .../org/apache/phoenix/util/PhoenixRuntime.java |    7 +-
 .../org/apache/phoenix/util/ResultUtil.java     |   29 +-
 .../org/apache/phoenix/util/SchemaUtil.java     |    2 -
 .../org/apache/phoenix/util/ServerUtil.java     |   16 +-
 .../java/org/apache/phoenix/util/TupleUtil.java |   13 +-
 .../hadoop/hbase/index/IndexTestingUtils.java   |    4 +-
 .../TestFailForUnsupportedHBaseVersions.java    |    2 +
 .../TestEndToEndCoveredColumnsIndexBuilder.java |   26 +-
 .../index/covered/TestLocalTableState.java      |   13 +-
 .../index/covered/data/TestIndexMemStore.java   |    4 +-
 .../example/TestCoveredColumnIndexCodec.java    |   16 +-
 .../example/TestEndToEndCoveredIndexing.java    |    2 +
 .../TestEndtoEndIndexingWithCompression.java    |    7 +-
 .../covered/example/TestFailWithoutRetries.java |    2 +
 .../filter/TestApplyAndFilterDeletesFilter.java |    8 +-
 .../index/util/TestIndexManagementUtil.java     |    6 +-
 .../index/write/TestWALRecoveryCaching.java     |    7 +-
 .../recovery/TestPerRegionIndexWriteCache.java  |   87 +-
 .../wal/TestReadWriteKeyValuesWithCodec.java    |   48 +-
 ...ALReplayWithIndexWritesAndCompressedWAL.java |   28 +-
 ...exWritesAndUncompressedWALInHBase_094_9.java |    2 +
 .../phoenix/client/TestClientKeyValueLocal.java |    5 +-
 .../apache/phoenix/end2end/AlterTableTest.java  |    4 -
 .../phoenix/end2end/NativeHBaseTypesTest.java   |    2 +-
 .../phoenix/end2end/index/IndexTestUtil.java    |   14 +-
 .../phoenix/filter/SkipScanFilterTest.java      |   11 +-
 .../iterate/AggregateResultScannerTest.java     |    7 -
 .../java/org/apache/phoenix/query/BaseTest.java |    2 +
 .../java/org/apache/phoenix/util/TestUtil.java  |   36 +-
 phoenix-flume/pom.xml                           |   47 +-
 phoenix-protocol/README.txt                     |   10 +
 phoenix-protocol/src/main/MetaDataService.proto |  118 +
 phoenix-protocol/src/main/PTable.proto          |   72 +
 .../src/main/ServerCacheFactory.proto           |   29 +
 .../src/main/ServerCachingService.proto         |   61 +
 phoenix-protocol/src/main/build-proto.sh        |   37 +
 pom.xml                                         |  111 +-
 126 files changed, 19574 insertions(+), 2005 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 0e2b8e5..5cc7804 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -213,23 +213,11 @@
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase</artifactId>
-    </dependency>
     <!-- Findbugs Annotation -->
     <dependency>
       <groupId>net.sourceforge.findbugs</groupId>
       <artifactId>annotations</artifactId>
     </dependency>
-
-    <!-- Test Dependencies -->
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase</artifactId>
-      <type>test-jar</type>
-    </dependency>
-    <!-- Needed by HBase to run the minicluster -->
     <dependency>
       <groupId>org.codehaus.jackson</groupId>
       <artifactId>jackson-core-asl</artifactId>
@@ -249,12 +237,37 @@
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf-java.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.0.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.jruby</groupId>
+      <artifactId>jruby-complete</artifactId>
+      <version>${jruby.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+
   </dependencies>
 
   <profiles>
@@ -268,14 +281,60 @@
         </property>
       </activation>
       <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-core</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-test</artifactId>
-        </dependency>
+          <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-testing-util</artifactId>
+            <version>${hbase-hadoop1.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <version>${hbase-hadoop1.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-protocol</artifactId>
+            <version>${hbase-hadoop1.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${hbase-hadoop1.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-core</artifactId>
+            <version>${hadoop-one.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>hsqldb</groupId>
+                <artifactId>hsqldb</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>net.sf.kosmosfs</groupId>
+                <artifactId>kfs</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.eclipse.jdt</groupId>
+                <artifactId>core</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>net.java.dev.jets3t</groupId>
+                <artifactId>jets3t</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>oro</groupId>
+                <artifactId>oro</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-test</artifactId>
+            <version>${hadoop-one.version}</version>
+            <optional>true</optional>
+            <scope>test</scope>
+          </dependency>
       </dependencies>
     </profile>
 
@@ -290,6 +349,26 @@
       </activation>
       <dependencies>
         <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-testing-util</artifactId>
+          <version>${hbase-hadoop2.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-common</artifactId>
+          <version>${hbase-hadoop2.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-protocol</artifactId>
+          <version>${hbase-hadoop2.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-client</artifactId>
+          <version>${hbase-hadoop2.version}</version>
+        </dependency>
+        <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-common</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/IndexLogRollSynchronizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/IndexLogRollSynchronizer.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/IndexLogRollSynchronizer.java
index ca61221..904612f 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/IndexLogRollSynchronizer.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/IndexLogRollSynchronizer.java
@@ -71,6 +71,7 @@ public class IndexLogRollSynchronizer implements WALActionsListener {
 
   private static final Log LOG = LogFactory.getLog(IndexLogRollSynchronizer.class);
   private WriteLock logArchiveLock;
+    private boolean lockAcquired = false;
 
   public IndexLogRollSynchronizer(WriteLock logWriteLock){
     this.logArchiveLock = logWriteLock;
@@ -81,12 +82,21 @@ public class IndexLogRollSynchronizer implements WALActionsListener {
   public void preLogArchive(Path oldPath, Path newPath) throws IOException {
     //take a write lock on the index - any pending index updates will complete before we finish
     LOG.debug("Taking INDEX_UPDATE writelock");
-    logArchiveLock.lock();
-    LOG.debug("Got the INDEX_UPDATE writelock");
+    try {
+      logArchiveLock.lockInterruptibly();
+      lockAcquired = true;
+    } catch (InterruptedException e) {
+      LOG.info("Acquiring lock got interrupted!");
+      Thread.currentThread().interrupt();
+    }
+    if (lockAcquired) {
+      LOG.debug("Got the INDEX_UPDATE writelock");
+    }
   }
   
   @Override
   public void postLogArchive(Path oldPath, Path newPath) throws IOException {
+    if (!lockAcquired) return;
     // done archiving the logs, any WAL updates will be replayed on failure
     LOG.debug("Releasing INDEX_UPDATE writelock");
     logArchiveLock.unlock();

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/Indexer.java
index fe2852b..aa9df58 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/Indexer.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
@@ -167,7 +168,7 @@ public class Indexer extends BaseRegionObserver {
         this.builder = new IndexBuildManager(env);
     
         // get a reference to the WAL
-        log = env.getRegionServerServices().getWAL();
+      log = env.getRegionServerServices().getWAL(null);
         // add a synchronizer so we don't archive a WAL that we need
         log.registerWALActionsListener(new IndexLogRollSynchronizer(INDEX_READ_WRITE_LOCK.writeLock()));
     
@@ -218,9 +219,9 @@ public class Indexer extends BaseRegionObserver {
 
   @Override
   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
-      final WALEdit edit, final boolean writeToWAL) throws IOException {
+      final WALEdit edit, final Durability durability) throws IOException {
       if (this.disabled) {
-          super.prePut(c, put, edit, writeToWAL);
+      super.prePut(c, put, edit, durability);
           return;
         }
     // just have to add a batch marker to the WALEdit so we get the edit again in the batch
@@ -230,13 +231,13 @@ public class Indexer extends BaseRegionObserver {
 
   @Override
   public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
-      WALEdit edit, boolean writeToWAL) throws IOException {
+      WALEdit edit, final Durability durability) throws IOException {
       if (this.disabled) {
-          super.preDelete(e, delete, edit, writeToWAL);
+      super.preDelete(e, delete, edit, durability);
           return;
         }
     try {
-      preDeleteWithExceptions(e, delete, edit, writeToWAL);
+      preDeleteWithExceptions(e, delete, edit, durability);
       return;
     } catch (Throwable t) {
       rethrowIndexingException(t);
@@ -246,7 +247,7 @@ public class Indexer extends BaseRegionObserver {
   }
 
   public void preDeleteWithExceptions(ObserverContext<RegionCoprocessorEnvironment> e,
-      Delete delete, WALEdit edit, boolean writeToWAL) throws Exception {
+      Delete delete, WALEdit edit, final Durability durability) throws Exception {
     // if we are making the update as part of a batch, we need to add in a batch marker so the WAL
     // is retained
     if (this.builder.getBatchId(delete) != null) {
@@ -257,14 +258,14 @@ public class Indexer extends BaseRegionObserver {
     // get the mapping for index column -> target index table
     Collection<Pair<Mutation, byte[]>> indexUpdates = this.builder.getIndexUpdate(delete);
 
-    if (doPre(indexUpdates, edit, writeToWAL)) {
+    if (doPre(indexUpdates, edit, durability)) {
       takeUpdateLock("delete");
     }
   }
 
   @Override
   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
-      MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
       if (this.disabled) {
           super.preBatchMutate(c, miniBatchOp);
           return;
@@ -281,12 +282,13 @@ public class Indexer extends BaseRegionObserver {
 
   @SuppressWarnings("deprecation")
   public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
-      MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws Throwable {
+      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
 
     // first group all the updates for a single row into a single update to be processed
     Map<ImmutableBytesPtr, MultiMutation> mutations =
         new HashMap<ImmutableBytesPtr, MultiMutation>();
-    boolean durable = false;
+
+    Durability durability = Durability.SKIP_WAL;
     for (int i = 0; i < miniBatchOp.size(); i++) {
       // remove the batch keyvalue marker - its added for all puts
       WALEdit edit = miniBatchOp.getWalEdit(i);
@@ -294,11 +296,13 @@ public class Indexer extends BaseRegionObserver {
       // we could check is indexing is enable for the mutation in prePut and then just skip this
       // after checking here, but this saves us the checking again.
       if (edit != null) {
-        KeyValue kv = edit.getKeyValues().remove(0);
-        assert kv == BATCH_MARKER : "Expected batch marker from the WALEdit, but got: " + kv;
+        KeyValue kv = edit.getKeyValues().get(0);
+        if (kv == BATCH_MARKER) {
+          // remove batch marker from the WALEdit
+          edit.getKeyValues().remove(0);
+        }
       }
-      Pair<Mutation, Integer> op = miniBatchOp.getOperation(i);
-      Mutation m = op.getFirst();
+      Mutation m = miniBatchOp.getOperation(i);
       // skip this mutation if we aren't enabling indexing
       // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
       // should be indexed, which means we need to expose another method on the builder. Such is the
@@ -308,8 +312,8 @@ public class Indexer extends BaseRegionObserver {
       }
       
       // figure out if this is batch is durable or not
-      if(!durable){
-        durable = m.getDurability() != Durability.SKIP_WAL;
+      if (m.getDurability().ordinal() > durability.ordinal()) {
+        durability = m.getDurability();
       }
 
       // add the mutation to the batch set
@@ -317,7 +321,7 @@ public class Indexer extends BaseRegionObserver {
       MultiMutation stored = mutations.get(row);
       // we haven't seen this row before, so add it
       if (stored == null) {
-        stored = new MultiMutation(row, m.getWriteToWAL());
+        stored = new MultiMutation(row);
         mutations.put(row, stored);
       }
       stored.addAll(m);
@@ -336,7 +340,7 @@ public class Indexer extends BaseRegionObserver {
     Collection<Pair<Mutation, byte[]>> indexUpdates =
         this.builder.getIndexUpdate(miniBatchOp, mutations.values());
     // write them
-    if (doPre(indexUpdates, edit, durable)) {
+    if (doPre(indexUpdates, edit, durability)) {
       takeUpdateLock("batch mutation");
     }
   }
@@ -372,9 +376,8 @@ public class Indexer extends BaseRegionObserver {
 
     private ImmutableBytesPtr rowKey;
 
-    public MultiMutation(ImmutableBytesPtr rowkey, boolean writeToWal) {
+    public MultiMutation(ImmutableBytesPtr rowkey) {
       this.rowKey = rowkey;
-      this.writeToWAL = writeToWal;
     }
 
     /**
@@ -383,9 +386,9 @@ public class Indexer extends BaseRegionObserver {
     @SuppressWarnings("deprecation")
     public void addAll(Mutation stored) {
       // add all the kvs
-      for (Entry<byte[], List<KeyValue>> kvs : stored.getFamilyMap().entrySet()) {
+      for (Entry<byte[], List<Cell>> kvs : stored.getFamilyCellMap().entrySet()) {
         byte[] family = kvs.getKey();
-        List<KeyValue> list = getKeyValueList(family, kvs.getValue().size());
+        List<Cell> list = getKeyValueList(family, kvs.getValue().size());
         list.addAll(kvs.getValue());
         familyMap.put(family, list);
       }
@@ -396,15 +399,12 @@ public class Indexer extends BaseRegionObserver {
           this.setAttribute(attrib.getKey(), attrib.getValue());
         }
       }
-      if (stored.getWriteToWAL()) {
-        this.writeToWAL = true;
-      }
     }
 
-    private List<KeyValue> getKeyValueList(byte[] family, int hint) {
-      List<KeyValue> list = familyMap.get(family);
+    private List<Cell> getKeyValueList(byte[] family, int hint) {
+      List<Cell> list = familyMap.get(family);
       if (list == null) {
-        list = new ArrayList<KeyValue>(hint);
+        list = new ArrayList<Cell>(hint);
       }
       return list;
     }
@@ -423,16 +423,6 @@ public class Indexer extends BaseRegionObserver {
     public boolean equals(Object o) {
       return o == null ? false : o.hashCode() == this.hashCode();
     }
-
-    @Override
-    public void readFields(DataInput arg0) throws IOException {
-      throw new UnsupportedOperationException("MultiMutations cannot be read/written");
-    }
-
-    @Override
-    public void write(DataOutput arg0) throws IOException {
-      throw new UnsupportedOperationException("MultiMutations cannot be read/written");
-    }
   }
 
   /**
@@ -441,7 +431,7 @@ public class Indexer extends BaseRegionObserver {
    * @throws IOException
    */
   private boolean doPre(Collection<Pair<Mutation, byte[]>> indexUpdates, final WALEdit edit,
-      final boolean writeToWAL) throws IOException {
+      final Durability durability) throws IOException {
     // no index updates, so we are done
     if (indexUpdates == null || indexUpdates.size() == 0) {
       return false;
@@ -449,7 +439,7 @@ public class Indexer extends BaseRegionObserver {
 
     // if writing to wal is disabled, we never see the WALEdit updates down the way, so do the index
     // update right away
-    if (!writeToWAL) {
+    if (durability == Durability.SKIP_WAL) {
       try {
         this.writer.write(indexUpdates);
         return false;
@@ -469,27 +459,27 @@ public class Indexer extends BaseRegionObserver {
 
   @Override
   public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
-      boolean writeToWAL) throws IOException {
+      final Durability durability) throws IOException {
       if (this.disabled) {
-          super.postPut(e, put, edit, writeToWAL);
+      super.postPut(e, put, edit, durability);
           return;
         }
-    doPost(edit, put, writeToWAL);
+    doPost(edit, put, durability);
   }
 
   @Override
   public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
-      WALEdit edit, boolean writeToWAL) throws IOException {
+      WALEdit edit, final Durability durability) throws IOException {
       if (this.disabled) {
-          super.postDelete(e, delete, edit, writeToWAL);
+      super.postDelete(e, delete, edit, durability);
           return;
         }
-    doPost(edit,delete, writeToWAL);
+    doPost(edit, delete, durability);
   }
 
   @Override
   public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
-      MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
       if (this.disabled) {
           super.postBatchMutate(c, miniBatchOp);
           return;
@@ -498,9 +488,9 @@ public class Indexer extends BaseRegionObserver {
     // noop for the rest of the indexer - its handled by the first call to put/delete
   }
 
-  private void doPost(WALEdit edit, Mutation m, boolean writeToWAL) throws IOException {
+  private void doPost(WALEdit edit, Mutation m, final Durability durability) throws IOException {
     try {
-      doPostWithExceptions(edit, m, writeToWAL);
+      doPostWithExceptions(edit, m, durability);
       return;
     } catch (Throwable e) {
       rethrowIndexingException(e);
@@ -509,9 +499,10 @@ public class Indexer extends BaseRegionObserver {
         "Somehow didn't complete the index update, but didn't return succesfully either!");
   }
 
-  private void doPostWithExceptions(WALEdit edit, Mutation m, boolean writeToWAL) throws Exception {
+  private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability)
+      throws Exception {
     //short circuit, if we don't need to do any work
-    if (!writeToWAL || !this.builder.isEnabled(m)) {
+    if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m)) {
       // already did the index update in prePut, so we are done
       return;
     }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/BaseIndexBuilder.java
index bbeae31..8c9a777 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/BaseIndexBuilder.java
@@ -54,12 +54,12 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
   }
 
   @Override
-  public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+  public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
     // noop
   }
 
   @Override
-  public void batchCompleted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) {
+  public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
     // noop
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuildManager.java
index 833f142..61fc90d 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuildManager.java
@@ -116,7 +116,7 @@ public class IndexBuildManager implements Stoppable {
 
 
   public Collection<Pair<Mutation, byte[]>> getIndexUpdate(
-      MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp,
+      MiniBatchOperationInProgress<Mutation> miniBatchOp,
       Collection<? extends Mutation> mutations) throws Throwable {
     // notify the delegate that we have started processing a batch
     this.delegate.batchStarted(miniBatchOp);
@@ -178,11 +178,11 @@ public class IndexBuildManager implements Stoppable {
     return delegate.getIndexUpdateForFilteredRows(filtered);
   }
 
-  public void batchCompleted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) {
+  public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
     delegate.batchCompleted(miniBatchOp);
   }
 
-  public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp)
+  public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp)
       throws IOException {
     delegate.batchStarted(miniBatchOp);
   }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuilder.java
index e23ea3f..e92edab 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuilder.java
@@ -107,7 +107,7 @@ public interface IndexBuilder extends Stoppable {
    * Notification that a batch of updates has successfully been written.
    * @param miniBatchOp the full batch operation that was written
    */
-  public void batchCompleted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp);
+  public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp);
 
   /**
    * Notification that a batch has been started.
@@ -118,7 +118,7 @@ public interface IndexBuilder extends Stoppable {
    * @param miniBatchOp the full batch operation to be written
  * @throws IOException 
    */
-  public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
+  public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException;
 
   /**
    * This allows the codec to dynamically change whether or not indexing should take place for a

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/CoveredColumnsIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/CoveredColumnsIndexBuilder.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/CoveredColumnsIndexBuilder.java
index 422a9ec..ce5efc5 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/CoveredColumnsIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/CoveredColumnsIndexBuilder.java
@@ -33,23 +33,25 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Longs;
 import org.apache.hadoop.hbase.index.builder.BaseIndexBuilder;
 import org.apache.hadoop.hbase.index.covered.data.LocalHBaseState;
 import org.apache.hadoop.hbase.index.covered.data.LocalTable;
 import org.apache.hadoop.hbase.index.covered.update.ColumnTracker;
 import org.apache.hadoop.hbase.index.covered.update.IndexUpdateManager;
 import org.apache.hadoop.hbase.index.covered.update.IndexedColumnGroup;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
 
 /**
  * Build covered indexes for phoenix updates.
@@ -148,8 +150,9 @@ public class CoveredColumnsIndexBuilder extends BaseIndexBuilder {
    */
   protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) {
     Map<Long, Batch> batches = new HashMap<Long, Batch>();
-    for (List<KeyValue> family : m.getFamilyMap().values()) {
-      createTimestampBatchesFromKeyValues(family, batches);
+    for (List<Cell> family : m.getFamilyCellMap().values()) {
+      List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family);
+      createTimestampBatchesFromKeyValues(familyKVs, batches);
     }
     // sort the batches
     List<Batch> sorted = new ArrayList<Batch>(batches.values());
@@ -420,7 +423,7 @@ public class CoveredColumnsIndexBuilder extends BaseIndexBuilder {
 
     // We have to figure out which kind of delete it is, since we need to do different things if its
     // a general (row) delete, versus a delete of just a single column or family
-    Map<byte[], List<KeyValue>> families = d.getFamilyMap();
+    Map<byte[], List<Cell>> families = d.getFamilyCellMap();
 
     /*
      * Option 1: its a row delete marker, so we just need to delete the most recent state for each

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/LocalTableState.java
index f6419f2..ce21135 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/LocalTableState.java
@@ -28,13 +28,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.util.Pair;
-
 import org.apache.hadoop.hbase.index.covered.data.IndexMemStore;
 import org.apache.hadoop.hbase.index.covered.data.LocalHBaseState;
 import org.apache.hadoop.hbase.index.covered.update.ColumnReference;
@@ -42,6 +41,8 @@ import org.apache.hadoop.hbase.index.covered.update.ColumnTracker;
 import org.apache.hadoop.hbase.index.covered.update.IndexedColumnGroup;
 import org.apache.hadoop.hbase.index.scanner.Scanner;
 import org.apache.hadoop.hbase.index.scanner.ScannerBuilder;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Manage the state of the HRegion's view of the table, for the single row.
@@ -175,7 +176,7 @@ public class LocalTableState implements TableState {
 
   public Result getCurrentRowState() {
     KeyValueScanner scanner = this.memstore.getScanner();
-    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    List<Cell> kvs = new ArrayList<Cell>();
     while (scanner.peek() != null) {
       try {
         kvs.add(scanner.next());
@@ -184,7 +185,7 @@ public class LocalTableState implements TableState {
         throw new RuntimeException("Local MemStore threw IOException!");
       }
     }
-    return new Result(kvs);
+    return Result.create(kvs);
   }
 
   /**
@@ -192,8 +193,8 @@ public class LocalTableState implements TableState {
    * @param pendingUpdate update to apply
    */
   public void addUpdateForTesting(Mutation pendingUpdate) {
-    for (Map.Entry<byte[], List<KeyValue>> e : pendingUpdate.getFamilyMap().entrySet()) {
-      List<KeyValue> edits = e.getValue();
+    for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) {
+      List<KeyValue> edits = KeyValueUtil.ensureKeyValues(e.getValue());
       addUpdate(edits);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/IndexMemStore.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/IndexMemStore.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/IndexMemStore.java
index e2cac10..5cee11a 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/IndexMemStore.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/IndexMemStore.java
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.index.covered.data;
 
+import java.io.IOException;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.SortedSet;
@@ -26,7 +27,7 @@ import java.util.SortedSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.IndexKeyValueSkipListSet;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -87,12 +88,12 @@ public class IndexMemStore implements KeyValueStore {
    */
   public static final Comparator<KeyValue> COMPARATOR = new Comparator<KeyValue>() {
 
-    private final KeyComparator rawcomparator = new KeyComparator();
+    private final KVComparator rawcomparator = new KVComparator();
 
     @Override
     public int compare(final KeyValue left, final KeyValue right) {
-      return rawcomparator.compare(left.getBuffer(), left.getOffset() + KeyValue.ROW_OFFSET,
-        left.getKeyLength(), right.getBuffer(), right.getOffset() + KeyValue.ROW_OFFSET,
+      return rawcomparator.compareFlatKey(left.getRowArray(), left.getOffset() + KeyValue.ROW_OFFSET,
+        left.getKeyLength(), right.getRowArray(), right.getOffset() + KeyValue.ROW_OFFSET,
         right.getKeyLength());
     }
   };
@@ -140,7 +141,8 @@ public class IndexMemStore implements KeyValueStore {
   }
 
   private String toString(KeyValue kv) {
-    return kv.toString() + "/value=" + Bytes.toString(kv.getValue());
+    return kv.toString() + "/value=" + 
+        Bytes.toString(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
   }
 
   @Override
@@ -159,7 +161,7 @@ public class IndexMemStore implements KeyValueStore {
   public KeyValueScanner getScanner() {
     return new MemStoreScanner();
   }
-
+  
   /*
    * MemStoreScanner implements the KeyValueScanner. It lets the caller scan the contents of a
    * memstore -- both current map and snapshot. This behaves as if it were a real scanner but does
@@ -306,14 +308,13 @@ public class IndexMemStore implements KeyValueStore {
     public long getSequenceID() {
       return Long.MAX_VALUE;
     }
-
+    
     @Override
     public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
       throw new UnsupportedOperationException(this.getClass().getName()
           + " doesn't support checking to see if it should use a scanner!");
     }
 
-    /*
     @Override
     public boolean backwardSeek(KeyValue arg0) throws IOException {
         throw new UnsupportedOperationException();
@@ -328,6 +329,5 @@ public class IndexMemStore implements KeyValueStore {
     public boolean seekToPreviousRow(KeyValue arg0) throws IOException {
         throw new UnsupportedOperationException();
     }
-    */
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/LocalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/LocalTable.java
index 52aa851..d2d99e6 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/LocalTable.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/LocalTable.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
@@ -63,11 +64,11 @@ public class LocalTable implements LocalHBaseState {
     s.setStopRow(row);
     HRegion region = this.env.getRegion();
     RegionScanner scanner = region.getScanner(s);
-    List<KeyValue> kvs = new ArrayList<KeyValue>(1);
+    List<Cell> kvs = new ArrayList<Cell>(1);
     boolean more = scanner.next(kvs);
     assert !more : "Got more than one result when scanning" + " a single row in the primary table!";
 
-    Result r = new Result(kvs);
+    Result r = Result.create(kvs);
     scanner.close();
     return r;
   }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 8f0ee99..c588e95 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -244,7 +244,6 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
    * @param pk primary key of the original row
    * @param length total number of bytes of all the values that should be added
    * @param values to use when building the key
-   * @return
    */
   static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
     // now build up expected row key, each of the values, in order, followed by the PK and then some

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexer.java
index c7019c4..0ec3f96 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexer.java
@@ -14,13 +14,12 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-
 import org.apache.hadoop.hbase.index.covered.Batch;
 import org.apache.hadoop.hbase.index.covered.CoveredColumnsIndexBuilder;
 import org.apache.hadoop.hbase.index.covered.LocalTableState;
 import org.apache.hadoop.hbase.index.covered.update.IndexUpdateManager;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Index maintainer that maintains multiple indexes based on '{@link ColumnGroup}s'. Each group is a
@@ -108,10 +107,11 @@ public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder {
     Collection<Batch> batches = batchByRow(filtered);
 
     for (Batch batch : batches) {
-      Put p = new Put(batch.getKvs().iterator().next().getRow());
+      KeyValue curKV = batch.getKvs().iterator().next();
+      Put p = new Put(curKV.getRowArray(), curKV.getRowOffset(), curKV.getRowLength());
       for (KeyValue kv : batch.getKvs()) {
         // we only need to cleanup Put entries
-        byte type = kv.getType();
+        byte type = kv.getTypeByte();
         Type t = KeyValue.Type.codeToType(type);
         if (!t.equals(Type.Put)) {
           continue;
@@ -136,7 +136,6 @@ public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder {
 
   /**
    * @param filtered
-   * @return
    */
   private Collection<Batch>  batchByRow(Collection<KeyValue> filtered) {
     Map<Long, Batch> batches = new HashMap<Long, Batch>();

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
index ebd2abe..658e981 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java
@@ -27,9 +27,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.filter.FilterBase;
 
@@ -105,13 +108,14 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
   }
 
   @Override
-  public ReturnCode filterKeyValue(KeyValue next) {
+  public ReturnCode filterKeyValue(Cell next) {
     // we marked ourselves done, but the END_ROW_KEY didn't manage to seek to the very last key
     if (this.done) {
       return ReturnCode.SKIP;
     }
 
-    switch (KeyValue.Type.codeToType(next.getType())) {
+    KeyValue nextKV = KeyValueUtil.ensureKeyValue(next);
+    switch (KeyValue.Type.codeToType(next.getTypeByte())) {
     /*
      * DeleteFamily will always sort first because those KVs (we assume) don't have qualifiers (or
      * rather are null). Therefore, we have to keep a hold of all the delete families until we get
@@ -123,20 +127,20 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
       // one. In fact, it means that all the previous deletes can be ignored because the family must
       // not match anymore.
       this.coveringDelete.reset();
-      this.coveringDelete.deleteFamily = next;
+      this.coveringDelete.deleteFamily = nextKV;
       return ReturnCode.SKIP;
     case DeleteColumn:
       // similar to deleteFamily, all the newer deletes/puts would have been seen at this point, so
       // we can safely replace the more recent delete column with the more recent one
       this.coveringDelete.pointDelete = null;
-      this.coveringDelete.deleteColumn = next;
+      this.coveringDelete.deleteColumn = nextKV;
       return ReturnCode.SKIP;
     case Delete:
       // we are just deleting the single column value at this point.
       // therefore we just skip this entry and go onto the next one. The only caveat is that
       // we should still cover the next entry if this delete applies to the next entry, so we
       // have to keep around a reference to the KV to compare against the next valid entry
-      this.coveringDelete.pointDelete = next;
+      this.coveringDelete.pointDelete = nextKV;
       return ReturnCode.SKIP;
     default:
       // no covering deletes
@@ -144,18 +148,18 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
         return ReturnCode.INCLUDE;
       }
 
-      if (coveringDelete.matchesFamily(next)) {
+      if (coveringDelete.matchesFamily(nextKV)) {
         this.currentHint = familyHint;
         return ReturnCode.SEEK_NEXT_USING_HINT;
       }
 
-      if (coveringDelete.matchesColumn(next)) {
+      if (coveringDelete.matchesColumn(nextKV)) {
         // hint to the next column
         this.currentHint = columnHint;
         return ReturnCode.SEEK_NEXT_USING_HINT;
       }
 
-      if (coveringDelete.matchesPoint(next)) {
+      if (coveringDelete.matchesPoint(nextKV)) {
         return ReturnCode.SKIP;
       }
 
@@ -165,16 +169,6 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
     return ReturnCode.INCLUDE;
   }
 
-  @Override
-  public void write(DataOutput out) throws IOException {
-    throw new UnsupportedOperationException("Server-side only filter, cannot be serialized!");
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    throw new UnsupportedOperationException("Server-side only filter, cannot be deserialized!");
-  }
-
   /**
    * Get the next hint for a given peeked keyvalue
    */
@@ -247,7 +241,7 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
       if (deleteFamily == null) {
         return false;
       }
-      if (deleteFamily.matchingFamily(next)) {
+      if (CellUtil.matchingFamily(deleteFamily, next)) {
         // falls within the timestamp range
         if (deleteFamily.getTimestamp() >= next.getTimestamp()) {
           return true;
@@ -269,7 +263,7 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
       if (deleteColumn == null) {
         return false;
       }
-      if (deleteColumn.matchingFamily(next) && deleteColumn.matchingQualifier(next)) {
+      if (CellUtil.matchingFamily(deleteColumn, next) && deleteColumn.matchingQualifier(next)) {
         // falls within the timestamp range
         if (deleteColumn.getTimestamp() >= next.getTimestamp()) {
           return true;
@@ -289,7 +283,7 @@ public class ApplyAndFilterDeletesFilter extends FilterBase {
       // that the timestamp matches exactly. Because we sort by timestamp first, either the next
       // keyvalue has the exact timestamp or is an older (smaller) timestamp, and we can allow that
       // one.
-      if (pointDelete != null && pointDelete.matchingFamily(next)
+      if (pointDelete != null && CellUtil.matchingFamily(pointDelete, next)
           && pointDelete.matchingQualifier(next)) {
         if (pointDelete.getTimestamp() == next.getTimestamp()) {
           return true;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java
index 72a10e1..494bf66 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java
@@ -19,10 +19,7 @@
  */
 package org.apache.hadoop.hbase.index.covered.filter;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.filter.FilterBase;
 
@@ -53,7 +50,7 @@ public class ColumnTrackingNextLargestTimestampFilter extends FilterBase {
   }
 
   @Override
-  public ReturnCode filterKeyValue(KeyValue v) {
+  public ReturnCode filterKeyValue(Cell v) {
     long timestamp = v.getTimestamp();
     if (timestamp > ts) {
       this.column.setTs(timestamp);
@@ -62,13 +59,4 @@ public class ColumnTrackingNextLargestTimestampFilter extends FilterBase {
     return ReturnCode.INCLUDE;
   }
 
-  @Override
-  public void write(DataOutput out) throws IOException {
-    throw new UnsupportedOperationException("Server-side only filter, cannot be serialized!");
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    throw new UnsupportedOperationException("Server-side only filter, cannot be deserialized!");
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/FamilyOnlyFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/FamilyOnlyFilter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/FamilyOnlyFilter.java
index 8591f88..7c35786 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/FamilyOnlyFilter.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/FamilyOnlyFilter.java
@@ -17,10 +17,10 @@
  */
 package org.apache.hadoop.hbase.index.covered.filter;
 
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
 
 /**
  * Similar to the {@link FamilyFilter} but stops when the end of the family is reached and only
@@ -39,7 +39,7 @@ public class FamilyOnlyFilter extends FamilyFilter {
     this(new BinaryComparator(family));
   }
 
-  public FamilyOnlyFilter(final WritableByteArrayComparable familyComparator) {
+  public FamilyOnlyFilter(final ByteArrayComparable familyComparator) {
     super(CompareOp.EQUAL, familyComparator);
   }
 
@@ -56,7 +56,7 @@ public class FamilyOnlyFilter extends FamilyFilter {
   }
 
   @Override
-  public ReturnCode filterKeyValue(KeyValue v) {
+  public ReturnCode filterKeyValue(Cell v) {
     if (done) {
       return ReturnCode.SKIP;
     }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/MaxTimestampFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/MaxTimestampFilter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/MaxTimestampFilter.java
index 846ec88..92e9daf 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/MaxTimestampFilter.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/MaxTimestampFilter.java
@@ -23,6 +23,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -44,7 +45,13 @@ public class MaxTimestampFilter extends FilterBase {
   public KeyValue getNextKeyHint(KeyValue currentKV) {
     // this might be a little excessive right now - better safe than sorry though, so we don't mess
     // with other filters too much.
-    KeyValue kv = currentKV.deepCopy();
+    KeyValue kv = null;
+    try {
+        kv = currentKV.clone();
+    } catch (CloneNotSupportedException e) {
+        // the exception should not happen at all
+        throw new IllegalArgumentException(e);
+    }
     int offset =kv.getTimestampOffset();
     //set the timestamp in the buffer
     byte[] buffer = kv.getBuffer();
@@ -55,22 +62,11 @@ public class MaxTimestampFilter extends FilterBase {
   }
 
   @Override
-  public ReturnCode filterKeyValue(KeyValue v) {
+  public ReturnCode filterKeyValue(Cell v) {
     long timestamp = v.getTimestamp();
     if (timestamp > ts) {
       return ReturnCode.SEEK_NEXT_USING_HINT;
     }
     return ReturnCode.INCLUDE;
   }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    throw new UnsupportedOperationException("Server-side only filter, cannot be serialized!");
-
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    throw new UnsupportedOperationException("Server-side only filter, cannot be deserialized!");
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/NewerTimestampFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/NewerTimestampFilter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/NewerTimestampFilter.java
index 560cdd8..8e0f617 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/NewerTimestampFilter.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/NewerTimestampFilter.java
@@ -4,6 +4,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.filter.FilterBase;
 
@@ -11,7 +12,6 @@ import org.apache.hadoop.hbase.filter.FilterBase;
  * Server-side only class used in the indexer to filter out keyvalues newer than a given timestamp
  * (so allows anything <code><=</code> timestamp through).
  * <p>
- * Note,<tt>this</tt> doesn't support {@link #write(DataOutput)} or {@link #readFields(DataInput)}.
  */
 public class NewerTimestampFilter extends FilterBase {
 
@@ -22,16 +22,8 @@ public class NewerTimestampFilter extends FilterBase {
   }
 
   @Override
-  public ReturnCode filterKeyValue(KeyValue ignored) {
+  public ReturnCode filterKeyValue(Cell ignored) {
     return ignored.getTimestamp() > timestamp ? ReturnCode.SKIP : ReturnCode.INCLUDE;
   }
 
-  @Override
-  public void write(DataOutput out) throws IOException {
-    throw new UnsupportedOperationException("TimestampFilter is server-side only!");
-  }
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    throw new UnsupportedOperationException("TimestampFilter is server-side only!");
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/update/IndexUpdateManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/update/IndexUpdateManager.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/update/IndexUpdateManager.java
index 7ace79d..77b6e85 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/update/IndexUpdateManager.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/update/IndexUpdateManager.java
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -224,12 +224,13 @@ public class IndexUpdateManager {
             + ((m instanceof Put) ? m.getTimeStamp() + " " : ""));
         sb.append(" row=" + Bytes.toString(m.getRow()));
         sb.append("\n");
-        if (m.getFamilyMap().isEmpty()) {
+        if (m.getFamilyCellMap().isEmpty()) {
           sb.append("\t\t=== EMPTY ===\n");
         }
-        for (List<KeyValue> kvs : m.getFamilyMap().values()) {
-          for (KeyValue kv : kvs) {
-            sb.append("\t\t" + kv.toString() + "/value=" + Bytes.toStringBinary(kv.getValue()));
+        for (List<Cell> kvs : m.getFamilyCellMap().values()) {
+          for (Cell kv : kvs) {
+            sb.append("\t\t" + kv.toString() + "/value=" + Bytes.toStringBinary(kv.getValueArray(), 
+            		kv.getValueOffset(), kv.getValueLength()));
             sb.append("\n");
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/parallel/ThreadPoolManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/parallel/ThreadPoolManager.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/parallel/ThreadPoolManager.java
index c8afb04..2a343f0 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/parallel/ThreadPoolManager.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/parallel/ThreadPoolManager.java
@@ -74,7 +74,6 @@ public class ThreadPoolManager {
 
   /**
    * @param conf
-   * @return
    */
   private static ShutdownOnUnusedThreadPoolExecutor getDefaultExecutor(ThreadPoolBuilder builder) {
     int maxThreads = builder.getMaxThreads();

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/scanner/FilteredKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/scanner/FilteredKeyValueScanner.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/scanner/FilteredKeyValueScanner.java
index 0f7fed3..f3deb6a 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/scanner/FilteredKeyValueScanner.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/scanner/FilteredKeyValueScanner.java
@@ -125,7 +125,6 @@ public class FilteredKeyValueScanner implements KeyValueScanner {
         this.delegate.close();
     }
 
-    /*
     @Override
     public boolean backwardSeek(KeyValue arg0) throws IOException {
         return this.delegate.backwardSeek(arg0);
@@ -140,5 +139,4 @@ public class FilteredKeyValueScanner implements KeyValueScanner {
     public boolean seekToPreviousRow(KeyValue arg0) throws IOException {
         return this.delegate.seekToPreviousRow(arg0);
     }
-    */
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/table/CoprocessorHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/table/CoprocessorHTableFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/table/CoprocessorHTableFactory.java
index 5ded879..2af2c7d 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/table/CoprocessorHTableFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/table/CoprocessorHTableFactory.java
@@ -7,6 +7,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -40,7 +41,7 @@ public class CoprocessorHTableFactory implements HTableFactory {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Creating new HTable: " + Bytes.toString(tablename.copyBytesIfNecessary()));
     }
-    return this.e.getTable(tablename.copyBytesIfNecessary());
+    return this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary()));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/util/IndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/util/IndexManagementUtil.java
index 9ee81a9..16b3584 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/util/IndexManagementUtil.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/util/IndexManagementUtil.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 
 import com.google.common.collect.Maps;
 import org.apache.hadoop.hbase.index.ValueGetter;
@@ -65,7 +66,8 @@ public class IndexManagementUtil {
         } catch (Throwable t) {
             return false;
         }
-        if (INDEX_WAL_EDIT_CODEC_CLASS_NAME.equals(conf.get(WAL_EDIT_CODEC_CLASS_KEY, null))) {
+    if (INDEX_WAL_EDIT_CODEC_CLASS_NAME.equals(conf
+        .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, null))) {
             // its installed, and it can handle compression and non-compression cases
             return true;
         }
@@ -91,7 +93,8 @@ public class IndexManagementUtil {
         if (indexLogReaderName.equals(conf.get(HLOG_READER_IMPL_KEY, indexLogReaderName))) {
             if (conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false)) { throw new IllegalStateException(
                     "WAL Compression is only supported with " + codecClass
-                            + ". You can install in hbase-site.xml, under " + WAL_EDIT_CODEC_CLASS_KEY); }
+            + ". You can install in hbase-site.xml, under " + WALCellCodec.WAL_CELL_CODEC_CLASS_KEY);
+      }
         } else {
             throw new IllegalStateException(codecClass + " is not installed, but "
                     + indexLogReaderName + " hasn't been installed in hbase-site.xml under " + HLOG_READER_IMPL_KEY);

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/IndexedKeyValue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/IndexedKeyValue.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/IndexedKeyValue.java
index a7f4e82..5b2c6b4 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/IndexedKeyValue.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/IndexedKeyValue.java
@@ -8,8 +8,13 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
@@ -46,12 +51,20 @@ public class IndexedKeyValue extends KeyValue {
     }
 
     /**
-     * This is a KeyValue that shouldn't actually be replayed, so we always mark it as an {@link HLog#METAFAMILY} so it
+     * This is a KeyValue that shouldn't actually be replayed, so we always mark it as an {@link WALEdit#METAFAMILY} so it
      * isn't replayed via the normal replay mechanism
      */
     @Override
     public boolean matchingFamily(final byte[] family) {
-        return Bytes.equals(family, HLog.METAFAMILY);
+        return Bytes.equals(family, WALEdit.METAFAMILY);
+    }
+    
+    /**
+     * Not a real KeyValue
+     */
+    @Override
+    public boolean matchingRow(final byte [] row) {
+        return false;
     }
 
     @Override
@@ -77,22 +90,11 @@ public class IndexedKeyValue extends KeyValue {
     }
 
     private byte[] getMutationBytes() {
-        ByteArrayOutputStream bos = null;
         try {
-            bos = new ByteArrayOutputStream();
-            this.mutation.write(new DataOutputStream(bos));
-            bos.flush();
-            return bos.toByteArray();
+            MutationProto m = toMutationProto(this.mutation);
+            return m.toByteArray();
         } catch (IOException e) {
             throw new IllegalArgumentException("Failed to get bytes for mutation!", e);
-        } finally {
-            if (bos != null) {
-                try {
-                    bos.close();
-                } catch (IOException e) {
-                    throw new IllegalArgumentException("Failed to get bytes for mutation!", e);
-                }
-            }
         }
     }
 
@@ -101,11 +103,6 @@ public class IndexedKeyValue extends KeyValue {
         return hashCode;
     }
 
-    @Override
-    public void write(DataOutput out) throws IOException {
-        KeyValueCodec.write(out, this);
-    }
-
     /**
      * Internal write the underlying data for the entry - this does not do any special prefixing. Writing should be done
      * via {@link KeyValueCodec#write(DataOutput, KeyValue)} to ensure consistent reading/writing of
@@ -118,8 +115,8 @@ public class IndexedKeyValue extends KeyValue {
      */
     void writeData(DataOutput out) throws IOException {
         Bytes.writeByteArray(out, this.indexTableName.get());
-        out.writeUTF(this.mutation.getClass().getName());
-        this.mutation.write(out);
+        MutationProto m = toMutationProto(this.mutation);
+        Bytes.writeByteArray(out, m.toByteArray());
     }
 
     /**
@@ -127,22 +124,12 @@ public class IndexedKeyValue extends KeyValue {
      * complement to {@link #writeData(DataOutput)}.
      */
     @SuppressWarnings("javadoc")
-    @Override
     public void readFields(DataInput in) throws IOException {
         this.indexTableName = new ImmutableBytesPtr(Bytes.readByteArray(in));
-        Class<? extends Mutation> clazz;
-        try {
-            clazz = Class.forName(in.readUTF()).asSubclass(Mutation.class);
-            this.mutation = clazz.newInstance();
-            this.mutation.readFields(in);
-            this.hashCode = calcHashCode(indexTableName, mutation);
-        } catch (ClassNotFoundException e) {
-            throw new IOException(e);
-        } catch (InstantiationException e) {
-            throw new IOException(e);
-        } catch (IllegalAccessException e) {
-            throw new IOException(e);
-        }
+        byte[] mutationData = Bytes.readByteArray(in);
+        MutationProto mProto = MutationProto.parseFrom(mutationData);
+        this.mutation = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(mProto);
+        this.hashCode = calcHashCode(indexTableName, mutation);
     }
 
     public boolean getBatchFinished() {
@@ -152,4 +139,18 @@ public class IndexedKeyValue extends KeyValue {
     public void markBatchFinished() {
         this.batchFinished = true;
     }
+    
+    protected MutationProto toMutationProto(Mutation mutation)  throws IOException {
+        MutationProto m = null;
+        if(mutation instanceof Put){
+            m = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(MutationType.PUT, 
+                mutation);
+        } else if(mutation instanceof Delete) {
+            m = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(MutationType.DELETE, 
+                mutation);
+        } else {
+            throw new IOException("Put/Delete mutations only supported");
+        }
+        return m;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/KeyValueCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/KeyValueCodec.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/KeyValueCodec.java
index 0abdf8d..3340edc 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/KeyValueCodec.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/KeyValueCodec.java
@@ -7,6 +7,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
@@ -49,16 +50,14 @@ public class KeyValueCodec {
    */
   public static KeyValue readKeyValue(DataInput in) throws IOException {
     int length = in.readInt();
-    KeyValue kv;
     // its a special IndexedKeyValue
     if (length == INDEX_TYPE_LENGTH_MARKER) {
-      kv = new IndexedKeyValue();
+      IndexedKeyValue kv = new IndexedKeyValue();
       kv.readFields(in);
+      return kv;
     } else {
-      kv = new KeyValue();
-      kv.readFields(length, in);
+      return KeyValue.create(length, in);
     }
-    return kv;
   }
 
   /**
@@ -73,7 +72,7 @@ public class KeyValueCodec {
       out.writeInt(INDEX_TYPE_LENGTH_MARKER);
       ((IndexedKeyValue) kv).writeData(out);
     } else {
-      kv.write(out);
+        KeyValue.write(kv, out);
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
index bad82c4..dcef90a 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
@@ -8,6 +8,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.io.Writable;
@@ -31,122 +32,12 @@ import org.apache.hadoop.io.Writable;
  * we need to track which of the regions were on the server when it crashed only only split those
  * edits out into their respective regions.
  */
-public class IndexedHLogReader implements Reader {
+public class IndexedHLogReader extends ProtobufLogReader {
   private static final Log LOG = LogFactory.getLog(IndexedHLogReader.class);
 
-  private SequenceFileLogReader delegate;
-
-
-  private static class IndexedWALReader extends SequenceFileLogReader.WALReader {
-
-    /**
-     * @param fs
-     * @param p
-     * @param c
-     * @throws IOException
-     */
-    IndexedWALReader(FileSystem fs, Path p, Configuration c) throws IOException {
-      super(fs, p, c);
-    }
-
-    /**
-     * we basically have to reproduce what the SequenceFile.Reader is doing in next(), but without
-     * the check on the value class, since we have a special value class that doesn't directly match
-     * what was specified in the file header
-     */
-    @Override
-    public synchronized boolean next(Writable key, Writable val) throws IOException {
-      boolean more = next(key);
-
-      if (more) {
-        getCurrentValue(val);
-      }
-
-      return more;
-    }
-
-  }
-
-  public IndexedHLogReader() {
-    this.delegate = new SequenceFileLogReader();
-  }
-
-  @Override
-  public void init(final FileSystem fs, final Path path, Configuration conf) throws IOException {
-    this.delegate.init(fs, path, conf);
-    // close the old reader and replace with our own, custom one
-    this.delegate.reader.close();
-    this.delegate.reader = new IndexedWALReader(fs, path, conf);
-    Exception e = new Exception();
-    LOG.info("Instantiated indexed log reader." + Arrays.toString(e.getStackTrace()));
-    LOG.info("Got conf: " + conf);
-  }
-
-  @Override
-  public void close() throws IOException {
-    this.delegate.close();
-  }
-
-  @Override
-  public Entry next() throws IOException {
-    return next(null);
-  }
-
-  @Override
-  public Entry next(Entry reuse) throws IOException {
-    delegate.entryStart = delegate.reader.getPosition();
-    HLog.Entry e = reuse;
-    if (e == null) {
-      HLogKey key;
-      if (delegate.keyClass == null) {
-        key = HLog.newKey(delegate.conf);
-      } else {
-        try {
-          key = delegate.keyClass.newInstance();
-        } catch (InstantiationException ie) {
-          throw new IOException(ie);
-        } catch (IllegalAccessException iae) {
-          throw new IOException(iae);
-        }
-      }
-      WALEdit val = new WALEdit();
-      e = new HLog.Entry(key, val);
-    }
-
-    // now read in the HLog.Entry from the WAL
-    boolean nextPairValid = false;
-    try {
-      if (delegate.compressionContext != null) {
-        throw new UnsupportedOperationException(
-            "Reading compression isn't supported with the IndexedHLogReader! Compresed WALEdits "
-                + "are only support for HBase 0.94.9+ and with the IndexedWALEditCodec!");
-      }
-      // this is the special bit - we use our custom entry to read in the key-values that have index
-      // information, but otherwise it looks just like a regular WALEdit
-      IndexedWALEdit edit = new IndexedWALEdit(e.getEdit());
-      nextPairValid = delegate.reader.next(e.getKey(), edit);
-    } catch (IOException ioe) {
-      throw delegate.addFileInfoToException(ioe);
-    }
-    delegate.edit++;
-    if (delegate.compressionContext != null && delegate.emptyCompressionContext) {
-      delegate.emptyCompressionContext = false;
-    }
-    return nextPairValid ? e : null;
-  }
-
-  @Override
-  public void seek(long pos) throws IOException {
-    this.delegate.seek(pos);
-  }
-
-  @Override
-  public long getPosition() throws IOException {
-    return this.delegate.getPosition();
-  }
-
   @Override
-  public void reset() throws IOException {
-    this.delegate.reset();
+  protected void initAfterCompression() throws IOException {
+      conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, IndexedWALEditCodec.class.getName());
+      super.initAfterCompression();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
deleted file mode 100644
index 6749cc9..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import org.apache.hadoop.hbase.index.wal.KeyValueCodec;
-
-/**
- * Read in data for a delegate {@link WALEdit}. This should only be used in concert with an IndexedHLogReader
- * <p>
- * This class should only be used with HBase &lt; 0.94.9. Newer installations of HBase should
- * instead use the IndexedWALEditCodec along with the correct configuration options.
- */
-public class IndexedWALEdit extends WALEdit {
-  //reproduced here so we don't need to modify the HBase source.
-  private static final int VERSION_2 = -1;
-  private WALEdit delegate;
-
-  /**
-   * Copy-constructor. Only does a surface copy of the delegates fields - no actual data is copied, only referenced.
-   * @param delegate to copy
-   */
-  @SuppressWarnings("deprecation")
-  public IndexedWALEdit(WALEdit delegate) {
-    this.delegate = delegate;
-    // reset the delegate's fields
-    this.delegate.getKeyValues().clear();
-    if (this.delegate.getScopes() != null) {
-      this.delegate.getScopes().clear();
-    }
-  }
-
-  public IndexedWALEdit() {
-
-  }
-
-  @Override
-public void setCompressionContext(CompressionContext context) {
-    throw new UnsupportedOperationException(
-        "Compression not supported for IndexedWALEdit! If you are using HBase 0.94.9+, use IndexedWALEditCodec instead.");
-  }
-
-  @SuppressWarnings("deprecation")
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    delegate.getKeyValues().clear();
-    if (delegate.getScopes() != null) {
-      delegate.getScopes().clear();
-    }
-    // ----------------------------------------------------------------------------------------
-    // no compression, so we do pretty much what the usual WALEdit does, plus a little magic to
-    // capture the index updates
-    // -----------------------------------------------------------------------------------------
-    int versionOrLength = in.readInt();
-    if (versionOrLength != VERSION_2) {
-      throw new IOException("You must update your cluster to the lastest version of HBase and"
-          + " clean out all logs (cleanly start and then shutdown) before enabling indexing!");
-    }
-    // this is new style HLog entry containing multiple KeyValues.
-    List<KeyValue> kvs = KeyValueCodec.readKeyValues(in);
-    delegate.getKeyValues().addAll(kvs);
-
-    // then read in the rest of the WALEdit
-    int numFamilies = in.readInt();
-    NavigableMap<byte[], Integer> scopes = delegate.getScopes();
-    if (numFamilies > 0) {
-      if (scopes == null) {
-        scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-      }
-      for (int i = 0; i < numFamilies; i++) {
-        byte[] fam = Bytes.readByteArray(in);
-        int scope = in.readInt();
-        scopes.put(fam, scope);
-      }
-      delegate.setScopes(scopes);
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    throw new IOException(
-        "Indexed WALEdits aren't written directly out - use IndexedKeyValues instead");
-  }
-}
\ No newline at end of file