You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/10/24 17:47:15 UTC

git commit: HBASE-12277 Refactor bulkLoad methods in AccessController to its own interface (Madhan Neethiraj)

Repository: hbase
Updated Branches:
  refs/heads/master 31c185aad -> 2916d4f35


HBASE-12277 Refactor bulkLoad methods in AccessController to its own interface (Madhan Neethiraj)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2916d4f3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2916d4f3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2916d4f3

Branch: refs/heads/master
Commit: 2916d4f3568184f92006dba9a1e4ef18492643ea
Parents: 31c185a
Author: stack <st...@apache.org>
Authored: Fri Oct 24 08:47:06 2014 -0700
Committer: stack <st...@apache.org>
Committed: Fri Oct 24 08:47:06 2014 -0700

----------------------------------------------------------------------
 .../hbase/coprocessor/BulkLoadObserver.java     | 54 ++++++++++++++++++++
 .../hbase/coprocessor/CoprocessorHost.java      | 20 ++++++++
 .../hbase/security/access/AccessController.java | 25 ++++++---
 .../security/access/SecureBulkLoadEndpoint.java | 36 ++++++++++---
 .../hbase/coprocessor/TestClassLoading.java     | 13 +++++
 5 files changed, 134 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2916d4f3/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java
new file mode 100644
index 0000000..c7f0b90
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
+
+/**
+ * Coprocessors implement this interface to observe and mediate bulk load operations.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface BulkLoadObserver extends Coprocessor {
+    /**
+      * Called as part of SecureBulkLoadEndpoint.prepareBulkLoad() RPC call.
+      * It can't bypass the default action, e.g., ctx.bypass() won't have effect.
+      * @param ctx the environment to interact with the framework and master
+      * @throws IOException
+      */
+    void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
+                            PrepareBulkLoadRequest request) throws IOException;
+
+    /**
+      * Called as part of SecureBulkLoadEndpoint.cleanupBulkLoad() RPC call.
+      * It can't bypass the default action, e.g., ctx.bypass() won't have effect.
+      * @param ctx the environment to interact with the framework and master
+      * @throws IOException
+      */
+    void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
+                            CleanupBulkLoadRequest request) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2916d4f3/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
index c9bd0b6..17fcabc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
@@ -281,6 +281,26 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
   }
 
   /**
+   * Find list of coprocessors that extend/implement the given class/interface
+   * @param cls the class/interface to look for
+   * @return the list of coprocessors, or null if not found
+   */
+  public <T extends Coprocessor> List<T> findCoprocessors(Class<T> cls) {
+    ArrayList<T> ret = new ArrayList<T>();
+
+    for (E env: coprocessors) {
+      Coprocessor cp = env.getInstance();
+
+      if(cp != null) {
+        if (cls.isAssignableFrom(cp.getClass())) {
+          ret.add((T)cp);
+        }
+      }
+    }
+    return ret;
+  }
+
+  /**
    * Find a coprocessor environment by class name
    * @param className the class name
    * @return the coprocessor, or null if not found

http://git-wip-us.apache.org/repos/asf/hbase/blob/2916d4f3/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 400e20a..4023e07 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.client.Query;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
@@ -84,6 +85,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -150,7 +153,7 @@ import com.google.protobuf.Service;
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class AccessController extends BaseMasterAndRegionObserver
     implements RegionServerObserver,
-      AccessControlService.Interface, CoprocessorService, EndpointObserver {
+      AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
 
   public static final Log LOG = LogFactory.getLog(AccessController.class);
 
@@ -1891,11 +1894,15 @@ public class AccessController extends BaseMasterAndRegionObserver
   /**
    * Authorization check for
    * SecureBulkLoadProtocol.prepareBulkLoad()
-   * @param e
+   * @param ctx the context
+   * @param request the request
    * @throws IOException
    */
-  //TODO this should end up as a coprocessor hook
-  public void prePrepareBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
+  @Override
+  public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
+                                 PrepareBulkLoadRequest request) throws IOException {
+    RegionCoprocessorEnvironment e = ctx.getEnvironment();
+
     AuthResult authResult = hasSomeAccess(e, "prePrepareBulkLoad", Action.WRITE);
     logResult(authResult);
     if (!authResult.isAllowed()) {
@@ -1907,11 +1914,15 @@ public class AccessController extends BaseMasterAndRegionObserver
   /**
    * Authorization security check for
    * SecureBulkLoadProtocol.cleanupBulkLoad()
-   * @param e
+   * @param ctx the context
+   * @param request the request
    * @throws IOException
    */
-  //TODO this should end up as a coprocessor hook
-  public void preCleanupBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
+  @Override
+  public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
+                                 CleanupBulkLoadRequest request) throws IOException {
+    RegionCoprocessorEnvironment e = ctx.getEnvironment();
+
     AuthResult authResult = hasSomeAccess(e, "preCleanupBulkLoad", Action.WRITE);
     logResult(authResult);
     if (!authResult.isAllowed()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/2916d4f3/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
index 930d9b3..764a12c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -34,7 +34,9 @@ import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.ipc.RequestContext;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -157,9 +159,18 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
                                                  PrepareBulkLoadRequest request,
                                                  RpcCallback<PrepareBulkLoadResponse> done){
     try {
-      if(userProvider.isHBaseSecurityEnabled()) {
-        getAccessController().prePrepareBulkLoad(env);
+      List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers();
+
+      if(bulkLoadObservers != null) {
+        ObserverContext<RegionCoprocessorEnvironment> ctx =
+                                           new ObserverContext<RegionCoprocessorEnvironment>();
+        ctx.prepare(env);
+
+        for(BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
+          bulkLoadObserver.prePrepareBulkLoad(ctx, request);
+        }
       }
+
       String bulkToken = createStagingDir(baseStagingDir,
           getActiveUser(), ProtobufUtil.toTableName(request.getTableName())).toString();
       done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build());
@@ -174,9 +185,18 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
                               CleanupBulkLoadRequest request,
                               RpcCallback<CleanupBulkLoadResponse> done) {
     try {
-      if (userProvider.isHBaseSecurityEnabled()) {
-        getAccessController().preCleanupBulkLoad(env);
+      List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers();
+
+      if(bulkLoadObservers != null) {
+        ObserverContext<RegionCoprocessorEnvironment> ctx =
+                                           new ObserverContext<RegionCoprocessorEnvironment>();
+        ctx.prepare(env);
+
+        for(BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
+          bulkLoadObserver.preCleanupBulkLoad(ctx, request);
+        }
       }
+
       fs.delete(createStagingDir(baseStagingDir,
           getActiveUser(),
           new Path(request.getBulkToken()).getName()),
@@ -292,9 +312,11 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
     done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build());
   }
 
-  private AccessController getAccessController() {
-    return (AccessController) this.env.getRegion()
-        .getCoprocessorHost().findCoprocessor(AccessController.class.getName());
+  private List<BulkLoadObserver> getBulkLoadObservers() {
+    List<BulkLoadObserver> coprocessorList =
+              this.env.getRegion().getCoprocessorHost().findCoprocessors(BulkLoadObserver.class);
+
+    return coprocessorList;
   }
 
   private Path createStagingDir(Path baseDir,

http://git-wip-us.apache.org/repos/asf/hbase/blob/2916d4f3/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
index 2fe121d..140c3b9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
@@ -531,6 +531,19 @@ public class TestClassLoading {
     assertEquals(loadedMasterCoprocessorsVerify, loadedMasterCoprocessors);
   }
 
+  @Test
+  public void testFindCoprocessors() {
+    // HBASE 12277: 
+    CoprocessorHost masterCpHost =
+                             TEST_UTIL.getHBaseCluster().getMaster().getMasterCoprocessorHost();
+
+    List<MasterObserver> masterObservers = masterCpHost.findCoprocessors(MasterObserver.class);
+
+    assertTrue(masterObservers != null && masterObservers.size() > 0);
+    assertEquals(masterCoprocessor.getSimpleName(),
+                 masterObservers.get(0).getClass().getSimpleName());
+  }
+
   private void waitForTable(TableName name) throws InterruptedException, IOException {
     // First wait until all regions are online
     TEST_UTIL.waitTableEnabled(name);