Posted to commits@phoenix.apache.org by an...@apache.org on 2017/11/23 11:43:54 UTC

phoenix git commit: PHOENIX-4376 Some more fixes for Coprocessor

Repository: phoenix
Updated Branches:
  refs/heads/5.x-HBase-2.0 6db352aa3 -> 2a9a9f090


PHOENIX-4376 Some more fixes for Coprocessor


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2a9a9f09
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2a9a9f09
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2a9a9f09

Branch: refs/heads/5.x-HBase-2.0
Commit: 2a9a9f0908660ba67cb05637269acbdd293bda53
Parents: 6db352a
Author: Ankit Singhal <an...@gmail.com>
Authored: Thu Nov 23 17:13:44 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Thu Nov 23 17:13:44 2017 +0530

----------------------------------------------------------------------
 .../phoenix/end2end/FlappingLocalIndexIT.java   | 20 +++++++-------------
 .../phoenix/end2end/join/HashJoinCacheIT.java   |  5 +----
 .../DelegateRegionCoprocessorEnvironment.java   | 16 ++--------------
 .../coprocessor/DelegateRegionObserver.java     |  6 ------
 .../coprocessor/MetaDataEndpointImpl.java       | 11 +++++------
 .../coprocessor/SequenceRegionObserver.java     |  8 +++-----
 .../UngroupedAggregateRegionObserver.java       | 19 +------------------
 .../org/apache/phoenix/hbase/index/Indexer.java | 15 ++++++++-------
 .../hbase/index/builder/IndexBuildManager.java  |  8 ++++----
 .../index/table/CoprocessorHTableFactory.java   |  1 -
 .../write/ParallelWriterIndexCommitter.java     |  5 ++---
 .../TrackingParallelWriterIndexCommitter.java   | 13 ++++++-------
 .../index/PhoenixTransactionalIndexer.java      |  5 +----
 .../apache/phoenix/util/PhoenixMRJobUtil.java   |  6 +++---
 .../index/write/TestWALRecoveryCaching.java     | 14 +++++++-------
 15 files changed, 50 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
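
Summary of the changes in the diffstat above: this patch continues the HBase 2.0 coprocessor migration. Test and production observers drop the removed BaseRegionObserver base class and implement the RegionObserver interface directly, MetaDataEndpointImpl moves from CoprocessorService/Coprocessor to RegionCoprocessor, ObserverContext.complete() calls are dropped, and accessors that disappeared in HBase 2.0 (getTableDesc(), getRegionServerServices(), ZooKeeperWatcher) are replaced with their 2.0 counterparts (getTableDescriptor(), getServerName()/getConnection(), ZKWatcher).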


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
index 517cd6a..7e769ba 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
@@ -36,11 +36,8 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.end2end.index.BaseLocalIndexIT;
 import org.apache.phoenix.query.QueryConstants;
@@ -347,14 +344,13 @@ public class FlappingLocalIndexIT extends BaseLocalIndexIT {
         assertEquals(4, rs.getInt(1));
     }
 
-    public static class DeleyOpenRegionObserver extends BaseRegionObserver {
+    public static class DeleyOpenRegionObserver implements RegionObserver {
         public static volatile boolean DELAY_OPEN = false;
         private int retryCount = 0;
         private CountDownLatch latch = new CountDownLatch(1);
         @Override
-        public void
-                preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
-                        throws IOException {
+        public void preClose(org.apache.hadoop.hbase.coprocessor.ObserverContext<RegionCoprocessorEnvironment> c,
+                boolean abortRequested) throws IOException {
             if(DELAY_OPEN) {
                 try {
                     latch.await();
@@ -362,17 +358,15 @@ public class FlappingLocalIndexIT extends BaseLocalIndexIT {
                     throw new DoNotRetryIOException(e1);
                 }
             }
-            super.preClose(c, abortRequested);
         }
 
         @Override
-        public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
-                Scan scan, RegionScanner s) throws IOException {
-            if(DELAY_OPEN && retryCount == 1) {
+        public void preScannerOpen(org.apache.hadoop.hbase.coprocessor.ObserverContext<RegionCoprocessorEnvironment> c,
+                Scan scan) throws IOException {
+            if (DELAY_OPEN && retryCount == 1) {
                 latch.countDown();
             }
             retryCount++;
-            return super.preScannerOpen(e, scan, s);
         }
     }
 }
\ No newline at end of file
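
Context for the hunk above: BaseRegionObserver is gone in HBase 2.0 and RegionObserver is an interface whose hooks are all default methods, so the super.preClose(...)/super.preScannerOpen(...) forwarding calls simply go away; preScannerOpen also no longer takes or returns a RegionScanner. A minimal sketch of one common shape for an observer under the new API (illustrative only, hypothetical class name, not part of this patch):

    import java.io.IOException;
    import java.util.Optional;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.coprocessor.RegionObserver;

    public class ExampleObserver implements RegionCoprocessor, RegionObserver {
        @Override
        public Optional<RegionObserver> getRegionObserver() {
            // expose the observer hooks to the coprocessor host
            return Optional.of(this);
        }

        @Override
        public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
                throws IOException {
            // hooks are default methods, so there is no super implementation to delegate to
        }
    }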

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java
index c49c61f..49488fb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
@@ -80,8 +79,7 @@ public class HashJoinCacheIT extends BaseJoinIT {
         public static Random rand= new Random();
         public static List<ImmutableBytesPtr> lastRemovedJoinIds=new ArrayList<ImmutableBytesPtr>();
         @Override
-        public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan,
-                final RegionScanner s) throws IOException {
+        public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan) throws IOException {
             final HashJoinInfo joinInfo = HashJoinInfo.deserializeHashJoinFromScan(scan);
             if (joinInfo != null) {
                 TenantCache cache = GlobalCache.getTenantCache(c.getEnvironment(), null);
@@ -94,7 +92,6 @@ public class HashJoinCacheIT extends BaseJoinIT {
                     }
                 }
             }
-            return s;
         }
         
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
index 25281fe..00f3316 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
@@ -17,13 +17,10 @@
  */
 package org.apache.phoenix.coprocessor;
 
-import java.io.IOException;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
@@ -98,16 +95,6 @@ public class DelegateRegionCoprocessorEnvironment implements RegionCoprocessorEn
     }
 
     @Override
-    public void startup() throws IOException {
-        delegate.startup();
-    }
-
-    @Override
-    public void shutdown() {
-        delegate.shutdown();
-    }
-
-    @Override
     public OnlineRegions getOnlineRegions() {
         return delegate.getOnlineRegions();
     }
@@ -126,5 +113,6 @@ public class DelegateRegionCoprocessorEnvironment implements RegionCoprocessorEn
     public MetricRegistry getMetricRegistryForRegionServer() {
         return delegate.getMetricRegistryForRegionServer();
     }
-
+   
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
index 8fcd68d..a65f78f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
@@ -68,12 +68,6 @@ public class DelegateRegionObserver implements RegionObserver {
     }
 
     @Override
-    public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
-        delegate.postLogReplay(c);
-    }
-
-
-    @Override
     public void preFlush(org.apache.hadoop.hbase.coprocessor.ObserverContext<RegionCoprocessorEnvironment> c,
             org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker tracker) throws IOException {
         delegate.preFlush(c, tracker);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 63719cf..c7151ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -104,7 +104,6 @@ import org.apache.hadoop.hbase.CellBuilderFactory;
 import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
@@ -120,7 +119,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.FilterList;
@@ -231,8 +230,8 @@ import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
@@ -266,7 +265,7 @@ import com.google.protobuf.Service;
  * @since 0.1
  */
 @SuppressWarnings("deprecation")
-public class MetaDataEndpointImpl extends MetaDataProtocol implements CoprocessorService, Coprocessor {
+public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCoprocessor {
     private static final Logger logger = LoggerFactory.getLogger(MetaDataEndpointImpl.class);
 
     // Column to track tables that have been upgraded based on PHOENIX-2067
@@ -497,8 +496,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     }
 
     @Override
-    public Service getService() {
-        return this;
+    public Iterable<Service> getServices() {
+        return Collections.singleton(this);
     }
 
     @Override
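
The endpoint now registers itself through the HBase 2.0 RegionCoprocessor contract: getServices() returns the protobuf Service(s) the coprocessor exposes, replacing the single-service CoprocessorService.getService(). A stripped-down, hedged sketch of that contract (hypothetical class, not the real MetaDataEndpointImpl; a real endpoint would get the remaining Service methods from its generated protobuf base class):

    import java.util.Collections;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
    import com.google.protobuf.Service;

    public abstract class ExampleEndpoint implements RegionCoprocessor, Service {
        @Override
        public Iterable<Service> getServices() {
            // a coprocessor may expose several services; this one exposes itself
            return Collections.<Service>singleton(this);
        }
    }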

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
index 68b36f5..3f7439a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -52,8 +52,8 @@ import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.SequenceUtil;
 import org.apache.phoenix.util.ServerUtil;
 
@@ -105,7 +105,6 @@ public class SequenceRegionObserver implements RegionObserver {
         RegionCoprocessorEnvironment env = e.getEnvironment();
         // We need to set this to prevent region.increment from being called
         e.bypass();
-        e.complete();
         Region region = env.getRegion();
         byte[] row = increment.getRow();
         List<RowLock> locks = Lists.newArrayList();
@@ -363,7 +362,6 @@ public class SequenceRegionObserver implements RegionObserver {
         RegionCoprocessorEnvironment env = e.getEnvironment();
         // We need to set this to prevent region.append from being called
         e.bypass();
-        e.complete();
         Region region = env.getRegion();
         byte[] row = append.getRow();
         List<RowLock> locks = Lists.newArrayList();
@@ -397,7 +395,7 @@ public class SequenceRegionObserver implements RegionObserver {
                     // Timestamp should match exactly, or we may have the wrong sequence
                     if (expectedValue != value || currentValueKV.getTimestamp() != clientTimestamp) {
                         return Result.create(Collections.singletonList(
-                          (Cell)PhoenixKeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, 
+                          PhoenixKeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, 
                             QueryConstants.EMPTY_COLUMN_BYTES, currentValueKV.getTimestamp(), ByteUtil.EMPTY_BYTE_ARRAY)));
                     }
                     m = new Put(row, currentValueKV.getTimestamp());
@@ -425,7 +423,7 @@ public class SequenceRegionObserver implements RegionObserver {
                 // the client cares about is the timestamp, which is the timestamp of
                 // when the mutation was actually performed (useful in the case of .
                 return Result.create(Collections.singletonList(
-                  (Cell)PhoenixKeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, serverTimestamp, SUCCESS_VALUE)));
+                  PhoenixKeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, serverTimestamp, SUCCESS_VALUE)));
             } finally {
                 ServerUtil.releaseRowLocks(locks);
             }
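
Dropping e.complete() reflects the ObserverContext change in HBase 2.0: bypass() alone now marks the operation as bypassed, so the default region.increment()/region.append() is skipped. A hedged illustration of that pattern (hypothetical observer, not Phoenix code):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Increment;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.coprocessor.RegionObserver;

    public class BypassingIncrementObserver implements RegionObserver {
        @Override
        public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
                throws IOException {
            // bypass() short-circuits the default increment; the separate
            // complete() call required on HBase 1.x no longer exists
            e.bypass();
            return Result.EMPTY_RESULT;
        }
    }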

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 4e4e0f5..7f0be01 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -132,8 +132,8 @@ import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
@@ -1325,23 +1325,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
     }
 
-    /*
-     * TODO: use waitForFlushes PHOENIX-4352
-     */
-    @Override
-    public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow)
-            throws IOException {
-        // Don't allow splitting if operations need read and write to same region are going on in the
-        // the coprocessors to avoid dead lock scenario. See PHOENIX-3111.
-        synchronized (lock) {
-            isRegionClosingOrSplitting = true;
-            if (scansReferenceCount > 0) {
-                throw new IOException("Operations like local index building/delete/upsert select"
-                        + " might be going on so not allowing to split.");
-            }
-        }
-    }
-
     @Override
     public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> c,
             List<Pair<byte[], String>> familyPaths) throws IOException {
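
Side note on the removed preSplit guard: RegionObserver in HBase 2.0 no longer exposes the region-server-side preSplit hook (split coordination moved to the master), so the PHOENIX-3111 guard cannot stay here; the TODO in the removed block points at PHOENIX-4352 (waitForFlushes) as the intended replacement.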

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 5f4afe5..729b928 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -202,7 +202,7 @@ public class Indexer implements RegionObserver, RegionCoprocessor {
   public void start(CoprocessorEnvironment e) throws IOException {
       try {
         final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
-        String serverName = env.getRegionServerServices().getServerName().getServerName();
+        String serverName = env.getServerName().getServerName();
         if (env.getConfiguration().getBoolean(CHECK_VERSION_CONF_KEY, true)) {
           // make sure the right version <-> combinations are allowed.
           String errormsg = Indexer.validateVersion(env.getHBaseVersion(), env.getConfiguration());
@@ -395,7 +395,7 @@ public class Indexer implements RegionObserver, RegionCoprocessor {
           
       Durability defaultDurability = Durability.SYNC_WAL;
       if(c.getEnvironment().getRegion() != null) {
-          defaultDurability = c.getEnvironment().getRegion().getTableDesc().getDurability();
+          defaultDurability = c.getEnvironment().getRegion().getTableDescriptor().getDurability();
           defaultDurability = (defaultDurability == Durability.USE_DEFAULT) ? 
                   Durability.SYNC_WAL : defaultDurability;
       }
@@ -531,7 +531,7 @@ public class Indexer implements RegionObserver, RegionCoprocessor {
           metricSource.updateIndexPrepareTime(duration);
           current.addTimelineAnnotation("Built index updates, doing preStep");
           TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
-          byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
+          byte[] tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
           Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
           List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
           while(indexUpdatesItr.hasNext()) {
@@ -682,7 +682,6 @@ public class Indexer implements RegionObserver, RegionCoprocessor {
     Multimap<HTableInterfaceReference, Mutation> updates = failedIndexEdits.getEdits(c.getEnvironment().getRegion());
     
     if (this.disabled) {
-        super.postOpen(c);
         return;
     }
 
@@ -717,10 +716,12 @@ public class Indexer implements RegionObserver, RegionCoprocessor {
   }
 
   @Override
-  public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
-      HLogKey logKey, WALEdit logEdit) throws IOException {
+    public void preWALRestore(
+            org.apache.hadoop.hbase.coprocessor.ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
+            org.apache.hadoop.hbase.client.RegionInfo info, org.apache.hadoop.hbase.wal.WALKey logKey, WALEdit logEdit)
+            throws IOException {
+  
       if (this.disabled) {
-          super.preWALRestore(env, info, logKey, logEdit);
           return;
       }
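
Two HBase 2.0 API shifts show up in Indexer: the coprocessor environment now hands out the server identity and table metadata directly (getServerName(), Region.getTableDescriptor()), and preWALRestore takes RegionInfo/WALKey instead of HRegionInfo/HLogKey. A small hedged sketch of the replacement accessors (hypothetical helper class, not part of the patch):

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

    final class CoprocEnvInfo {
        static String describe(RegionCoprocessorEnvironment env) {
            // replaces env.getRegionServerServices().getServerName()
            String server = env.getServerName().getServerName();
            // replaces env.getRegion().getTableDesc()
            TableName table = env.getRegion().getTableDescriptor().getTableName();
            return server + "/" + table.getNameAsString();
        }
    }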
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 4c410ad..6450fd1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -25,7 +25,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -66,10 +66,10 @@ public class IndexBuildManager implements Stoppable {
       return builder;
     } catch (InstantiationException e1) {
       throw new IOException("Couldn't instantiate index builder:" + builderClass
-          + ", disabling indexing on table " + e.getRegion().getTableDesc().getNameAsString());
+          + ", disabling indexing on table " + e.getRegion().getTableDescriptor().getTableName().getNameAsString());
     } catch (IllegalAccessException e1) {
       throw new IOException("Couldn't instantiate index builder:" + builderClass
-          + ", disabling indexing on table " + e.getRegion().getTableDesc().getNameAsString());
+          + ", disabling indexing on table " + e.getRegion().getTableDescriptor().getTableName().getNameAsString());
     }
   }
 
@@ -90,7 +90,7 @@ public class IndexBuildManager implements Stoppable {
   }
 
   public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
-      Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException {
+      Collection<Cell> filtered, IndexMetaData indexMetaData) throws IOException {
     // this is run async, so we can take our time here
     return delegate.getIndexUpdateForFilteredRows(filtered, indexMetaData);
   }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
index d626689..7ca43ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.hbase.index.table;
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index f78fdcf..ceac999 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -20,7 +20,6 @@ import java.util.concurrent.ExecutorService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Table;
@@ -77,7 +76,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
                 ThreadPoolManager.getExecutor(
                         new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
                                 DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
-                                INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env);
+                                INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), parent, env);
         this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
     }
 
@@ -86,7 +85,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
      * <p>
      * Exposed for TESTING
      */
-    void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, RegionCoprocessorEnvironment env) {
+    void setup(HTableFactory factory, ExecutorService pool,Stoppable stop, RegionCoprocessorEnvironment env) {
         this.factory = factory;
         this.pool = new QuickFailingTaskRunner(pool);
         this.stopped = stop;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
index d9a9b21..f427646 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.phoenix.hbase.index.CapturingAbortable;
 import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
 import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
 import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
@@ -74,7 +73,6 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
 
     private TaskRunner pool;
     private HTableFactory factory;
-    private CapturingAbortable abortable;
     private Stoppable stopped;
     private RegionCoprocessorEnvironment env;
     private KeyValueBuilder kvBuilder;
@@ -95,7 +93,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
                 ThreadPoolManager.getExecutor(
                         new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
                                 DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
-                                INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env);
+                                INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), parent, env);
         this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
     }
 
@@ -104,11 +102,10 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
      * <p>
      * Exposed for TESTING
      */
-    void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
+    void setup(HTableFactory factory, ExecutorService pool, Stoppable stop,
             RegionCoprocessorEnvironment env) {
         this.pool = new WaitForCompletionTaskRunner(pool);
         this.factory = factory;
-        this.abortable = new CapturingAbortable(abortable);
         this.stopped = stop;
     }
 
@@ -192,8 +189,10 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
                 }
 
                 private void throwFailureIfDone() throws SingleIndexWriteFailureException {
-                    if (stopped.isStopped() || abortable.isAborted() || Thread.currentThread().isInterrupted()) { throw new SingleIndexWriteFailureException(
-                            "Pool closed, not attempting to write to the index!", null); }
+                    if (stopped.isStopped() || env.getConnection() == null || env.getConnection().isClosed()
+                            || env.getConnection().isAborted()
+                            || Thread.currentThread().isInterrupted()) { throw new SingleIndexWriteFailureException(
+                                    "Pool closed, not attempting to write to the index!", null); }
 
                 }
             });
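
With getRegionServerServices() gone from the coprocessor environment, the committer no longer receives the region server's Abortable; the liveness check above now leans on the shared Connection instead (env.getConnection().isClosed()/isAborted()) alongside the existing Stoppable and thread-interrupt checks.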

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 4641a8d..4064457 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
@@ -87,7 +86,6 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
 import org.apache.phoenix.transaction.PhoenixTransactionalTable;
 import org.apache.phoenix.transaction.TransactionFactory;
-import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -129,7 +127,7 @@ public class PhoenixTransactionalIndexer implements RegionObserver, RegionCoproc
     @Override
     public void start(CoprocessorEnvironment e) throws IOException {
         final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e;
-        String serverName = env.getRegionServerServices().getServerName().getServerName();
+        String serverName = env.getServerName().getServerName();
         codec = new PhoenixIndexCodec();
         codec.initialize(env);
         // Clone the config since it is shared
@@ -192,7 +190,6 @@ public class PhoenixTransactionalIndexer implements RegionObserver, RegionCoproc
 
         Mutation m = miniBatchOp.getOperation(0);
         if (!codec.isEnabled(m)) {
-            super.preBatchMutate(c, miniBatchOp);
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
index f12d49d..74bbaad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
@@ -35,7 +35,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ActiveRMInfoProto;
@@ -84,11 +84,11 @@ public class PhoenixMRJobUtil {
     public static String getActiveResourceManagerHost(Configuration config, String zkQuorum)
             throws IOException, InterruptedException, JSONException, KeeperException,
             InvalidProtocolBufferException, ZooKeeperConnectionException {
-        ZooKeeperWatcher zkw = null;
+        ZKWatcher zkw = null;
         ZooKeeper zk = null;
         String activeRMHost = null;
         try {
-            zkw = new ZooKeeperWatcher(config, "get-active-yarnmanager", null);
+            zkw = new ZKWatcher(config, "get-active-yarnmanager", null);
             zk = new ZooKeeper(zkQuorum, 30000, zkw, false);
 
             List<String> children = zk.getChildren(YARN_LEADER_ELECTION, zkw);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
index 4f1eea6..0d6ac7f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
@@ -48,16 +48,14 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.phoenix.hbase.index.IndexTableName;
 import org.apache.phoenix.hbase.index.IndexTestingUtils;
 import org.apache.phoenix.hbase.index.Indexer;
@@ -103,11 +101,13 @@ public class TestWALRecoveryCaching {
   // -----------------------------------------------------------------------------------------------
   private static CountDownLatch allowIndexTableToRecover;
 
-  public static class IndexTableBlockingReplayObserver extends BaseRegionObserver {
+  public static class IndexTableBlockingReplayObserver implements RegionObserver {
 
     @Override
-    public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
-        HLogKey logKey, WALEdit logEdit) throws IOException {
+        public void preWALRestore(
+                org.apache.hadoop.hbase.coprocessor.ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
+                org.apache.hadoop.hbase.client.RegionInfo info, WALKey logKey,
+                org.apache.hadoop.hbase.wal.WALEdit logEdit) throws IOException {
       try {
         LOG.debug("Restoring logs for index table");
         if (allowIndexTableToRecover != null) {