You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/09/27 19:45:38 UTC
[7/7] hbase git commit: HBASE-17732 Coprocessor Design Improvements
HBASE-17732 Coprocessor Design Improvements
------------------------------------------------------
TL;DR
------------------------------------------------------
We are moving from Inheritance
- Observer *is* Coprocessor
- FooService *is* CoprocessorService
To Composition
- Coprocessor *has* Observer
- Coprocessor *has* Service
------------------------------------------------------
Design Changes
------------------------------------------------------
- Adds four new interfaces - MasterCoprocessor, RegionCoprocessor, RegionServerCoprocessor,
WALCoprocessor
- These new *Coprocessor interfaces have a get*Observer() function for each observer type
supported by them.
- Added Coprocessor#getService() to base interface. All extending *Coprocessor interfaces will
get it from the base interface.
- Added BulkLoadObserver hooks to RegionCoprocessorHost instead of SecureBulkLoadManager doing its
own trickery.
- CoprocessorHost#find*() functions: Too many testing hooks digging into CP internals.
Deleted where possible, else marked @VisibleForTesting.
------------------------------------------------------
Backward Compatibility
------------------------------------------------------
- Old coprocessors implementing *Observer won't get loaded (no backward compatibility guarantees).
- Third party coprocessors only implementing Coprocessor will not get loaded (just like Observers).
- Old coprocessors implementing CoprocessorService (for master/region host)
/SingletonCoprocessorService (for RegionServer host) will continue to work with 2.0.
- Added test to ensure backward compatibility of CoprocessorService/SingletonCoprocessorService
- Note that if a coprocessor implements both observer and service in same class, its service
component will continue to work but its observer component won't work.
------------------------------------------------------
Notes
------------------------------------------------------
Did a side-by-side comparison of CPs in master and after patch. The coprocessors which were just
CoprocessorService earlier needed a home in some coprocessor in the new design. For most it was clear
since they were using a particular type of environment. Some were tricky.
- JMXListener - MasterCoprocessor and RSCoprocessor (because jmx listener makes sense for
processes?)
- RSGroupAdminEndpoint --> MasterCP
- VisibilityController -> MasterCP and RegionCP
These were converted to RegionCoprocessor because they were using RegionCoprocessorEnvironment
which can only come from a RegionCPHost.
- AggregateImplementation
- BaseRowProcessorEndpoint
- BulkDeleteEndpoint
- Export
- RefreshHFilesEndpoint
- RowCountEndpoint
- MultiRowMutationEndpoint
- SecureBulkLoadEndpoint
- TokenProvider
Change-Id: I813145f2bc11815f52ac703563b879962c249764
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/97513466
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/97513466
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/97513466
Branch: refs/heads/master
Commit: 97513466c05f5eaadb94425c98098063ac374098
Parents: bd68551
Author: Apekshit Sharma <ap...@apache.org>
Authored: Mon Sep 4 14:02:36 2017 -0700
Committer: Apekshit Sharma <ap...@apache.org>
Committed: Wed Sep 27 12:40:25 2017 -0700
----------------------------------------------------------------------
.../hadoop/hbase/backup/BackupObserver.java | 10 +-
.../backup/TestBackupDeleteWithFailures.java | 15 +-
.../org/apache/hadoop/hbase/Coprocessor.java | 17 +-
.../hadoop/hbase/CoprocessorEnvironment.java | 13 +-
.../apache/hadoop/hbase/HTableDescriptor.java | 9 +-
.../coprocessor/AggregateImplementation.java | 28 +-
.../apache/hadoop/hbase/coprocessor/Export.java | 9 +-
.../security/access/SecureBulkLoadEndpoint.java | 13 +-
.../coprocessor/ColumnAggregationEndpoint.java | 8 +-
.../ColumnAggregationEndpointNullResponse.java | 10 +-
.../ColumnAggregationEndpointWithErrors.java | 14 +-
.../coprocessor/ProtobufCoprocessorService.java | 11 +-
.../TestAsyncCoprocessorEndpoint.java | 20 +-
.../hbase/coprocessor/TestClassLoading.java | 27 +-
...CoprocessorServiceBackwardCompatibility.java | 116 ++
.../TestRegionServerCoprocessorEndpoint.java | 19 +-
.../coprocessor/TestRowProcessorEndpoint.java | 8 +-
.../regionserver/TestServerCustomProtocol.java | 23 +-
.../coprocessor/example/BulkDeleteEndpoint.java | 19 +-
.../ExampleMasterObserverWithMetrics.java | 10 +-
.../ExampleRegionObserverWithMetrics.java | 60 +-
.../example/RefreshHFilesEndpoint.java | 10 +-
.../coprocessor/example/RowCountEndpoint.java | 11 +-
.../example/ZooKeeperScanPolicyObserver.java | 17 +-
.../mapreduce/IntegrationTestBulkLoad.java | 10 +-
.../TestImportTSVWithOperationAttributes.java | 10 +-
.../hbase/mapreduce/TestImportTSVWithTTLs.java | 9 +-
.../hbase/rsgroup/RSGroupAdminEndpoint.java | 21 +-
.../hadoop/hbase/rsgroup/TestRSGroups.java | 4 +-
.../hbase/rsgroup/TestRSGroupsOfflineMode.java | 2 +-
.../org/apache/hadoop/hbase/JMXListener.java | 5 +-
.../hadoop/hbase/client/HTableWrapper.java | 4 +-
.../hbase/constraint/ConstraintProcessor.java | 9 +-
.../hbase/coprocessor/BaseEnvironment.java | 187 +++
.../coprocessor/BaseRowProcessorEndpoint.java | 14 +-
.../hbase/coprocessor/BulkLoadObserver.java | 2 +-
.../hbase/coprocessor/CoprocessorHost.java | 500 +++----
.../hbase/coprocessor/CoprocessorService.java | 2 +
.../CoprocessorServiceBackwardCompatiblity.java | 86 ++
.../hbase/coprocessor/EndpointObserver.java | 2 +-
.../hbase/coprocessor/MasterCoprocessor.java | 34 +
.../MasterCoprocessorEnvironment.java | 2 +-
.../hbase/coprocessor/MasterObserver.java | 2 +-
.../coprocessor/MultiRowMutationEndpoint.java | 10 +-
.../hbase/coprocessor/ObserverContext.java | 12 +-
.../hbase/coprocessor/RegionCoprocessor.java | 43 +
.../RegionCoprocessorEnvironment.java | 4 +-
.../hbase/coprocessor/RegionObserver.java | 2 +-
.../coprocessor/RegionServerCoprocessor.java | 34 +
.../RegionServerCoprocessorEnvironment.java | 3 +-
.../hbase/coprocessor/RegionServerObserver.java | 2 +-
.../SingletonCoprocessorService.java | 2 +
.../hbase/coprocessor/WALCoprocessor.java | 36 +
.../coprocessor/WALCoprocessorEnvironment.java | 2 +-
.../hadoop/hbase/coprocessor/WALObserver.java | 2 +-
.../hadoop/hbase/coprocessor/package-info.java | 5 +-
.../hbase/master/MasterCoprocessorHost.java | 1244 +++++++-----------
.../hbase/quotas/MasterSpaceQuotaObserver.java | 9 +-
.../regionserver/RegionCoprocessorHost.java | 1001 +++++++-------
.../RegionServerCoprocessorHost.java | 284 ++--
.../regionserver/SecureBulkLoadManager.java | 38 +-
.../regionserver/wal/WALCoprocessorHost.java | 168 +--
.../regionserver/ReplicationObserver.java | 9 +-
.../hbase/security/access/AccessController.java | 49 +-
.../CoprocessorWhitelistMasterObserver.java | 9 +-
.../hbase/security/token/TokenProvider.java | 12 +-
.../visibility/VisibilityController.java | 66 +-
.../visibility/VisibilityReplication.java | 64 +
.../hadoop/hbase/tool/WriteSinkCoprocessor.java | 11 +-
.../hbase/client/HConnectionTestingUtility.java | 9 +-
.../hbase/client/TestAsyncAdminBuilder.java | 23 +-
...syncNonMetaRegionLocatorConcurrenyLimit.java | 9 +-
.../client/TestAsyncRegionLocatorTimeout.java | 8 +-
.../hbase/client/TestAsyncTableBatch.java | 9 +-
...estAvoidCellReferencesIntoShippedBlocks.java | 9 +-
.../client/TestBlockEvictionFromClient.java | 9 +-
.../client/TestClientOperationInterrupt.java | 9 +-
.../hadoop/hbase/client/TestEnableTable.java | 17 +-
.../hadoop/hbase/client/TestFromClientSide.java | 9 +-
.../hbase/client/TestFromClientSide3.java | 33 +-
.../org/apache/hadoop/hbase/client/TestHCM.java | 25 +-
.../client/TestMobCloneSnapshotFromClient.java | 10 +-
.../hbase/client/TestReplicaWithCluster.java | 24 +-
.../hadoop/hbase/client/TestReplicasClient.java | 9 +-
.../hbase/client/TestResultFromCoprocessor.java | 9 +-
.../hbase/client/TestServerBusyException.java | 17 +-
.../coprocessor/SampleRegionWALCoprocessor.java | 199 +++
.../coprocessor/SampleRegionWALObserver.java | 188 ---
.../hbase/coprocessor/SimpleRegionObserver.java | 8 +-
.../TestCoprocessorConfiguration.java | 9 +-
.../hbase/coprocessor/TestCoprocessorHost.java | 39 +-
.../coprocessor/TestCoprocessorInterface.java | 60 +-
.../coprocessor/TestCoprocessorMetrics.java | 28 +-
.../hbase/coprocessor/TestCoprocessorStop.java | 2 +-
.../hbase/coprocessor/TestHTableWrapper.java | 11 +-
...TestMasterCoprocessorExceptionWithAbort.java | 11 +-
...estMasterCoprocessorExceptionWithRemove.java | 11 +-
.../hbase/coprocessor/TestMasterObserver.java | 41 +-
.../coprocessor/TestOpenTableInCoprocessor.java | 15 +-
.../coprocessor/TestRegionObserverBypass.java | 8 +-
...erverForAddingMutationsFromCoprocessors.java | 37 +-
.../TestRegionObserverInterface.java | 23 +-
.../TestRegionObserverScannerOpenHook.java | 27 +-
.../coprocessor/TestRegionObserverStacking.java | 24 +-
...gionServerCoprocessorExceptionWithAbort.java | 4 +-
.../hbase/coprocessor/TestWALObserver.java | 25 +-
.../hbase/mob/compactions/TestMobCompactor.java | 9 +-
.../hbase/namespace/TestNamespaceAuditor.java | 39 +-
.../regionserver/NoOpScanPolicyObserver.java | 9 +-
.../regionserver/TestHRegionServerBulkLoad.java | 9 +-
.../regionserver/TestRegionServerAbort.java | 20 +-
.../TestScannerRetriableFailure.java | 9 +-
.../TestScannerWithCorruptHFile.java | 9 +-
.../TestSettingTimeoutOnBlockingPoint.java | 9 +-
.../TestSplitTransactionOnCluster.java | 13 +-
.../hadoop/hbase/regionserver/TestTags.java | 9 +-
.../regionserver/wal/AbstractTestFSWAL.java | 6 +-
.../wal/AbstractTestProtobufLog.java | 4 +-
.../replication/TestMasterReplication.java | 9 +-
.../replication/TestReplicationWithTags.java | 16 +-
...egionReplicaReplicationEndpointNoMaster.java | 10 +-
.../hbase/security/access/SecureTestUtil.java | 20 +-
.../security/access/TestAccessController.java | 22 +-
.../security/access/TestAccessController2.java | 7 +-
.../security/access/TestAccessController3.java | 11 +-
.../access/TestCellACLWithMultipleVersions.java | 7 +-
.../hbase/security/access/TestCellACLs.java | 7 +-
.../TestCoprocessorWhitelistMasterObserver.java | 10 +-
.../security/access/TestNamespaceCommands.java | 4 +-
.../access/TestScanEarlyTermination.java | 4 +-
.../access/TestWithDisabledAuthorization.java | 10 +-
.../security/token/TestTokenAuthentication.java | 11 +-
...sibilityLabelReplicationWithExpAsString.java | 2 -
.../TestVisibilityLabelsReplication.java | 17 +-
.../snapshot/TestSnapshotClientRetries.java | 9 +-
.../hadoop/hbase/util/BaseTestHBaseFsck.java | 29 +-
.../hbase/util/TestCoprocessorScanPolicy.java | 9 +-
.../hadoop/hbase/util/TestHBaseFsckMOB.java | 2 +-
.../hadoop/hbase/util/TestHBaseFsckOneRS.java | 2 +-
.../hbase/util/TestHBaseFsckReplicas.java | 2 +-
.../hadoop/hbase/util/TestHBaseFsckTwoRS.java | 2 +-
.../apache/hadoop/hbase/wal/TestWALFactory.java | 6 +-
.../hbase/thrift/ErrorThrowingGetObserver.java | 9 +-
.../thrift2/TestThriftHBaseServiceHandler.java | 9 +-
144 files changed, 3307 insertions(+), 2615 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
index 4131b4d..e2b27ff 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.backup;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Pair;
@@ -43,8 +45,14 @@ import org.apache.yetus.audience.InterfaceAudience;
* An Observer to facilitate backup operations
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class BackupObserver implements RegionObserver {
+public class BackupObserver implements RegionCoprocessor, RegionObserver {
private static final Log LOG = LogFactory.getLog(BackupObserver.class);
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
@Override
public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths,
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithFailures.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithFailures.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithFailures.java
index 843ed38..af8e907 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithFailures.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithFailures.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -67,9 +69,7 @@ public class TestBackupDeleteWithFailures extends TestBackupBase{
POST_DELETE_SNAPSHOT_FAILURE
}
- public static class MasterSnapshotObserver implements MasterObserver {
-
-
+ public static class MasterSnapshotObserver implements MasterCoprocessor, MasterObserver {
List<Failure> failures = new ArrayList<Failure>();
public void setFailures(Failure ... f) {
@@ -80,6 +80,11 @@ public class TestBackupDeleteWithFailures extends TestBackupBase{
}
@Override
+ public Optional<MasterObserver> getMasterObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor)
throws IOException
@@ -121,8 +126,8 @@ public class TestBackupDeleteWithFailures extends TestBackupBase{
private MasterSnapshotObserver getMasterSnapshotObserver() {
- return (MasterSnapshotObserver)TEST_UTIL.getHBaseCluster().getMaster()
- .getMasterCoprocessorHost().findCoprocessor(MasterSnapshotObserver.class.getName());
+ return TEST_UTIL.getHBaseCluster().getMaster().getMasterCoprocessorHost()
+ .findCoprocessor(MasterSnapshotObserver.class);
}
@Test
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-client/src/main/java/org/apache/hadoop/hbase/Coprocessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/Coprocessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/Coprocessor.java
index f253350..38fe74e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/Coprocessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/Coprocessor.java
@@ -20,7 +20,9 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
+import java.util.Optional;
+import com.google.protobuf.Service;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -53,9 +55,22 @@ public interface Coprocessor {
STOPPED
}
- // Interface
+ /**
+ * Called by the {@link CoprocessorEnvironment} during it's own startup to initialize the
+ * coprocessor.
+ */
default void start(CoprocessorEnvironment env) throws IOException {}
+ /**
+ * Called by the {@link CoprocessorEnvironment} during it's own shutdown to stop the
+ * coprocessor.
+ */
default void stop(CoprocessorEnvironment env) throws IOException {}
+ /**
+ * Coprocessor endpoints providing protobuf services should implement this interface.
+ */
+ default Optional<Service> getService() {
+ return Optional.empty();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-client/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
index adc7386..aabf3b5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.client.Table;
* Coprocessor environment state.
*/
@InterfaceAudience.Private
-public interface CoprocessorEnvironment {
+public interface CoprocessorEnvironment<C extends Coprocessor> {
/** @return the Coprocessor interface version */
int getVersion();
@@ -39,7 +39,7 @@ public interface CoprocessorEnvironment {
String getHBaseVersion();
/** @return the loaded coprocessor instance */
- Coprocessor getInstance();
+ C getInstance();
/** @return the priority assigned to the loaded coprocessor */
int getPriority();
@@ -67,4 +67,13 @@ public interface CoprocessorEnvironment {
* @return the classloader for the loaded coprocessor instance
*/
ClassLoader getClassLoader();
+
+ /**
+ * After a coprocessor has been loaded in an encapsulation of an environment, CoprocessorHost
+ * calls this function to initialize the environment.
+ */
+ void startup() throws IOException;
+
+ /** Clean up the environment. Called by CoprocessorHost when it itself is shutting down. */
+ void shutdown();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index e1f274c..c62ab1d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -681,8 +681,7 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr
/**
* Add a table coprocessor to this table. The coprocessor
- * type must be org.apache.hadoop.hbase.coprocessor.RegionObserver
- * or Endpoint.
+ * type must be org.apache.hadoop.hbase.coprocessor.RegionCoprocessor.
* It won't check if the class can be loaded or not.
* Whether a coprocessor is loadable or not will be determined when
* a region is opened.
@@ -696,8 +695,7 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr
/**
* Add a table coprocessor to this table. The coprocessor
- * type must be org.apache.hadoop.hbase.coprocessor.RegionObserver
- * or Endpoint.
+ * type must be org.apache.hadoop.hbase.coprocessor.RegionCoprocessor.
* It won't check if the class can be loaded or not.
* Whether a coprocessor is loadable or not will be determined when
* a region is opened.
@@ -717,8 +715,7 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr
/**
* Add a table coprocessor to this table. The coprocessor
- * type must be org.apache.hadoop.hbase.coprocessor.RegionObserver
- * or Endpoint.
+ * type must be org.apache.hadoop.hbase.coprocessor.RegionCoprocessor.
* It won't check if the class can be loaded or not.
* Whether a coprocessor is loadable or not will be determined when
* a region is opened.
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
index e029885..dcd6f44 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
@@ -31,11 +31,11 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
+import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
@@ -60,8 +60,8 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
* @param R PB message that is used to transport Promoted (<S>) instance
*/
@InterfaceAudience.Private
-public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message>
-extends AggregateService implements CoprocessorService, Coprocessor {
+public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message>
+extends AggregateService implements RegionCoprocessor {
protected static final Log log = LogFactory.getLog(AggregateImplementation.class);
private RegionCoprocessorEnvironment env;
@@ -156,7 +156,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
results.clear();
} while (hasMoreRows);
if (min != null) {
- response = AggregateResponse.newBuilder().addFirstPart(
+ response = AggregateResponse.newBuilder().addFirstPart(
ci.getProtoForCellType(min).toByteString()).build();
}
} catch (IOException e) {
@@ -211,7 +211,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
results.clear();
} while (hasMoreRows);
if (sumVal != null) {
- response = AggregateResponse.newBuilder().addFirstPart(
+ response = AggregateResponse.newBuilder().addFirstPart(
ci.getProtoForPromotedType(sumVal).toByteString()).build();
}
} catch (IOException e) {
@@ -262,7 +262,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
} while (hasMoreRows);
ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter);
bb.rewind();
- response = AggregateResponse.newBuilder().addFirstPart(
+ response = AggregateResponse.newBuilder().addFirstPart(
ByteString.copyFrom(bb)).build();
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
@@ -310,7 +310,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
}
List<Cell> results = new ArrayList<>();
boolean hasMoreRows = false;
-
+
do {
results.clear();
hasMoreRows = scanner.next(results);
@@ -371,7 +371,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
List<Cell> results = new ArrayList<>();
boolean hasMoreRows = false;
-
+
do {
tempVal = null;
hasMoreRows = scanner.next(results);
@@ -413,7 +413,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
* It is computed for the combination of column
* family and column qualifier(s) in the given row range as defined in the
* Scan object. In its current implementation, it takes one column family and
- * two column qualifiers. The first qualifier is for values column and
+ * two column qualifiers. The first qualifier is for values column and
* the second qualifier (optional) is for weight column.
*/
@Override
@@ -437,7 +437,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
List<Cell> results = new ArrayList<>();
boolean hasMoreRows = false;
-
+
do {
tempVal = null;
tempWeight = null;
@@ -461,7 +461,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
ByteString first_sumWeights = ci.getProtoForPromotedType(s).toByteString();
AggregateResponse.Builder pair = AggregateResponse.newBuilder();
pair.addFirstPart(first_sumVal);
- pair.addFirstPart(first_sumWeights);
+ pair.addFirstPart(first_sumWeights);
response = pair.build();
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
@@ -500,8 +500,8 @@ extends AggregateService implements CoprocessorService, Coprocessor {
}
@Override
- public Service getService() {
- return this;
+ public Optional<Service> getService() {
+ return Optional.of(this);
}
/**
@@ -527,5 +527,5 @@ extends AggregateService implements CoprocessorService, Coprocessor {
public void stop(CoprocessorEnvironment env) throws IOException {
// nothing to do
}
-
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java
index 8f84d9e..9b8901e 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,7 +36,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -87,8 +87,7 @@ import org.apache.hadoop.util.ReflectionUtils;
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
-public class Export extends ExportProtos.ExportService
- implements Coprocessor, CoprocessorService {
+public class Export extends ExportProtos.ExportService implements RegionCoprocessor {
private static final Log LOG = LogFactory.getLog(Export.class);
private static final Class<? extends CompressionCodec> DEFAULT_CODEC = DefaultCodec.class;
@@ -312,8 +311,8 @@ public class Export extends ExportProtos.ExportService
}
@Override
- public Service getService() {
- return this;
+ public Optional<Service> getService() {
+ return Optional.of(this);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
index 3f8b3bb..4286174 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -21,14 +21,13 @@ package org.apache.hadoop.hbase.security.access;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -48,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* Coprocessor service for bulk loads in secure mode.
@@ -55,8 +55,7 @@ import com.google.protobuf.Service;
*/
@InterfaceAudience.Private
@Deprecated
-public class SecureBulkLoadEndpoint extends SecureBulkLoadService
- implements CoprocessorService, Coprocessor {
+public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements RegionCoprocessor {
public static final long VERSION = 0L;
@@ -176,7 +175,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
}
@Override
- public Service getService() {
- return this;
+ public Optional<Service> getService() {
+ return Optional.of(this);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
index 9c03e6a..54f1f53 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
@@ -21,12 +21,12 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.ColumnAggregationService;
@@ -45,13 +45,13 @@ import com.google.protobuf.Service;
* The aggregation implementation at a region.
*/
public class ColumnAggregationEndpoint extends ColumnAggregationService
-implements Coprocessor, CoprocessorService {
+implements RegionCoprocessor {
private static final Log LOG = LogFactory.getLog(ColumnAggregationEndpoint.class);
private RegionCoprocessorEnvironment env = null;
@Override
- public Service getService() {
- return this;
+ public Optional<Service> getService() {
+ return Optional.of(this);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
index 54e3358..43a0075 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
@@ -47,14 +47,12 @@ import com.google.protobuf.Service;
* response values.
*/
public class ColumnAggregationEndpointNullResponse
- extends
- ColumnAggregationServiceNullResponse
-implements Coprocessor, CoprocessorService {
+ extends ColumnAggregationServiceNullResponse implements RegionCoprocessor {
private static final Log LOG = LogFactory.getLog(ColumnAggregationEndpointNullResponse.class);
private RegionCoprocessorEnvironment env = null;
@Override
- public Service getService() {
- return this;
+ public Optional<Service> getService() {
+ return Optional.of(this);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
index 6e8c571..0faa717 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
@@ -48,14 +48,14 @@ import com.google.protobuf.Service;
* coprocessor endpoints throwing exceptions.
*/
public class ColumnAggregationEndpointWithErrors
- extends
- ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors
-implements Coprocessor, CoprocessorService {
+ extends ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors
+ implements RegionCoprocessor {
private static final Log LOG = LogFactory.getLog(ColumnAggregationEndpointWithErrors.class);
private RegionCoprocessorEnvironment env = null;
+
@Override
- public Service getService() {
- return this;
+ public Optional<Service> getService() {
+ return Optional.of(this);
}
@Override
@@ -73,7 +73,7 @@ implements Coprocessor, CoprocessorService {
}
@Override
- public void sum(RpcController controller, ColumnAggregationWithErrorsSumRequest request,
+ public void sum(RpcController controller, ColumnAggregationWithErrorsSumRequest request,
RpcCallback<ColumnAggregationWithErrorsSumResponse> done) {
// aggregate at each region
Scan scan = new Scan();
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
index 5b7c1e9..bc8d3e9 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.coprocessor;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcServer;
@@ -34,6 +33,7 @@ import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.util.Threads;
import java.io.IOException;
+import java.util.Optional;
/**
* Test implementation of a coprocessor endpoint exposing the
@@ -41,13 +41,12 @@ import java.io.IOException;
* only.
*/
public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobufRpcProto
- implements CoprocessorService, Coprocessor {
- public ProtobufCoprocessorService() {
- }
+ implements MasterCoprocessor, RegionCoprocessor {
+ public ProtobufCoprocessorService() {}
@Override
- public Service getService() {
- return this;
+ public Optional<Service> getService() {
+ return Optional.of(this);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java
index 16fb03c..84c777c 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java
@@ -23,28 +23,21 @@ import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.Optional;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TestAsyncAdminBase;
-import org.apache.hadoop.hbase.coprocessor.TestRegionServerCoprocessorEndpoint.DummyRegionServerEndpoint;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
-import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.BeforeClass;
@@ -56,7 +49,6 @@ import org.junit.runners.Parameterized;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
@RunWith(Parameterized.class)
@Category({ ClientTests.class, MediumTests.class })
@@ -133,14 +125,14 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
}
}
- static class DummyRegionServerEndpoint extends DummyService implements Coprocessor, SingletonCoprocessorService {
+ public static class DummyRegionServerEndpoint extends DummyService
+ implements RegionServerCoprocessor {
- public DummyRegionServerEndpoint() {
- }
+ public DummyRegionServerEndpoint() {}
@Override
- public Service getService() {
- return this;
+ public Optional<Service> getService() {
+ return Optional.of(this);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
index 56fdca6..9067c88 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
@@ -54,7 +54,12 @@ public class TestClassLoading {
private static final Log LOG = LogFactory.getLog(TestClassLoading.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- public static class TestMasterCoprocessor implements MasterObserver {}
+ public static class TestMasterCoprocessor implements MasterCoprocessor, MasterObserver {
+ @Override
+ public Optional<MasterObserver> getMasterObserver() {
+ return Optional.of(this);
+ }
+ }
private static MiniDFSCluster cluster;
@@ -69,7 +74,7 @@ public class TestClassLoading {
private static Class<?> regionCoprocessor1 = ColumnAggregationEndpoint.class;
// TOOD: Fix the import of this handler. It is coming in from a package that is far away.
private static Class<?> regionCoprocessor2 = TestServerCustomProtocol.PingHandler.class;
- private static Class<?> regionServerCoprocessor = SampleRegionWALObserver.class;
+ private static Class<?> regionServerCoprocessor = SampleRegionWALCoprocessor.class;
private static Class<?> masterCoprocessor = TestMasterCoprocessor.class;
private static final String[] regionServerSystemCoprocessors =
@@ -110,8 +115,9 @@ public class TestClassLoading {
}
static File buildCoprocessorJar(String className) throws Exception {
- String code = "import org.apache.hadoop.hbase.coprocessor.*;" +
- "public class " + className + " implements RegionObserver {}";
+ String code =
+ "import org.apache.hadoop.hbase.coprocessor.*;" +
+ "public class " + className + " implements RegionCoprocessor {}";
return ClassLoaderTestHelper.buildJar(
TEST_UTIL.getDataTestDir().toString(), className, code);
}
@@ -539,19 +545,6 @@ public class TestClassLoading {
assertEquals(loadedMasterCoprocessorsVerify, loadedMasterCoprocessors);
}
- @Test
- public void testFindCoprocessors() {
- // HBASE 12277:
- CoprocessorHost masterCpHost =
- TEST_UTIL.getHBaseCluster().getMaster().getMasterCoprocessorHost();
-
- List<MasterObserver> masterObservers = masterCpHost.findCoprocessors(MasterObserver.class);
-
- assertTrue(masterObservers != null && masterObservers.size() > 0);
- assertEquals(masterCoprocessor.getSimpleName(),
- masterObservers.get(0).getClass().getSimpleName());
- }
-
private void waitForTable(TableName name) throws InterruptedException, IOException {
// First wait until all regions are online
TEST_UTIL.waitTableEnabled(name);
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorServiceBackwardCompatibility.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorServiceBackwardCompatibility.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorServiceBackwardCompatibility.java
new file mode 100644
index 0000000..c2ff36e
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorServiceBackwardCompatibility.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.*;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests to ensure that 2.0 is backward compatible in loading CoprocessorService.
+ *
+ * <p>The legacy-style {@link DummyCoprocessorService} is registered with the master, the region
+ * server and the region coprocessor hosts of one mini cluster that is started once for the whole
+ * class. Each test then invokes the endpoint through one of those hosts and checks that the call
+ * was recorded by the service.
+ */
+@Category({SmallTests.class})
+public class TestCoprocessorServiceBackwardCompatibility {
+  private static HBaseTestingUtility TEST_UTIL = null;
+  private static Configuration CONF = null;
+
+  /**
+   * Endpoint written against the old API: it implements only {@link CoprocessorService} and
+   * {@link SingletonCoprocessorService}, and counts how often {@code dummyCall} is invoked.
+   */
+  public static class DummyCoprocessorService extends DummyService
+      implements CoprocessorService, SingletonCoprocessorService {
+    static int numCalls = 0;
+
+    @Override
+    public Service getService() {
+      return this;
+    }
+
+    @Override
+    public void dummyCall(RpcController controller, DummyRequest request,
+        RpcCallback<DummyResponse> callback) {
+      callback.run(DummyResponse.newBuilder().setValue("").build());
+      numCalls++;
+    }
+
+    @Override
+    public void dummyThrow(RpcController controller, DummyRequest request,
+        RpcCallback<DummyResponse> callback) {
+      // Intentionally empty: the tests below only invoke dummyCall.
+    }
+  }
+
+  /**
+   * Registers the legacy service with all three coprocessor hosts and starts the mini cluster
+   * once. The tests share {@code TEST_UTIL} and the static call counter, so the cluster is
+   * configured and started here rather than inside every test.
+   */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    CONF = TEST_UTIL.getConfiguration();
+    String serviceClass = DummyCoprocessorService.class.getName();
+    CONF.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, serviceClass);
+    CONF.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, serviceClass);
+    CONF.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, serviceClass);
+    DummyCoprocessorService.numCalls = 0;
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /** Invokes the endpoint through the master and expects exactly one more recorded call. */
+  @Test
+  public void testCoprocessorServiceLoadedByMaster() throws Exception {
+    // The counter is shared by all tests, so compare against its value before the call.
+    int callsBefore = DummyCoprocessorService.numCalls;
+    TEST_UTIL.getAdmin().coprocessorService().callBlockingMethod(
+        DummyCoprocessorService.getDescriptor().findMethodByName("dummyCall"), null,
+        DummyRequest.getDefaultInstance(), DummyResponse.getDefaultInstance());
+    assertEquals(callsBefore + 1, DummyCoprocessorService.numCalls);
+  }
+
+  /** Invokes the endpoint through region server 0 and expects exactly one more recorded call. */
+  @Test
+  public void testCoprocessorServiceLoadedByRegionServer() throws Exception {
+    int callsBefore = DummyCoprocessorService.numCalls;
+    TEST_UTIL.getAdmin().coprocessorService(
+        TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName()).callBlockingMethod(
+        DummyCoprocessorService.getDescriptor().findMethodByName("dummyCall"), null,
+        DummyRequest.getDefaultInstance(), DummyResponse.getDefaultInstance());
+    assertEquals(callsBefore + 1, DummyCoprocessorService.numCalls);
+  }
+
+  /** Invokes the endpoint on the hbase:meta table and expects exactly one more recorded call. */
+  @Test
+  public void testCoprocessorServiceLoadedByRegion() throws Throwable {
+    int callsBefore = DummyCoprocessorService.numCalls;
+    TEST_UTIL.getConnection().getTable(TableName.valueOf("hbase:meta")).batchCoprocessorService(
+        DummyCoprocessorService.getDescriptor().findMethodByName("dummyCall"),
+        DummyRequest.getDefaultInstance(), Bytes.toBytes(""), Bytes.toBytes(""),
+        DummyResponse.getDefaultInstance());
+    assertEquals(callsBefore + 1, DummyCoprocessorService.numCalls);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java
index 9dc4822..2e22a16 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java
@@ -22,9 +22,9 @@ import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
@@ -102,21 +102,12 @@ public class TestRegionServerCoprocessorEndpoint {
((RemoteWithExtrasException) controller.getFailedOn().getCause()).getClassName().trim());
}
- static class DummyRegionServerEndpoint extends DummyService implements Coprocessor, SingletonCoprocessorService {
+ public static class DummyRegionServerEndpoint extends DummyService
+ implements RegionServerCoprocessor {
@Override
- public Service getService() {
- return this;
- }
-
- @Override
- public void start(CoprocessorEnvironment env) throws IOException {
- // TODO Auto-generated method stub
- }
-
- @Override
- public void stop(CoprocessorEnvironment env) throws IOException {
- // TODO Auto-generated method stub
+ public Optional<Service> getService() {
+ return Optional.of(this);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
index c3f7119..8c11192 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
@@ -310,7 +310,7 @@ public class TestRowProcessorEndpoint {
* So they can be loaded with the endpoint on the coprocessor.
*/
public static class RowProcessorEndpoint<S extends Message,T extends Message>
- extends BaseRowProcessorEndpoint<S,T> implements CoprocessorService {
+ extends BaseRowProcessorEndpoint<S,T> {
public static class IncrementCounterProcessor extends
BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest,
IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
@@ -417,7 +417,7 @@ public class TestRowProcessorEndpoint {
@Override
public FriendsOfFriendsProcessorResponse getResult() {
- FriendsOfFriendsProcessorResponse.Builder builder =
+ FriendsOfFriendsProcessorResponse.Builder builder =
FriendsOfFriendsProcessorResponse.newBuilder();
builder.addAllResult(result);
return builder.build();
@@ -469,7 +469,7 @@ public class TestRowProcessorEndpoint {
}
@Override
- public void initialize(FriendsOfFriendsProcessorRequest request)
+ public void initialize(FriendsOfFriendsProcessorRequest request)
throws IOException {
this.person = request.getPerson().toByteArray();
this.row = request.getRow().toByteArray();
@@ -546,7 +546,7 @@ public class TestRowProcessorEndpoint {
// Delete from the current row and add to the other row
Delete d = new Delete(rows[i]);
KeyValue kvDelete =
- new KeyValue(rows[i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
+ new KeyValue(rows[i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
kv.getTimestamp(), KeyValue.Type.Delete);
d.add(kvDelete);
Put p = new Put(rows[1 - i]);
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java
index 83c7dbf..90cf10c 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java
@@ -24,10 +24,10 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Map;
+import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.CountRequest;
@@ -75,8 +75,7 @@ public class TestServerCustomProtocol {
static final String HELLO = "Hello, ";
/* Test protocol implementation */
- public static class PingHandler extends PingProtos.PingService
- implements Coprocessor, CoprocessorService {
+ public static class PingHandler extends PingProtos.PingService implements RegionCoprocessor {
private int counter = 0;
@Override
@@ -125,8 +124,8 @@ public class TestServerCustomProtocol {
}
@Override
- public Service getService() {
- return this;
+ public Optional<Service> getService() {
+ return Optional.of(this);
}
}
@@ -320,7 +319,7 @@ public class TestServerCustomProtocol {
// rows from 1 region
assertEquals(1, results.size());
verifyRegionResults(locator, results, ROW_A);
-
+
final String name = "NAME";
results = hello(table, name, null, ROW_A);
// Should have gotten results for 1 of the three regions only since we specified
@@ -343,12 +342,12 @@ public class TestServerCustomProtocol {
// test,,1355943549657.c65d4822d8bdecc033a96451f3a0f55d.
// test,bbb,1355943549661.110393b070dd1ed93441e0bc9b3ffb7e.
// test,ccc,1355943549665.c3d6d125141359cbbd2a43eaff3cdf74.
-
+
Map<byte [], String> results = ping(table, null, ROW_A);
// Should contain first region only.
assertEquals(1, results.size());
verifyRegionResults(locator, results, ROW_A);
-
+
// Test start row + empty end
results = ping(table, ROW_BC, null);
assertEquals(2, results.size());
@@ -358,7 +357,7 @@ public class TestServerCustomProtocol {
results.get(loc.getRegionInfo().getRegionName()));
verifyRegionResults(locator, results, ROW_B);
verifyRegionResults(locator, results, ROW_C);
-
+
// test empty start + end
results = ping(table, null, ROW_BC);
// should contain the first 2 regions
@@ -368,7 +367,7 @@ public class TestServerCustomProtocol {
loc = locator.getRegionLocation(ROW_C, true);
assertNull("Should be missing region for row ccc (past stop row)",
results.get(loc.getRegionInfo().getRegionName()));
-
+
// test explicit start + end
results = ping(table, ROW_AB, ROW_BC);
// should contain first 2 regions
@@ -378,7 +377,7 @@ public class TestServerCustomProtocol {
loc = locator.getRegionLocation(ROW_C, true);
assertNull("Should be missing region for row ccc (past stop row)",
results.get(loc.getRegionInfo().getRegionName()));
-
+
// test single region
results = ping(table, ROW_B, ROW_BC);
// should only contain region bbb
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
index 79ff25b..5001e04 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
@@ -28,7 +29,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
@@ -57,7 +57,7 @@ import com.google.protobuf.Service;
/**
* Defines a protocol to delete data in bulk based on a scan. The scan can be range scan or with
- * conditions(filters) etc.This can be used to delete rows, column family(s), column qualifier(s)
+ * conditions(filters) etc.This can be used to delete rows, column family(s), column qualifier(s)
* or version(s) of columns.When delete type is FAMILY or COLUMN, which all family(s) or column(s)
* getting deleted will be determined by the Scan. Scan need to select all the families/qualifiers
* which need to be deleted.When delete type is VERSION, Which column(s) and version(s) to be
@@ -65,16 +65,16 @@ import com.google.protobuf.Service;
* which needs to be deleted.When a timestamp is passed only one version at that timestamp will be
* deleted(even if Scan fetches many versions). When timestamp passed as null, all the versions
* which the Scan selects will get deleted.
- *
+ *
* <br> Example: <pre><code>
* Scan scan = new Scan();
* // set scan properties(rowkey range, filters, timerange etc).
* HTable ht = ...;
* long noOfDeletedRows = 0L;
- * Batch.Call<BulkDeleteService, BulkDeleteResponse> callable =
+ * Batch.Call<BulkDeleteService, BulkDeleteResponse> callable =
* new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
* ServerRpcController controller = new ServerRpcController();
- * BlockingRpcCallback<BulkDeleteResponse> rpcCallback =
+ * BlockingRpcCallback<BulkDeleteResponse> rpcCallback =
* new BlockingRpcCallback<BulkDeleteResponse>();
*
* public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
@@ -95,16 +95,15 @@ import com.google.protobuf.Service;
* }
* </code></pre>
*/
-public class BulkDeleteEndpoint extends BulkDeleteService implements CoprocessorService,
- Coprocessor {
+public class BulkDeleteEndpoint extends BulkDeleteService implements RegionCoprocessor {
private static final String NO_OF_VERSIONS_TO_DELETE = "noOfVersionsToDelete";
private static final Log LOG = LogFactory.getLog(BulkDeleteEndpoint.class);
private RegionCoprocessorEnvironment env;
@Override
- public Service getService() {
- return this;
+ public Optional<Service> getService() {
+ return Optional.of(this);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleMasterObserverWithMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleMasterObserverWithMetrics.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleMasterObserverWithMetrics.java
index a93935d..c27672c 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleMasterObserverWithMetrics.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleMasterObserverWithMetrics.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.coprocessor.example;
import java.io.IOException;
+import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -26,6 +27,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -45,7 +47,11 @@ import org.apache.hadoop.hbase.metrics.Timer;
* </p>
* @see ExampleRegionObserverWithMetrics
*/
-public class ExampleMasterObserverWithMetrics implements MasterObserver {
+public class ExampleMasterObserverWithMetrics implements MasterCoprocessor, MasterObserver {
+ @Override
+ public Optional<MasterObserver> getMasterObserver() {
+ return Optional.of(this);
+ }
private static final Log LOG = LogFactory.getLog(ExampleMasterObserverWithMetrics.class);
@@ -133,4 +139,4 @@ public class ExampleMasterObserverWithMetrics implements MasterObserver {
registry.register("maxMemory", this::getMaxMemory);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleRegionObserverWithMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleRegionObserverWithMetrics.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleRegionObserverWithMetrics.java
index fd593a7..f03b915 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleRegionObserverWithMetrics.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleRegionObserverWithMetrics.java
@@ -22,12 +22,14 @@ package org.apache.hadoop.hbase.coprocessor.example;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.metrics.Counter;
@@ -45,36 +47,49 @@ import org.apache.hadoop.hbase.metrics.Timer;
*
* @see ExampleMasterObserverWithMetrics
*/
-public class ExampleRegionObserverWithMetrics implements RegionObserver {
+public class ExampleRegionObserverWithMetrics implements RegionCoprocessor {
private Counter preGetCounter;
private Timer costlyOperationTimer;
+ private ExampleRegionObserver observer;
- @Override
- public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
- throws IOException {
- // Increment the Counter whenever the coprocessor is called
- preGetCounter.increment();
- }
+ class ExampleRegionObserver implements RegionCoprocessor, RegionObserver {
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
- @Override
- public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
- List<Cell> results) throws IOException {
- // do a costly (high latency) operation which we want to measure how long it takes by
- // using a Timer (which is a Meter and a Histogram).
- long start = System.nanoTime();
- try {
- performCostlyOperation();
- } finally {
- costlyOperationTimer.updateNanos(System.nanoTime() - start);
+ @Override
+ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
+ List<Cell> results) throws IOException {
+ // Increment the Counter whenever the coprocessor is called
+ preGetCounter.increment();
+ }
+
+ @Override
+ public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
+ List<Cell> results) throws IOException {
+ // do a costly (high latency) operation which we want to measure how long it takes by
+ // using a Timer (which is a Meter and a Histogram).
+ long start = System.nanoTime();
+ try {
+ performCostlyOperation();
+ } finally {
+ costlyOperationTimer.updateNanos(System.nanoTime() - start);
+ }
+ }
+
+ private void performCostlyOperation() {
+ try {
+ // simulate the operation by sleeping.
+ Thread.sleep(ThreadLocalRandom.current().nextLong(100));
+ } catch (InterruptedException ignore) {
+ }
}
}
- private void performCostlyOperation() {
- try {
- // simulate the operation by sleeping.
- Thread.sleep(ThreadLocalRandom.current().nextLong(100));
- } catch (InterruptedException ignore) {}
+ @Override public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(observer);
}
@Override
@@ -88,6 +103,7 @@ public class ExampleRegionObserverWithMetrics implements RegionObserver {
// at the region server level per-regionserver.
MetricRegistry registry =
((RegionCoprocessorEnvironment) env).getMetricRegistryForRegionServer();
+ observer = new ExampleRegionObserver();
if (preGetCounter == null) {
// Create a new Counter, or get the already registered counter.
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RefreshHFilesEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RefreshHFilesEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RefreshHFilesEndpoint.java
index 5b97411..4709d55 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RefreshHFilesEndpoint.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RefreshHFilesEndpoint.java
@@ -23,16 +23,16 @@ import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.protobuf.generated.RefreshHFilesProtos;
import org.apache.hadoop.hbase.regionserver.Store;
import java.io.IOException;
+import java.util.Optional;
/**
* Coprocessor endpoint to refresh HFiles on replica.
@@ -43,7 +43,7 @@ import java.io.IOException;
* </p>
*/
public class RefreshHFilesEndpoint extends RefreshHFilesProtos.RefreshHFilesService
- implements Coprocessor, CoprocessorService {
+ implements RegionCoprocessor {
protected static final Log LOG = LogFactory.getLog(RefreshHFilesEndpoint.class);
private RegionCoprocessorEnvironment env;
@@ -51,8 +51,8 @@ public class RefreshHFilesEndpoint extends RefreshHFilesProtos.RefreshHFilesServ
}
@Override
- public Service getService() {
- return this;
+ public Optional<Service> getService() {
+ return Optional.of(this);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
index 598008b..7e75324 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
@@ -21,14 +21,14 @@ package org.apache.hadoop.hbase.coprocessor.example;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
@@ -48,8 +48,7 @@ import com.google.protobuf.Service;
* hbase-examples/src/main/protobuf/Examples.proto.
* </p>
*/
-public class RowCountEndpoint extends ExampleProtos.RowCountService
- implements Coprocessor, CoprocessorService {
+public class RowCountEndpoint extends ExampleProtos.RowCountService implements RegionCoprocessor {
private RegionCoprocessorEnvironment env;
public RowCountEndpoint() {
@@ -59,8 +58,8 @@ public class RowCountEndpoint extends ExampleProtos.RowCountService
* Just returns a reference to this object, which implements the RowCountService interface.
*/
@Override
- public Service getService() {
- return this;
+ public Optional<Service> getService() {
+ return Optional.of(this);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
index 7b8fdf3..733a003 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.coprocessor.example;
import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
+import java.util.Optional;
import java.util.OptionalInt;
import org.apache.commons.logging.Log;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HStore;
@@ -51,20 +53,25 @@ import org.apache.zookeeper.ZooKeeper;
* This is an example showing how a RegionObserver could be configured
* via ZooKeeper in order to control a Region compaction, flush, and scan policy.
*
- * This also demonstrated the use of shared
+ * This also demonstrated the use of shared
* {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} state.
* See {@link RegionCoprocessorEnvironment#getSharedData()}.
*
* This would be useful for an incremental backup tool, which would indicate the last
* time of a successful backup via ZK and instruct HBase to not delete data that was
- * inserted since (based on wall clock time).
+ * inserted since (based on wall clock time).
*
* This implements org.apache.zookeeper.Watcher directly instead of using
- * {@link org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher},
+ * {@link org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher},
* because RegionObservers come and go and currently
* listeners registered with ZooKeeperWatcher cannot be removed.
*/
-public class ZooKeeperScanPolicyObserver implements RegionObserver {
+public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObserver {
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
// The zk ensemble info is put in hbase config xml with given custom key.
public static final String ZK_ENSEMBLE_KEY = "ZooKeeperScanPolicyObserver.zookeeper.ensemble";
public static final String ZK_SESSION_TIMEOUT_KEY =
@@ -243,4 +250,4 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver {
return new StoreScanner((HStore) store, scanInfo, scan, targetCols,
((HStore) store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED));
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
index e95ed7c..4eb5e41 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -155,12 +157,18 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
private boolean load = false;
private boolean check = false;
- public static class SlowMeCoproScanOperations implements RegionObserver {
+ public static class SlowMeCoproScanOperations implements RegionCoprocessor, RegionObserver {
static final AtomicLong sleepTime = new AtomicLong(2000);
Random r = new Random();
AtomicLong countOfNext = new AtomicLong(0);
AtomicLong countOfOpen = new AtomicLong(0);
public SlowMeCoproScanOperations() {}
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
final Scan scan, final RegionScanner s) throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java
index 03a50e6..18a745c 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.UUID;
import org.apache.commons.logging.Log;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
@@ -247,7 +249,13 @@ public class TestImportTSVWithOperationAttributes implements Configurable {
assertTrue(verified);
}
- public static class OperationAttributesTestController implements RegionObserver {
+ public static class OperationAttributesTestController
+ implements RegionCoprocessor, RegionObserver {
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
index 3a7380b..a1b4097 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.UUID;
import org.apache.commons.logging.Log;
@@ -34,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
@@ -156,7 +158,12 @@ public class TestImportTSVWithTTLs implements Configurable {
return tool;
}
- public static class TTLCheckingObserver implements RegionObserver {
+ public static class TTLCheckingObserver implements RegionCoprocessor, RegionObserver {
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
index 657dbba..afad353 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.rsgroup;
import java.io.IOException;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
import com.google.protobuf.RpcCallback;
@@ -35,7 +36,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.constraint.ConstraintException;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -69,8 +70,9 @@ import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGro
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
import org.apache.yetus.audience.InterfaceAudience;
+// TODO: Encapsulate MasterObserver functions into separate subclass.
@InterfaceAudience.Private
-public class RSGroupAdminEndpoint implements MasterObserver, CoprocessorService {
+public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
private static final Log LOG = LogFactory.getLog(RSGroupAdminEndpoint.class);
private MasterServices master = null;
@@ -93,8 +95,13 @@ public class RSGroupAdminEndpoint implements MasterObserver, CoprocessorService
}
@Override
- public Service getService() {
- return groupAdminService;
+ public Optional<Service> getService() {
+ return Optional.of(groupAdminService);
+ }
+
+ @Override
+ public Optional<MasterObserver> getMasterObserver() {
+ return Optional.of(this);
}
RSGroupInfoManager getGroupInfoManager() {
@@ -106,12 +113,6 @@ public class RSGroupAdminEndpoint implements MasterObserver, CoprocessorService
* This class calls {@link RSGroupAdminServer} for actual work, converts result to protocol
* buffer response, handles exceptions if any occurred and then calls the {@code RpcCallback} with
* the response.
- * Since our CoprocessorHost asks the Coprocessor for a Service
- * ({@link CoprocessorService#getService()}) instead of doing "coproc instanceOf Service"
- * and requiring Coprocessor itself to be Service (something we do with our observers),
- * we can use composition instead of inheritance here. That makes it easy to manage
- * functionalities in concise classes (sometimes inner classes) instead of single class doing
- * many different things.
*/
private class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService {
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java
index c58dc9d..a9493ca 100644
--- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java
@@ -97,8 +97,8 @@ public class TestRSGroups extends TestRSGroupsBase {
admin.setBalancerRunning(false,true);
rsGroupAdmin = new VerifyingRSGroupAdminClient(
new RSGroupAdminClient(TEST_UTIL.getConnection()), TEST_UTIL.getConfiguration());
- rsGroupAdminEndpoint =
- master.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class).get(0);
+ rsGroupAdminEndpoint = (RSGroupAdminEndpoint)
+ master.getMasterCoprocessorHost().findCoprocessor(RSGroupAdminEndpoint.class.getName());
}
@AfterClass