You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2017/11/23 11:43:54 UTC
phoenix git commit: PHOENIX-4376 Some more fixes for Coprocessor
Repository: phoenix
Updated Branches:
refs/heads/5.x-HBase-2.0 6db352aa3 -> 2a9a9f090
PHOENIX-4376 Some more fixes for Coprocessor
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2a9a9f09
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2a9a9f09
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2a9a9f09
Branch: refs/heads/5.x-HBase-2.0
Commit: 2a9a9f0908660ba67cb05637269acbdd293bda53
Parents: 6db352a
Author: Ankit Singhal <an...@gmail.com>
Authored: Thu Nov 23 17:13:44 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Thu Nov 23 17:13:44 2017 +0530
----------------------------------------------------------------------
.../phoenix/end2end/FlappingLocalIndexIT.java | 20 +++++++-------------
.../phoenix/end2end/join/HashJoinCacheIT.java | 5 +----
.../DelegateRegionCoprocessorEnvironment.java | 16 ++--------------
.../coprocessor/DelegateRegionObserver.java | 6 ------
.../coprocessor/MetaDataEndpointImpl.java | 11 +++++------
.../coprocessor/SequenceRegionObserver.java | 8 +++-----
.../UngroupedAggregateRegionObserver.java | 19 +------------------
.../org/apache/phoenix/hbase/index/Indexer.java | 15 ++++++++-------
.../hbase/index/builder/IndexBuildManager.java | 8 ++++----
.../index/table/CoprocessorHTableFactory.java | 1 -
.../write/ParallelWriterIndexCommitter.java | 5 ++---
.../TrackingParallelWriterIndexCommitter.java | 13 ++++++-------
.../index/PhoenixTransactionalIndexer.java | 5 +----
.../apache/phoenix/util/PhoenixMRJobUtil.java | 6 +++---
.../index/write/TestWALRecoveryCaching.java | 14 +++++++-------
15 files changed, 50 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
index 517cd6a..7e769ba 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
@@ -36,11 +36,8 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.end2end.index.BaseLocalIndexIT;
import org.apache.phoenix.query.QueryConstants;
@@ -347,14 +344,13 @@ public class FlappingLocalIndexIT extends BaseLocalIndexIT {
assertEquals(4, rs.getInt(1));
}
- public static class DeleyOpenRegionObserver extends BaseRegionObserver {
+ public static class DeleyOpenRegionObserver implements RegionObserver {
public static volatile boolean DELAY_OPEN = false;
private int retryCount = 0;
private CountDownLatch latch = new CountDownLatch(1);
@Override
- public void
- preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
- throws IOException {
+ public void preClose(org.apache.hadoop.hbase.coprocessor.ObserverContext<RegionCoprocessorEnvironment> c,
+ boolean abortRequested) throws IOException {
if(DELAY_OPEN) {
try {
latch.await();
@@ -362,17 +358,15 @@ public class FlappingLocalIndexIT extends BaseLocalIndexIT {
throw new DoNotRetryIOException(e1);
}
}
- super.preClose(c, abortRequested);
}
@Override
- public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
- Scan scan, RegionScanner s) throws IOException {
- if(DELAY_OPEN && retryCount == 1) {
+ public void preScannerOpen(org.apache.hadoop.hbase.coprocessor.ObserverContext<RegionCoprocessorEnvironment> c,
+ Scan scan) throws IOException {
+ if (DELAY_OPEN && retryCount == 1) {
latch.countDown();
}
retryCount++;
- return super.preScannerOpen(e, scan, s);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java
index c49c61f..49488fb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
@@ -80,8 +79,7 @@ public class HashJoinCacheIT extends BaseJoinIT {
public static Random rand= new Random();
public static List<ImmutableBytesPtr> lastRemovedJoinIds=new ArrayList<ImmutableBytesPtr>();
@Override
- public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan,
- final RegionScanner s) throws IOException {
+ public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan) throws IOException {
final HashJoinInfo joinInfo = HashJoinInfo.deserializeHashJoinFromScan(scan);
if (joinInfo != null) {
TenantCache cache = GlobalCache.getTenantCache(c.getEnvironment(), null);
@@ -94,7 +92,6 @@ public class HashJoinCacheIT extends BaseJoinIT {
}
}
}
- return s;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
index 25281fe..00f3316 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
@@ -17,13 +17,10 @@
*/
package org.apache.phoenix.coprocessor;
-import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
@@ -98,16 +95,6 @@ public class DelegateRegionCoprocessorEnvironment implements RegionCoprocessorEn
}
@Override
- public void startup() throws IOException {
- delegate.startup();
- }
-
- @Override
- public void shutdown() {
- delegate.shutdown();
- }
-
- @Override
public OnlineRegions getOnlineRegions() {
return delegate.getOnlineRegions();
}
@@ -126,5 +113,6 @@ public class DelegateRegionCoprocessorEnvironment implements RegionCoprocessorEn
public MetricRegistry getMetricRegistryForRegionServer() {
return delegate.getMetricRegistryForRegionServer();
}
-
+
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
index 8fcd68d..a65f78f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
@@ -68,12 +68,6 @@ public class DelegateRegionObserver implements RegionObserver {
}
@Override
- public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
- delegate.postLogReplay(c);
- }
-
-
- @Override
public void preFlush(org.apache.hadoop.hbase.coprocessor.ObserverContext<RegionCoprocessorEnvironment> c,
org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker tracker) throws IOException {
delegate.preFlush(c, tracker);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 63719cf..c7151ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -104,7 +104,6 @@ import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
@@ -120,7 +119,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
@@ -231,8 +230,8 @@ import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
@@ -266,7 +265,7 @@ import com.google.protobuf.Service;
* @since 0.1
*/
@SuppressWarnings("deprecation")
-public class MetaDataEndpointImpl extends MetaDataProtocol implements CoprocessorService, Coprocessor {
+public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCoprocessor {
private static final Logger logger = LoggerFactory.getLogger(MetaDataEndpointImpl.class);
// Column to track tables that have been upgraded based on PHOENIX-2067
@@ -497,8 +496,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
@Override
- public Service getService() {
- return this;
+ public Iterable<Service> getServices() {
+ return Collections.singleton(this);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
index 68b36f5..3f7439a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -52,8 +52,8 @@ import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.SequenceUtil;
import org.apache.phoenix.util.ServerUtil;
@@ -105,7 +105,6 @@ public class SequenceRegionObserver implements RegionObserver {
RegionCoprocessorEnvironment env = e.getEnvironment();
// We need to set this to prevent region.increment from being called
e.bypass();
- e.complete();
Region region = env.getRegion();
byte[] row = increment.getRow();
List<RowLock> locks = Lists.newArrayList();
@@ -363,7 +362,6 @@ public class SequenceRegionObserver implements RegionObserver {
RegionCoprocessorEnvironment env = e.getEnvironment();
// We need to set this to prevent region.append from being called
e.bypass();
- e.complete();
Region region = env.getRegion();
byte[] row = append.getRow();
List<RowLock> locks = Lists.newArrayList();
@@ -397,7 +395,7 @@ public class SequenceRegionObserver implements RegionObserver {
// Timestamp should match exactly, or we may have the wrong sequence
if (expectedValue != value || currentValueKV.getTimestamp() != clientTimestamp) {
return Result.create(Collections.singletonList(
- (Cell)PhoenixKeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES,
+ PhoenixKeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES,
QueryConstants.EMPTY_COLUMN_BYTES, currentValueKV.getTimestamp(), ByteUtil.EMPTY_BYTE_ARRAY)));
}
m = new Put(row, currentValueKV.getTimestamp());
@@ -425,7 +423,7 @@ public class SequenceRegionObserver implements RegionObserver {
// the client cares about is the timestamp, which is the timestamp of
// when the mutation was actually performed (useful in the case of .
return Result.create(Collections.singletonList(
- (Cell)PhoenixKeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, serverTimestamp, SUCCESS_VALUE)));
+ PhoenixKeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, serverTimestamp, SUCCESS_VALUE)));
} finally {
ServerUtil.releaseRowLocks(locks);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 4e4e0f5..7f0be01 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -132,8 +132,8 @@ import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ExpressionUtil;
import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
@@ -1325,23 +1325,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
- /*
- * TODO: use waitForFlushes PHOENIX-4352
- */
- @Override
- public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow)
- throws IOException {
- // Don't allow splitting if operations need read and write to same region are going on in the
- // the coprocessors to avoid dead lock scenario. See PHOENIX-3111.
- synchronized (lock) {
- isRegionClosingOrSplitting = true;
- if (scansReferenceCount > 0) {
- throw new IOException("Operations like local index building/delete/upsert select"
- + " might be going on so not allowing to split.");
- }
- }
- }
-
@Override
public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> c,
List<Pair<byte[], String>> familyPaths) throws IOException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 5f4afe5..729b928 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -202,7 +202,7 @@ public class Indexer implements RegionObserver, RegionCoprocessor {
public void start(CoprocessorEnvironment e) throws IOException {
try {
final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
- String serverName = env.getRegionServerServices().getServerName().getServerName();
+ String serverName = env.getServerName().getServerName();
if (env.getConfiguration().getBoolean(CHECK_VERSION_CONF_KEY, true)) {
// make sure the right version <-> combinations are allowed.
String errormsg = Indexer.validateVersion(env.getHBaseVersion(), env.getConfiguration());
@@ -395,7 +395,7 @@ public class Indexer implements RegionObserver, RegionCoprocessor {
Durability defaultDurability = Durability.SYNC_WAL;
if(c.getEnvironment().getRegion() != null) {
- defaultDurability = c.getEnvironment().getRegion().getTableDesc().getDurability();
+ defaultDurability = c.getEnvironment().getRegion().getTableDescriptor().getDurability();
defaultDurability = (defaultDurability == Durability.USE_DEFAULT) ?
Durability.SYNC_WAL : defaultDurability;
}
@@ -531,7 +531,7 @@ public class Indexer implements RegionObserver, RegionCoprocessor {
metricSource.updateIndexPrepareTime(duration);
current.addTimelineAnnotation("Built index updates, doing preStep");
TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
- byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
+ byte[] tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
while(indexUpdatesItr.hasNext()) {
@@ -682,7 +682,6 @@ public class Indexer implements RegionObserver, RegionCoprocessor {
Multimap<HTableInterfaceReference, Mutation> updates = failedIndexEdits.getEdits(c.getEnvironment().getRegion());
if (this.disabled) {
- super.postOpen(c);
return;
}
@@ -717,10 +716,12 @@ public class Indexer implements RegionObserver, RegionCoprocessor {
}
@Override
- public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
- HLogKey logKey, WALEdit logEdit) throws IOException {
+ public void preWALRestore(
+ org.apache.hadoop.hbase.coprocessor.ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
+ org.apache.hadoop.hbase.client.RegionInfo info, org.apache.hadoop.hbase.wal.WALKey logKey, WALEdit logEdit)
+ throws IOException {
+
if (this.disabled) {
- super.preWALRestore(env, info, logKey, logEdit);
return;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 4c410ad..6450fd1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
@@ -66,10 +66,10 @@ public class IndexBuildManager implements Stoppable {
return builder;
} catch (InstantiationException e1) {
throw new IOException("Couldn't instantiate index builder:" + builderClass
- + ", disabling indexing on table " + e.getRegion().getTableDesc().getNameAsString());
+ + ", disabling indexing on table " + e.getRegion().getTableDescriptor().getTableName().getNameAsString());
} catch (IllegalAccessException e1) {
throw new IOException("Couldn't instantiate index builder:" + builderClass
- + ", disabling indexing on table " + e.getRegion().getTableDesc().getNameAsString());
+ + ", disabling indexing on table " + e.getRegion().getTableDescriptor().getTableName().getNameAsString());
}
}
@@ -90,7 +90,7 @@ public class IndexBuildManager implements Stoppable {
}
public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
- Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException {
+ Collection<Cell> filtered, IndexMetaData indexMetaData) throws IOException {
// this is run async, so we can take our time here
return delegate.getIndexUpdateForFilteredRows(filtered, indexMetaData);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
index d626689..7ca43ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.hbase.index.table;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index f78fdcf..ceac999 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -20,7 +20,6 @@ import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Table;
@@ -77,7 +76,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
ThreadPoolManager.getExecutor(
new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
- INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env);
+ INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), parent, env);
this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
}
@@ -86,7 +85,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
* <p>
* Exposed for TESTING
*/
- void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, RegionCoprocessorEnvironment env) {
+ void setup(HTableFactory factory, ExecutorService pool,Stoppable stop, RegionCoprocessorEnvironment env) {
this.factory = factory;
this.pool = new QuickFailingTaskRunner(pool);
this.stopped = stop;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
index d9a9b21..f427646 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.phoenix.hbase.index.CapturingAbortable;
import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
@@ -74,7 +73,6 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
private TaskRunner pool;
private HTableFactory factory;
- private CapturingAbortable abortable;
private Stoppable stopped;
private RegionCoprocessorEnvironment env;
private KeyValueBuilder kvBuilder;
@@ -95,7 +93,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
ThreadPoolManager.getExecutor(
new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
- INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env);
+ INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), parent, env);
this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
}
@@ -104,11 +102,10 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
* <p>
* Exposed for TESTING
*/
- void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
+ void setup(HTableFactory factory, ExecutorService pool, Stoppable stop,
RegionCoprocessorEnvironment env) {
this.pool = new WaitForCompletionTaskRunner(pool);
this.factory = factory;
- this.abortable = new CapturingAbortable(abortable);
this.stopped = stop;
}
@@ -192,8 +189,10 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
}
private void throwFailureIfDone() throws SingleIndexWriteFailureException {
- if (stopped.isStopped() || abortable.isAborted() || Thread.currentThread().isInterrupted()) { throw new SingleIndexWriteFailureException(
- "Pool closed, not attempting to write to the index!", null); }
+ if (stopped.isStopped() || env.getConnection() == null || env.getConnection().isClosed()
+ || env.getConnection().isAborted()
+ || Thread.currentThread().isInterrupted()) { throw new SingleIndexWriteFailureException(
+ "Pool closed, not attempting to write to the index!", null); }
}
});
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 4641a8d..4064457 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
@@ -87,7 +86,6 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
import org.apache.phoenix.transaction.PhoenixTransactionalTable;
import org.apache.phoenix.transaction.TransactionFactory;
-import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -129,7 +127,7 @@ public class PhoenixTransactionalIndexer implements RegionObserver, RegionCoproc
@Override
public void start(CoprocessorEnvironment e) throws IOException {
final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e;
- String serverName = env.getRegionServerServices().getServerName().getServerName();
+ String serverName = env.getServerName().getServerName();
codec = new PhoenixIndexCodec();
codec.initialize(env);
// Clone the config since it is shared
@@ -192,7 +190,6 @@ public class PhoenixTransactionalIndexer implements RegionObserver, RegionCoproc
Mutation m = miniBatchOp.getOperation(0);
if (!codec.isEnabled(m)) {
- super.preBatchMutate(c, miniBatchOp);
return;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
index f12d49d..74bbaad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
@@ -35,7 +35,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ActiveRMInfoProto;
@@ -84,11 +84,11 @@ public class PhoenixMRJobUtil {
public static String getActiveResourceManagerHost(Configuration config, String zkQuorum)
throws IOException, InterruptedException, JSONException, KeeperException,
InvalidProtocolBufferException, ZooKeeperConnectionException {
- ZooKeeperWatcher zkw = null;
+ ZKWatcher zkw = null;
ZooKeeper zk = null;
String activeRMHost = null;
try {
- zkw = new ZooKeeperWatcher(config, "get-active-yarnmanager", null);
+ zkw = new ZKWatcher(config, "get-active-yarnmanager", null);
zk = new ZooKeeper(zkQuorum, 30000, zkw, false);
List<String> children = zk.getChildren(YARN_LEADER_ELECTION, zkw);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a9a9f09/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
index 4f1eea6..0d6ac7f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
@@ -48,16 +48,14 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.phoenix.hbase.index.IndexTableName;
import org.apache.phoenix.hbase.index.IndexTestingUtils;
import org.apache.phoenix.hbase.index.Indexer;
@@ -103,11 +101,13 @@ public class TestWALRecoveryCaching {
// -----------------------------------------------------------------------------------------------
private static CountDownLatch allowIndexTableToRecover;
- public static class IndexTableBlockingReplayObserver extends BaseRegionObserver {
+ public static class IndexTableBlockingReplayObserver implements RegionObserver {
@Override
- public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
- HLogKey logKey, WALEdit logEdit) throws IOException {
+ public void preWALRestore(
+ org.apache.hadoop.hbase.coprocessor.ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
+ org.apache.hadoop.hbase.client.RegionInfo info, WALKey logKey,
+ org.apache.hadoop.hbase.wal.WALEdit logEdit) throws IOException {
try {
LOG.debug("Restoring logs for index table");
if (allowIndexTableToRecover != null) {