You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2016/07/20 02:58:22 UTC

[2/5] hbase git commit: Consolidate SecureBulkLoadEndpoint into HBase core as default for bulk load

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-protocol/src/main/protobuf/Client.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index 8a4d459..adb66f7 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -337,6 +337,8 @@ message BulkLoadHFileRequest {
   required RegionSpecifier region = 1;
   repeated FamilyPath family_path = 2;
   optional bool assign_seq_num = 3;
+  optional DelegationToken fs_token = 4;
+  optional string bulk_token = 5;
 
   message FamilyPath {
     required bytes family = 1;
@@ -348,6 +350,30 @@ message BulkLoadHFileResponse {
   required bool loaded = 1;
 }
 
+message DelegationToken {  // filesystem token carried as fs_token on a bulk load request
+  optional bytes identifier = 1;  // raw token identifier bytes
+  optional bytes password = 2;    // raw token password bytes
+  optional string kind = 3;       // token kind; the server wraps it as text
+  optional string service = 4;    // token service; the server wraps it as text
+}
+
+message PrepareBulkLoadRequest {
+  required TableName table_name = 1;
+  optional RegionSpecifier region = 2;  // region the server resolves before preparing the load
+}
+
+message PrepareBulkLoadResponse {
+  required string bulk_token = 1;  // path of the staging directory created for this load
+}
+
+message CleanupBulkLoadRequest {
+  required string bulk_token = 1;       // path the server deletes recursively on cleanup
+  optional RegionSpecifier region = 2;  // region the server resolves before cleaning up
+}
+
+message CleanupBulkLoadResponse {  // carries no fields
+}
+
 message CoprocessorServiceCall {
   required bytes row = 1;
   required string service_name = 2;
@@ -467,6 +493,12 @@ service ClientService {
   rpc BulkLoadHFile(BulkLoadHFileRequest)
     returns(BulkLoadHFileResponse);
 
+  rpc PrepareBulkLoad(PrepareBulkLoadRequest)
+    returns (PrepareBulkLoadResponse);
+
+  rpc CleanupBulkLoad(CleanupBulkLoadRequest)
+    returns (CleanupBulkLoadResponse);
+
   rpc ExecService(CoprocessorServiceRequest)
     returns(CoprocessorServiceResponse);
     

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto b/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto
index 814735b..290355e 100644
--- a/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto
+++ b/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto
@@ -37,29 +37,6 @@ message SecureBulkLoadHFilesResponse {
   required bool loaded = 1;
 }
 
-message DelegationToken {
-  optional bytes identifier = 1;
-  optional bytes password = 2;
-  optional string kind = 3;
-  optional string service = 4;
-}
-
-message PrepareBulkLoadRequest {
-  required TableName table_name = 1;
-}
-
-message PrepareBulkLoadResponse {
-  required string bulk_token = 1;
-}
-
-message CleanupBulkLoadRequest {
-  required string bulk_token = 1;
-
-}
-
-message CleanupBulkLoadResponse {
-}
-
 service SecureBulkLoadService {
     rpc PrepareBulkLoad(PrepareBulkLoadRequest)
       returns (PrepareBulkLoadResponse);

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java
index c7f0b90..1095d6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java
@@ -25,8 +25,8 @@ import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
 
 /**
  * Coprocessors implement this interface to observe and mediate bulk load operations.

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
index e937569..37c344b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
@@ -334,6 +334,26 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
   }
 
   /**
+   * Find the environments whose coprocessor instance extends/implements the given class/interface
+   * @param cls the class/interface to look for
+   * @return the matching environments; an empty list (never null) if none match
+   */
+  public List<CoprocessorEnvironment> findCoprocessorEnvironment(Class<?> cls) {
+    ArrayList<CoprocessorEnvironment> ret = new ArrayList<CoprocessorEnvironment>();
+
+    for (E env: coprocessors) {
+      Coprocessor cp = env.getInstance();
+
+      if(cp != null) { // environments without an instance are skipped
+        if (cls.isAssignableFrom(cp.getClass())) {
+          ret.add(env);
+        }
+      }
+    }
+    return ret;
+  }
+
+  /**
    * Find a coprocessor environment by class name
    * @param className the class name
    * @return the coprocessor, or null if not found

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index a23d739..c04794b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -74,8 +74,8 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
@@ -87,7 +87,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
@@ -323,6 +322,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     // LQI queue does not need to be threadsafe -- all operations on this queue
     // happen in this thread
     Deque<LoadQueueItem> queue = new LinkedList<>();
+    SecureBulkLoadClient secureClient =  new SecureBulkLoadClient(table);
+
     try {
       /*
        * Checking hfile format is a time-consuming operation, we should have an option to skip
@@ -346,13 +347,16 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         return;
       }
 
+      if(isSecureBulkLoadEndpointAvailable()) {
+        LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
+        LOG.warn("Secure bulk load has been integrated into HBase core.");
+      }
+
       //If using secure bulk load, get source delegation token, and
       //prepare staging directory and token
       // fs is the source filesystem
       fsDelegationToken.acquireDelegationToken(fs);
-      if(isSecureBulkLoadEndpointAvailable()) {
-        bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
-      }
+      bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
 
       // Assumes that region splits can happen while this occurs.
       while (!queue.isEmpty()) {
@@ -391,7 +395,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     } finally {
       fsDelegationToken.releaseDelegationToken();
       if(bulkToken != null) {
-        new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
+        secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
       }
       pool.shutdown();
       if (queue != null && !queue.isEmpty()) {
@@ -789,21 +793,18 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           LOG.debug("Going to connect to server " + getLocation() + " for row "
               + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
-          if (!isSecureBulkLoadEndpointAvailable()) {
-            success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
-          } else {
-            try (Table table = conn.getTable(getTableName())) {
-              secureClient = new SecureBulkLoadClient(table);
-              success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
-                bulkToken, getLocation().getRegionInfo().getStartKey());
-            }
+          try (Table table = conn.getTable(getTableName())) {
+            secureClient = new SecureBulkLoadClient(table);
+            success =
+                secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+                  assignSeqIds, fsDelegationToken.getUserToken(), bulkToken);
           }
           return success;
         } finally {
           //Best effort copying of files that might not have been imported
           //from the staging directory back to original location
           //in user directory
-          if(secureClient != null && !success) {
+          if (secureClient != null && !success) {
             FileSystem targetFs = FileSystem.get(getConf());
          // fs is the source filesystem
             if(fs == null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 9567602..e03993f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -498,6 +498,8 @@ public class HRegionServer extends HasThread implements
 
   private volatile ThroughputController flushThroughputController;
 
+  protected final SecureBulkLoadManager secureBulkLoadManager;
+
   /**
    * Starts a HRegionServer at the default location.
    * @param conf
@@ -618,6 +620,9 @@ public class HRegionServer extends HasThread implements
     }
     this.configurationManager = new ConfigurationManager();
 
+    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf);
+    this.secureBulkLoadManager.start();
+
     rpcServices.start();
     putUpWebUI();
     this.walRoller = new LogRoller(this, this);
@@ -3431,4 +3436,9 @@ public class HRegionServer extends HasThread implements
   public MetricsRegionServer getMetrics() {
     return metricsRegionServer;
   }
+
+  @Override
+  public SecureBulkLoadManager getSecureBulkLoadManager() {
+    return this.secureBulkLoadManager; // final field, created and started in the constructor
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index d9ea186..9cfc5df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -134,6 +134,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
@@ -147,6 +149,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
@@ -2042,21 +2046,29 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       checkOpen();
       requestCount.increment();
       Region region = getRegion(request.getRegion());
-      List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
-      for (FamilyPath familyPath: request.getFamilyPathList()) {
-        familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
-          familyPath.getPath()));
-      }
       boolean bypass = false;
-      if (region.getCoprocessorHost() != null) {
-        bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
-      }
       boolean loaded = false;
-      if (!bypass) {
-        loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
-      }
-      if (region.getCoprocessorHost() != null) {
-        loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
+
+      if (!request.hasBulkToken()) {
+        // Old style bulk load. This will not be supported in future releases
+        List<Pair<byte[], String>> familyPaths =
+            new ArrayList<Pair<byte[], String>>(request.getFamilyPathCount());
+        for (FamilyPath familyPath : request.getFamilyPathList()) {
+          familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(), familyPath
+              .getPath()));
+        }
+        if (region.getCoprocessorHost() != null) {
+          bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
+        }
+        if (!bypass) {
+          loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
+        }
+        if (region.getCoprocessorHost() != null) {
+          loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
+        }
+      } else {
+        // secure bulk load
+        loaded = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
       }
       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
       builder.setLoaded(loaded);
@@ -2067,6 +2079,41 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   }
 
   @Override
+  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
+      PrepareBulkLoadRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      requestCount.increment();
+      // Resolve the region named in the request before delegating to the bulk load manager.
+      Region region = getRegion(request.getRegion());
+      // The manager returns the staging directory path; it is sent back as the bulk token.
+      String bulkToken = regionServer.secureBulkLoadManager.prepareBulkLoad(region, request);
+      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
+      builder.setBulkToken(bulkToken);
+      return builder.build();
+    } catch (IOException ie) { // every IOException reaches the caller as a ServiceException
+      throw new ServiceException(ie);
+    }
+  }
+
+  @Override
+  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
+      CleanupBulkLoadRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      requestCount.increment();
+      // Resolve the region named in the request before delegating to the bulk load manager.
+      Region region = getRegion(request.getRegion());
+      // The manager runs the cleanup hooks and deletes the path named by the bulk token.
+      regionServer.secureBulkLoadManager.cleanupBulkLoad(region, request);
+      CleanupBulkLoadResponse response = CleanupBulkLoadResponse.newBuilder().build();
+      return response;
+    } catch (IOException ie) { // every IOException reaches the caller as a ServiceException
+      throw new ServiceException(ie);
+    }
+  }
+
+  @Override
   public CoprocessorServiceResponse execService(final RpcController controller,
       final CoprocessorServiceRequest request) throws ServiceException {
     try {
@@ -2930,4 +2977,5 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
     return UpdateConfigurationResponse.getDefaultInstance();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index c6689a9..356a88b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -81,6 +81,11 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
   RegionServerQuotaManager getRegionServerQuotaManager();
 
   /**
+   * @return RegionServer's instance of {@link SecureBulkLoadManager}
+   */
+  SecureBulkLoadManager getSecureBulkLoadManager();
+
+  /**
    * Context for postOpenDeployTasks().
    */
   class PostOpenDeployContext {

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
new file mode 100644
index 0000000..b47b31d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.regionserver.Region.BulkLoadListener;
+import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.FsDelegationToken;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
+import org.apache.hadoop.hbase.util.Methods;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.security.PrivilegedAction;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Bulk loads in secure mode.
+ *
+ * This service addresses two issues:
+ * <ol>
+ * <li>Moving files in a secure filesystem wherein the HBase Client
+ * and HBase Server are different filesystem users.</li>
+ * <li>Performing the move in a secure manner, assuming that the filesystem
+ * is POSIX compliant.</li>
+ * </ol>
+ *
+ * The algorithm is as follows:
+ * <ol>
+ * <li>Create an hbase owned staging directory which is
+ * world traversable (711): {@code /hbase/staging}</li>
+ * <li>A user writes out data to his secure output directory: {@code /user/foo/data}</li>
+ * <li>A call is made to hbase to create a secret staging directory
+ * which is globally rwx (777): {@code /user/staging/averylongandrandomdirectoryname}</li>
+ * <li>The user moves the data into the random staging directory,
+ * then calls bulkLoadHFiles()</li>
+ * </ol>
+ *
+ * Like delegation tokens, the strength of the security lies in the length
+ * and randomness of the secret directory name.
+ *
+ */
+@InterfaceAudience.Private
+public class SecureBulkLoadManager {
+
+  public static final long VERSION = 0L;
+
+  // 320 random bits at 5 bits per base-32 digit = at most 64 characters
+  private static final int RANDOM_WIDTH = 320;
+  private static final int RANDOM_RADIX = 32;
+
+  private static final Log LOG = LogFactory.getLog(SecureBulkLoadManager.class);
+
+  private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
+  private final static FsPermission PERM_HIDDEN = FsPermission.valueOf("-rwx--x--x");
+
+  private SecureRandom random;
+  private FileSystem fs;
+  private Configuration conf;
+
+  //two levels so it doesn't get deleted accidentally
+  //no sticky bit in Hadoop 1.0
+  private Path baseStagingDir;
+
+  private UserProvider userProvider;
+
+  SecureBulkLoadManager(Configuration conf) {
+    this.conf = conf; // only stored here; start() does all filesystem setup
+  }
+
+  public void start() { // sets up the RNG, the user provider and the base staging directory
+    random = new SecureRandom();
+    baseStagingDir = SecureBulkLoadUtil.getBaseStagingDir(conf);
+    this.userProvider = UserProvider.instantiate(conf);
+    // Create the base staging dir and insist on PERM_HIDDEN; failures throw IllegalStateException.
+    try {
+      fs = FileSystem.get(conf);
+      fs.mkdirs(baseStagingDir, PERM_HIDDEN);
+      fs.setPermission(baseStagingDir, PERM_HIDDEN); // presumably since mkdirs applies a umask - confirm
+      FileStatus status = fs.getFileStatus(baseStagingDir);
+      //no sticky bit in hadoop-1.0, making directory nonempty so it never gets erased
+      fs.mkdirs(new Path(baseStagingDir,"DONOTERASE"), PERM_HIDDEN);
+      if (status == null) {
+        throw new IllegalStateException("Failed to create staging directory "
+            + baseStagingDir.toString());
+      }
+      if (!status.getPermission().equals(PERM_HIDDEN)) { // setPermission above did not take effect
+        throw new IllegalStateException(
+            "Staging directory already exists but permissions aren't set to '-rwx--x--x' "
+                + baseStagingDir.toString());
+      }
+    } catch (IOException e) { // logged, then rethrown unchecked with the cause preserved
+      LOG.error("Failed to create or set permission on staging directory "
+          + baseStagingDir.toString(), e);
+      throw new IllegalStateException("Failed to create or set permission on staging directory "
+          + baseStagingDir.toString(), e);
+    }
+  }
+
+  public void stop() throws IOException { // currently a no-op
+  }
+
+  public String prepareBulkLoad(final Region region, final PrepareBulkLoadRequest request)
+      throws IOException {
+    List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers(region);
+    // Run the prePrepareBulkLoad hook of every BulkLoadObserver before anything is created.
+    if (bulkLoadObservers != null && bulkLoadObservers.size() != 0) {
+      ObserverContext<RegionCoprocessorEnvironment> ctx =
+          new ObserverContext<RegionCoprocessorEnvironment>();
+      ctx.prepare((RegionCoprocessorEnvironment) region.getCoprocessorHost()
+          .findCoprocessorEnvironment(BulkLoadObserver.class).get(0));
+      // One context, bound to the first matching environment, is shared by all observers.
+      for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
+        bulkLoadObserver.prePrepareBulkLoad(ctx, request);
+      }
+    }
+    // The staging directory path doubles as the bulk token returned to the caller.
+    String bulkToken =
+        createStagingDir(baseStagingDir, getActiveUser(), region.getTableDesc().getTableName())
+            .toString();
+
+    return bulkToken;
+  }
+
+  public void cleanupBulkLoad(final Region region, final CleanupBulkLoadRequest request)
+      throws IOException {
+    List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers(region);
+    // Run the preCleanupBulkLoad hook of every BulkLoadObserver before anything is deleted.
+    if (bulkLoadObservers != null && bulkLoadObservers.size() != 0) {
+      ObserverContext<RegionCoprocessorEnvironment> ctx =
+          new ObserverContext<RegionCoprocessorEnvironment>();
+      ctx.prepare((RegionCoprocessorEnvironment) region.getCoprocessorHost()
+        .findCoprocessorEnvironment(BulkLoadObserver.class).get(0));
+      // One context, bound to the first matching environment, is shared by all observers.
+      for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
+        bulkLoadObserver.preCleanupBulkLoad(ctx, request);
+      }
+    }
+    // NOTE(review): the path comes from the request as-is and is deleted recursively - validate?
+    fs.delete(new Path(request.getBulkToken()), true);
+  }
+
+  public boolean secureBulkLoadHFiles(final Region region,
+      final BulkLoadHFileRequest request) throws IOException {
+    final List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>(request.getFamilyPathCount());
+    for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
+      familyPaths.add(new Pair<byte[], String>(el.getFamily().toByteArray(), el.getPath()));
+    }
+    // Rebuild the source-fs token only if the client sent one; a missing token is rejected below.
+    Token userToken = null;
+    if (userProvider.isHadoopSecurityEnabled() && request.hasFsToken()) {
+      userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
+              .getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text(
+              request.getFsToken().getService()));
+    }
+    final String bulkToken = request.getBulkToken();
+    User user = getActiveUser();
+    final UserGroupInformation ugi = user.getUGI();
+    if(userToken != null) {
+      ugi.addToken(userToken);
+    } else if (userProvider.isHadoopSecurityEnabled()) {
+      //we allow this to pass through in "simple" security mode
+      //for mini cluster testing
+      throw new DoNotRetryIOException("User token cannot be null");
+    }
+    // Coprocessors may bypass the load; postBulkLoadHFile still runs and has the final say.
+    boolean bypass = false;
+    if (region.getCoprocessorHost() != null) {
+      bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
+    }
+    boolean loaded = false;
+    if (!bypass) {
+      // Get the target fs (HBase region server fs) delegation token
+      // Since we have checked the permission via 'preBulkLoadHFile', now let's give
+      // the 'request user' necessary token to operate on the target fs.
+      // After this point the 'doAs' user will hold two tokens, one for the source fs
+      // ('request user'), another for the target fs (HBase region server principal).
+      if (userProvider.isHadoopSecurityEnabled()) {
+        FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
+        targetfsDelegationToken.acquireDelegationToken(fs);
+
+        Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
+        if (targetFsToken != null
+            && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))) {
+          ugi.addToken(targetFsToken);
+        }
+      }
+
+      loaded = ugi.doAs(new PrivilegedAction<Boolean>() {
+        @Override
+        public Boolean run() {
+          FileSystem fs = null;
+          try {
+            fs = FileSystem.get(conf);
+            for(Pair<byte[], String> el: familyPaths) {
+              Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
+              if(!fs.exists(stageFamily)) {
+                fs.mkdirs(stageFamily);
+                fs.setPermission(stageFamily, PERM_ALL_ACCESS);
+              }
+            }
+            //We call bulkLoadHFiles as requesting user
+            //To enable access prior to staging
+            return region.bulkLoadHFiles(familyPaths, true, // request's assign_seq_num not consulted
+                new SecureBulkLoadListener(fs, bulkToken, conf));
+          } catch (Exception e) { // failure is logged and reported as not loaded
+            LOG.error("Failed to complete bulk load", e);
+          } finally {
+            if (fs != null) {
+              try {
+                if (!UserGroupInformation.getLoginUser().equals(ugi)) {
+                  FileSystem.closeAllForUGI(ugi);
+                }
+              } catch (IOException e) {
+                LOG.error("Failed to close FileSystem for " + ugi.getUserName(), e);
+              }
+            }
+          }
+          return false;
+        }
+      });
+    }
+    if (region.getCoprocessorHost() != null) {
+      loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
+    }
+    return loaded;
+  }
+
+  private List<BulkLoadObserver> getBulkLoadObservers(Region region) {
+    if (region.getCoprocessorHost() == null) {
+      return new ArrayList<BulkLoadObserver>(); // no coprocessor host: nothing to observe
+    }
+    return region.getCoprocessorHost().findCoprocessors(BulkLoadObserver.class);
+  }
+
+  private Path createStagingDir(Path baseDir,
+                                User user,
+                                TableName tableName) throws IOException {
+    String tblName = tableName.getNameAsString().replace(":", "_"); // ':' swapped for '_'
+    String randomDir = user.getShortName()+"__"+ tblName +"__"+ // <user>__<table>__<random>
+        (new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX));
+    return createStagingDir(baseDir, user, randomDir);
+  }
+
+  /** Creates {@code baseDir/randomDir} with PERM_ALL_ACCESS; {@code user} is unused here. */
+  private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
+    Path stagingDir = new Path(baseDir, randomDir);
+    fs.mkdirs(stagingDir, PERM_ALL_ACCESS);
+    // Set the mode again explicitly -- presumably because mkdirs may apply a umask (confirm).
+    fs.setPermission(stagingDir, PERM_ALL_ACCESS);
+    return stagingDir;
+  }
+
+  /**
+   * @return the RPC request user, else the user provider's current user; a testing user of the
+   *         same short name when Hadoop security is on but User.HBASE_SECURITY_CONF_KEY is "simple"
+   */
+  private User getActiveUser() throws IOException {
+    User requestUser = RpcServer.getRequestUser();
+    // Outside an RPC call there is no request user, so fall back to the system user.
+    User active = (requestUser != null) ? requestUser : userProvider.getCurrent();
+    boolean testing = userProvider.isHadoopSecurityEnabled()
+        && "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY));
+    if (!testing) {
+      return active;
+    }
+    return User.createUserForTesting(conf, active.getShortName(), new String[]{});
+  }
+
+  /** Stages HFiles under the staging directory for a bulk load; undoes renames on failure. */
+  private static class SecureBulkLoadListener implements BulkLoadListener {
+    // Target filesystem
+    private final FileSystem fs;
+    private final String stagingDir;
+    private final Configuration conf;
+    // Source filesystem, resolved lazily from the first source path that needs it
+    private FileSystem srcFs = null;
+    // Original permissions of renamed files, keyed by source path; restored on failure
+    private final Map<String, FsPermission> origPermissions = new HashMap<String, FsPermission>();
+
+    public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
+      this.fs = fs;
+      this.stagingDir = stagingDir;
+      this.conf = conf;
+    }
+
+    /**
+     * Copies (different filesystems) or renames (same filesystem) the source HFile to
+     * {@code stagingDir/<family>/<file name>} and sets PERM_ALL_ACCESS on the staged file.
+     * @return the staged path
+     * @throws IOException if the source is not a plain file or the rename fails
+     */
+    @Override
+    public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
+      Path p = new Path(srcPath);
+      Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
+      // In case of Replication for bulk load files, hfiles are already copied in staging directory
+      if (p.equals(stageP)) {
+        LOG.debug(p.getName()
+            + " is already available in staging directory. Skipping copy or rename.");
+        return stageP.toString();
+      }
+      if (srcFs == null) {
+        srcFs = FileSystem.get(p.toUri(), conf);
+      }
+      if (!isFile(p)) {
+        throw new IOException("Path does not reference a file: " + p);
+      }
+      // Check to see if the source and target filesystems are the same
+      if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
+        LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +
+            "the destination filesystem. Copying file over to destination staging dir.");
+        FileUtil.copy(srcFs, p, fs, stageP, false, conf);
+      } else {
+        LOG.debug("Moving " + p + " to " + stageP);
+        // Record the permission so that failedBulkLoad can restore it after moving back
+        origPermissions.put(srcPath, fs.getFileStatus(p).getPermission());
+        if (!fs.rename(p, stageP)) {
+          throw new IOException("Failed to move HFile: " + p + " to " + stageP);
+        }
+      }
+      fs.setPermission(stageP, PERM_ALL_ACCESS);
+      return stageP.toString();
+    }
+
+    @Override
+    public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
+      LOG.debug("Bulk Load done for: " + srcPath);
+    }
+
+    /** Moves a renamed file back to {@code srcPath} and restores its recorded permission. */
+    @Override
+    public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {
+      Path p = new Path(srcPath);
+      Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
+      // In case of Replication for bulk load files, hfiles are not renamed by end point during
+      // prepare stage, so no need of rename here again. Checked before srcFs is touched:
+      // prepareBulkLoad returns early for such files and may never have resolved srcFs.
+      if (p.equals(stageP)) {
+        LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
+        return;
+      }
+      if (srcFs == null) {
+        srcFs = FileSystem.get(p.toUri(), conf);
+      }
+      if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
+        // files are copied so no need to move them back
+        return;
+      }
+      LOG.debug("Moving " + stageP + " back to " + p);
+      if (!fs.rename(stageP, p)) {
+        throw new IOException("Failed to move HFile: " + stageP + " to " + p);
+      }
+      // restore original permission
+      if (origPermissions.containsKey(srcPath)) {
+        fs.setPermission(p, origPermissions.get(srcPath));
+      } else {
+        LOG.warn("Can't find previous permission for path=" + srcPath);
+      }
+    }
+
+    /** Returns true if p is neither a directory nor (where detectable) a symlink. */
+    private boolean isFile(Path p) throws IOException {
+      FileStatus status = srcFs.getFileStatus(p);
+      boolean isFile = !status.isDirectory();
+      try {
+        isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);
+      } catch (Exception ignored) {
+        // Deliberately ignored: if the isSymlink lookup fails, keep the directory check only.
+      }
+      return isFile;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index f21d8e2..7d5fc32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -95,10 +95,10 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.Region;
@@ -2145,7 +2145,7 @@ public class AccessController extends BaseMasterAndRegionObserver
    */
   @Override
   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
-                                 PrepareBulkLoadRequest request) throws IOException {
+      PrepareBulkLoadRequest request) throws IOException {
     requireAccess("prePareBulkLoad",
         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
   }
@@ -2159,7 +2159,7 @@ public class AccessController extends BaseMasterAndRegionObserver
    */
   @Override
   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
-                                 CleanupBulkLoadRequest request) throws IOException {
+      CleanupBulkLoadRequest request) throws IOException {
     requireAccess("preCleanupBulkLoad",
         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
index c1f9251..cb143b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -18,140 +18,52 @@
 
 package org.apache.hadoop.hbase.security.access;
 
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
+import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadResponse;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesRequest;
 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesResponse;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.Region.BulkLoadListener;
-import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.token.FsDelegationToken;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSHDFSUtils;
-import org.apache.hadoop.hbase.util.Methods;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-
-import java.io.IOException;
-import java.math.BigInteger;
-import java.security.PrivilegedAction;
-import java.security.SecureRandom;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService;
+import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
 
 /**
  * Coprocessor service for bulk loads in secure mode.
- * This coprocessor has to be installed as part of enabling
- * security in HBase.
- *
- * This service addresses two issues:
- * <ol>
- * <li>Moving files in a secure filesystem wherein the HBase Client
- * and HBase Server are different filesystem users.</li>
- * <li>Does moving in a secure manner. Assuming that the filesystem
- * is POSIX compliant.</li>
- * </ol>
- *
- * The algorithm is as follows:
- * <ol>
- * <li>Create an hbase owned staging directory which is
- * world traversable (711): {@code /hbase/staging}</li>
- * <li>A user writes out data to his secure output directory: {@code /user/foo/data}</li>
- * <li>A call is made to hbase to create a secret staging directory
- * which globally rwx (777): {@code /user/staging/averylongandrandomdirectoryname}</li>
- * <li>The user moves the data into the random staging directory,
- * then calls bulkLoadHFiles()</li>
- * </ol>
- * Like delegation tokens the strength of the security lies in the length
- * and randomness of the secret directory.
- *
+ * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
  */
 @InterfaceAudience.Private
+@Deprecated
 public class SecureBulkLoadEndpoint extends SecureBulkLoadService
     implements CoprocessorService, Coprocessor {
 
   public static final long VERSION = 0L;
 
-  //320/5 = 64 characters
-  private static final int RANDOM_WIDTH = 320;
-  private static final int RANDOM_RADIX = 32;
-
   private static final Log LOG = LogFactory.getLog(SecureBulkLoadEndpoint.class);
 
-  private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
-  private final static FsPermission PERM_HIDDEN = FsPermission.valueOf("-rwx--x--x");
-
-  private SecureRandom random;
-  private FileSystem fs;
-  private Configuration conf;
-
-  //two levels so it doesn't get deleted accidentally
-  //no sticky bit in Hadoop 1.0
-  private Path baseStagingDir;
-
   private RegionCoprocessorEnvironment env;
 
-  private UserProvider userProvider;
-
   @Override
   public void start(CoprocessorEnvironment env) {
     this.env = (RegionCoprocessorEnvironment)env;
-    random = new SecureRandom();
-    conf = env.getConfiguration();
-    baseStagingDir = SecureBulkLoadUtil.getBaseStagingDir(conf);
-    this.userProvider = UserProvider.instantiate(conf);
-
-    try {
-      fs = FileSystem.get(conf);
-      fs.mkdirs(baseStagingDir, PERM_HIDDEN);
-      fs.setPermission(baseStagingDir, PERM_HIDDEN);
-      //no sticky bit in hadoop-1.0, making directory nonempty so it never gets erased
-      fs.mkdirs(new Path(baseStagingDir,"DONOTERASE"), PERM_HIDDEN);
-      FileStatus status = fs.getFileStatus(baseStagingDir);
-      if(status == null) {
-        throw new IllegalStateException("Failed to create staging directory");
-      }
-      if(!status.getPermission().equals(PERM_HIDDEN)) {
-        throw new IllegalStateException(
-            "Directory already exists but permissions aren't set to '-rwx--x--x' ");
-      }
-    } catch (IOException e) {
-      throw new IllegalStateException("Failed to get FileSystem instance",e);
-    }
+    LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
+    LOG.warn("Secure bulk load has been integrated into HBase core.");
   }
 
   @Override
@@ -159,24 +71,12 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
   }
 
   @Override
-  public void prepareBulkLoad(RpcController controller,
-                                                 PrepareBulkLoadRequest request,
-                                                 RpcCallback<PrepareBulkLoadResponse> done){
+  public void prepareBulkLoad(RpcController controller, PrepareBulkLoadRequest request,
+      RpcCallback<PrepareBulkLoadResponse> done) {
     try {
-      List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers();
-
-      if(bulkLoadObservers != null) {
-        ObserverContext<RegionCoprocessorEnvironment> ctx =
-                                           new ObserverContext<RegionCoprocessorEnvironment>();
-        ctx.prepare(env);
-
-        for(BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
-          bulkLoadObserver.prePrepareBulkLoad(ctx, request);
-        }
-      }
-
-      String bulkToken = createStagingDir(baseStagingDir,
-          getActiveUser(), ProtobufUtil.toTableName(request.getTableName())).toString();
+      SecureBulkLoadManager secureBulkLoadManager =
+          this.env.getRegionServerServices().getSecureBulkLoadManager();
+      String bulkToken = secureBulkLoadManager.prepareBulkLoad(this.env.getRegion(), request);
       done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build());
     } catch (IOException e) {
       ResponseConverter.setControllerException(controller, e);
@@ -185,23 +85,12 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
   }
 
   @Override
-  public void cleanupBulkLoad(RpcController controller,
-                              CleanupBulkLoadRequest request,
-                              RpcCallback<CleanupBulkLoadResponse> done) {
+  public void cleanupBulkLoad(RpcController controller, CleanupBulkLoadRequest request,
+      RpcCallback<CleanupBulkLoadResponse> done) {
     try {
-      List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers();
-
-      if(bulkLoadObservers != null) {
-        ObserverContext<RegionCoprocessorEnvironment> ctx =
-                                           new ObserverContext<RegionCoprocessorEnvironment>();
-        ctx.prepare(env);
-
-        for(BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
-          bulkLoadObserver.preCleanupBulkLoad(ctx, request);
-        }
-      }
-
-      fs.delete(new Path(request.getBulkToken()), true);
+      SecureBulkLoadManager secureBulkLoadManager =
+          this.env.getRegionServerServices().getSecureBulkLoadManager();
+      secureBulkLoadManager.cleanupBulkLoad(this.env.getRegion(), request);
       done.run(CleanupBulkLoadResponse.newBuilder().build());
     } catch (IOException e) {
       ResponseConverter.setControllerException(controller, e);
@@ -210,262 +99,35 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
   }
 
   @Override
-  public void secureBulkLoadHFiles(RpcController controller,
-                                   SecureBulkLoadHFilesRequest request,
-                                   RpcCallback<SecureBulkLoadHFilesResponse> done) {
-    final List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
-    for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
-      familyPaths.add(new Pair(el.getFamily().toByteArray(),el.getPath()));
-    }
-
-    Token userToken = null;
-    if (userProvider.isHadoopSecurityEnabled()) {
-      userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
-              .getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text(
-              request.getFsToken().getService()));
-    }
-    final String bulkToken = request.getBulkToken();
-    User user = getActiveUser();
-    final UserGroupInformation ugi = user.getUGI();
-    if(userToken != null) {
-      ugi.addToken(userToken);
-    } else if (userProvider.isHadoopSecurityEnabled()) {
-      //we allow this to pass through in "simple" security mode
-      //for mini cluster testing
-      ResponseConverter.setControllerException(controller,
-          new DoNotRetryIOException("User token cannot be null"));
-      done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
-      return;
-    }
-
-    Region region = env.getRegion();
-    boolean bypass = false;
-    if (region.getCoprocessorHost() != null) {
-      try {
-        bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
-      } catch (IOException e) {
-        ResponseConverter.setControllerException(controller, e);
-        done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
-        return;
-      }
-    }
+  public void secureBulkLoadHFiles(RpcController controller, SecureBulkLoadHFilesRequest request,
+      RpcCallback<SecureBulkLoadHFilesResponse> done) {
     boolean loaded = false;
-    if (!bypass) {
-      // Get the target fs (HBase region server fs) delegation token
-      // Since we have checked the permission via 'preBulkLoadHFile', now let's give
-      // the 'request user' necessary token to operate on the target fs.
-      // After this point the 'doAs' user will hold two tokens, one for the source fs
-      // ('request user'), another for the target fs (HBase region server principal).
-      if (userProvider.isHadoopSecurityEnabled()) {
-        FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
-        try {
-          targetfsDelegationToken.acquireDelegationToken(fs);
-        } catch (IOException e) {
-          ResponseConverter.setControllerException(controller, e);
-          done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
-          return;
-        }
-        Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
-        if (targetFsToken != null
-            && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))) {
-          ugi.addToken(targetFsToken);
-        }
-      }
-
-      loaded = ugi.doAs(new PrivilegedAction<Boolean>() {
-        @Override
-        public Boolean run() {
-          FileSystem fs = null;
-          try {
-            Configuration conf = env.getConfiguration();
-            fs = FileSystem.get(conf);
-            for(Pair<byte[], String> el: familyPaths) {
-              Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
-              if(!fs.exists(stageFamily)) {
-                fs.mkdirs(stageFamily);
-                fs.setPermission(stageFamily, PERM_ALL_ACCESS);
-              }
-            }
-            //We call bulkLoadHFiles as requesting user
-            //To enable access prior to staging
-            return env.getRegion().bulkLoadHFiles(familyPaths, true,
-                new SecureBulkLoadListener(fs, bulkToken, conf));
-          } catch (Exception e) {
-            LOG.error("Failed to complete bulk load", e);
-          } finally {
-            if (fs != null) {
-              try {
-                if (!UserGroupInformation.getLoginUser().equals(ugi)) {
-                  FileSystem.closeAllForUGI(ugi);
-                }
-              } catch (IOException e) {
-                LOG.error("Failed to close FileSystem for " + ugi.getUserName(), e);
-              }
-            }
-          }
-          return false;
-        }
-      });
-    }
-    if (region.getCoprocessorHost() != null) {
-      try {
-        loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
-      } catch (IOException e) {
-        ResponseConverter.setControllerException(controller, e);
-        done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
-        return;
-      }
+    try {
+      SecureBulkLoadManager secureBulkLoadManager =
+          this.env.getRegionServerServices().getSecureBulkLoadManager();
+      BulkLoadHFileRequest bulkLoadHFileRequest = ConvertSecureBulkLoadHFilesRequest(request);
+      loaded = secureBulkLoadManager.secureBulkLoadHFiles(this.env.getRegion(), bulkLoadHFileRequest);
+    } catch (IOException e) {
+      ResponseConverter.setControllerException(controller, e);
     }
     done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build());
   }
 
-  private List<BulkLoadObserver> getBulkLoadObservers() {
-    List<BulkLoadObserver> coprocessorList =
-              this.env.getRegion().getCoprocessorHost().findCoprocessors(BulkLoadObserver.class);
-
-    return coprocessorList;
-  }
-
-  private Path createStagingDir(Path baseDir,
-                                User user,
-                                TableName tableName) throws IOException {
-    String tblName = tableName.getNameAsString().replace(":", "_");
-    String randomDir = user.getShortName()+"__"+ tblName +"__"+
-        (new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX));
-    return createStagingDir(baseDir, user, randomDir);
-  }
-
-  private Path createStagingDir(Path baseDir,
-                                User user,
-                                String randomDir) throws IOException {
-    Path p = new Path(baseDir, randomDir);
-    fs.mkdirs(p, PERM_ALL_ACCESS);
-    fs.setPermission(p, PERM_ALL_ACCESS);
-    return p;
-  }
-
-  private User getActiveUser() {
-    User user = RpcServer.getRequestUser();
-    if (user == null) {
-      return null;
-    }
-
-    //this is for testing
-    if (userProvider.isHadoopSecurityEnabled()
-        && "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) {
-      return User.createUserForTesting(conf, user.getShortName(), new String[]{});
-    }
-
-    return user;
+  private BulkLoadHFileRequest ConvertSecureBulkLoadHFilesRequest(
+      SecureBulkLoadHFilesRequest request) {
+    BulkLoadHFileRequest.Builder bulkLoadHFileRequest = BulkLoadHFileRequest.newBuilder();
+    RegionSpecifier region =
+        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, this.env
+            .getRegionInfo().getRegionName());
+
+    bulkLoadHFileRequest.setRegion(region).setFsToken(request.getFsToken())
+        .setBulkToken(request.getBulkToken()).setAssignSeqNum(request.getAssignSeqNum())
+        .addAllFamilyPath(request.getFamilyPathList());
+    return bulkLoadHFileRequest.build();
   }
 
   @Override
   public Service getService() {
     return this;
   }
-
-  private static class SecureBulkLoadListener implements BulkLoadListener {
-    // Target filesystem
-    private FileSystem fs;
-    private String stagingDir;
-    private Configuration conf;
-    // Source filesystem
-    private FileSystem srcFs = null;
-    private Map<String, FsPermission> origPermissions = null;
-
-    public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
-      this.fs = fs;
-      this.stagingDir = stagingDir;
-      this.conf = conf;
-      this.origPermissions = new HashMap<String, FsPermission>();
-    }
-
-    @Override
-    public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
-      Path p = new Path(srcPath);
-      Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
-
-      // In case of Replication for bulk load files, hfiles are already copied in staging directory
-      if (p.equals(stageP)) {
-        LOG.debug(p.getName()
-            + " is already available in staging directory. Skipping copy or rename.");
-        return stageP.toString();
-      }
-
-      if (srcFs == null) {
-        srcFs = FileSystem.get(p.toUri(), conf);
-      }
-
-      if(!isFile(p)) {
-        throw new IOException("Path does not reference a file: " + p);
-      }
-
-      // Check to see if the source and target filesystems are the same
-      if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
-        LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +
-            "the destination filesystem. Copying file over to destination staging dir.");
-        FileUtil.copy(srcFs, p, fs, stageP, false, conf);
-      } else {
-        LOG.debug("Moving " + p + " to " + stageP);
-        FileStatus origFileStatus = fs.getFileStatus(p);
-        origPermissions.put(srcPath, origFileStatus.getPermission());
-        if(!fs.rename(p, stageP)) {
-          throw new IOException("Failed to move HFile: " + p + " to " + stageP);
-        }
-      }
-      fs.setPermission(stageP, PERM_ALL_ACCESS);
-      return stageP.toString();
-    }
-
-    @Override
-    public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
-      LOG.debug("Bulk Load done for: " + srcPath);
-    }
-
-    @Override
-    public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {
-      if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
-        // files are copied so no need to move them back
-        return;
-      }
-      Path p = new Path(srcPath);
-      Path stageP = new Path(stagingDir,
-          new Path(Bytes.toString(family), p.getName()));
-
-      // In case of Replication for bulk load files, hfiles are not renamed by end point during
-      // prepare stage, so no need of rename here again
-      if (p.equals(stageP)) {
-        LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
-        return;
-      }
-
-      LOG.debug("Moving " + stageP + " back to " + p);
-      if(!fs.rename(stageP, p))
-        throw new IOException("Failed to move HFile: " + stageP + " to " + p);
-
-      // restore original permission
-      if (origPermissions.containsKey(srcPath)) {
-        fs.setPermission(p, origPermissions.get(srcPath));
-      } else {
-        LOG.warn("Can't find previous permission for path=" + srcPath);
-      }
-    }
-
-    /**
-     * Check if the path is referencing a file.
-     * This is mainly needed to avoid symlinks.
-     * @param p
-     * @return true if the p is a file
-     * @throws IOException
-     */
-    private boolean isFile(Path p) throws IOException {
-      FileStatus status = srcFs.getFileStatus(p);
-      boolean isFile = !status.isDirectory();
-      try {
-        isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);
-      } catch (Exception e) {
-      }
-      return isFile;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index bfa14cb..a6dc59f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1080,6 +1080,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     // Now do the mini hbase cluster.  Set the hbase.rootdir in config.
     createRootDir(create);
 
+    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
+    // for tests that do not read hbase-defaults.xml
+    setHBaseFsTmpDir();
+
     // These settings will make the server waits until this exact number of
     // regions servers are connected.
     if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
@@ -1104,10 +1108,6 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     getAdmin(); // create immediately the hbaseAdmin
     LOG.info("Minicluster is up");
 
-    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
-    // for tests that do not read hbase-defaults.xml
-    setHBaseFsTmpDir();
-
     return (MiniHBaseCluster)this.hbaseCluster;
   }
 
@@ -1278,6 +1278,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     } else {
       LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
     }
+    this.conf.set("hbase.bulkload.staging.dir", this.conf.get("hbase.fs.tmp.dir"));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 6cd1963..6f225d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
 import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -316,7 +317,6 @@ public class MockRegionServerServices implements RegionServerServices {
 
   @Override
   public ClusterConnection getClusterConnection() {
-    // TODO Auto-generated method stub
     return null;
   }
 
@@ -334,4 +334,9 @@ public class MockRegionServerServices implements RegionServerServices {
   public MetricsRegionServer getMetrics() {
     return null;
   }
+
+  @Override
+  public SecureBulkLoadManager getSecureBulkLoadManager() {
+    return null; // this mock never supplies a SecureBulkLoadManager
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index 48d7efc..354f0a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -35,8 +35,6 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
 import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -336,16 +334,21 @@ public class TestReplicaWithCluster {
     LOG.debug("Loading test data");
     @SuppressWarnings("deprecation")
     final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
+    table = conn.getTable(hdt.getTableName());
+    final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn);
     RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
       conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
         @Override
         public Void call(int timeout) throws Exception {
           LOG.debug("Going to connect to server " + getLocation() + " for row "
             + Bytes.toStringBinary(getRow()));
+          SecureBulkLoadClient secureClient = null;
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
-          BulkLoadHFileRequest request =
-            RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
-          getStub().bulkLoadHFile(null, request);
+          try (Table table = conn.getTable(getTableName())) {
+            secureClient = new SecureBulkLoadClient(table);
+            secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+                  true, null, bulkToken);
+          }
           return null;
         }
       };

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesUseSecurityEndPoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesUseSecurityEndPoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesUseSecurityEndPoint.java
deleted file mode 100644
index 11627a1..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesUseSecurityEndPoint.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.junit.BeforeClass;
-import org.junit.experimental.categories.Category;
-
-@Category(LargeTests.class)
-public class TestLoadIncrementalHFilesUseSecurityEndPoint extends TestLoadIncrementalHFiles {
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
-      MAX_FILES_PER_REGION_PER_FAMILY);
-    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
-      "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
-    // change default behavior so that tag values are returned with normal rpcs
-    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
-        KeyValueCodecWithTags.class.getCanonicalName());
-
-    util.startMiniCluster();
-    setupNamespace();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 69f2e35..2927023 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -83,6 +83,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodes
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
@@ -90,6 +92,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
@@ -103,6 +107,7 @@ import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
 import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -661,7 +666,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
 
   @Override
   public ClusterConnection getClusterConnection() {
-    // TODO Auto-generated method stub
     return null;
   }
 
@@ -679,4 +683,21 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
   public MetricsRegionServer getMetrics() {
     return null;
   }
-}
\ No newline at end of file
+
+  @Override
+  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
+      PrepareBulkLoadRequest request) throws ServiceException {
+    return null; // stub: the request is ignored and no response is built
+  }
+
+  @Override
+  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
+      CleanupBulkLoadRequest request) throws ServiceException {
+    return null; // stub: the request is ignored and no response is built
+  }
+
+  @Override
+  public SecureBulkLoadManager getSecureBulkLoadManager() {
+    return null; // this mock never supplies a SecureBulkLoadManager
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
new file mode 100644
index 0000000..9ecc5d6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
+import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Client proxy for SecureBulkLoadProtocol used in conjunction with SecureBulkLoadEndpoint.
+ * Each request is sent through a coprocessor channel opened on the wrapped table.
+ *
+ * @deprecated Use for backward compatibility testing only. Will be removed when
+ *             SecureBulkLoadEndpoint is not supported.
+ */
+@Deprecated
+@InterfaceAudience.Private
+public class SecureBulkLoadEndpointClient {
+  /** Table through which coprocessor channels to the endpoint are opened. */
+  private final Table table;
+
+  /**
+   * @param table table used to open coprocessor channels and to read the configuration
+   */
+  public SecureBulkLoadEndpointClient(Table table) {
+    this.table = table;
+  }
+
+  /**
+   * Sends a prepare request for {@code tableName} over a channel opened on the empty start row.
+   *
+   * @param tableName table named in the prepare request
+   * @return the bulk token carried by the response
+   * @throws IOException wrapping any failure, including one reported by the RPC controller
+   */
+  public String prepareBulkLoad(final TableName tableName) throws IOException {
+    try {
+      SecureBulkLoadProtos.SecureBulkLoadService service =
+          newServiceStub(HConstants.EMPTY_START_ROW);
+      ServerRpcController controller = new ServerRpcController();
+      BlockingRpcCallback<PrepareBulkLoadResponse> done =
+          new BlockingRpcCallback<PrepareBulkLoadResponse>();
+      PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder()
+          .setTableName(ProtobufUtil.toProtoTableName(tableName))
+          .build();
+
+      service.prepareBulkLoad(controller, request, done);
+
+      PrepareBulkLoadResponse response = done.get();
+      if (controller.failedOnException()) {
+        throw controller.getFailedOn();
+      }
+      return response.getBulkToken();
+    } catch (Throwable throwable) {
+      throw new IOException(throwable);
+    }
+  }
+
+  /**
+   * Sends a cleanup request for {@code bulkToken} over a channel opened on the empty start row.
+   * The response is not read; only the controller is checked for a failure.
+   *
+   * @param bulkToken token placed in the cleanup request
+   * @throws IOException wrapping any failure, including one reported by the RPC controller
+   */
+  public void cleanupBulkLoad(final String bulkToken) throws IOException {
+    try {
+      SecureBulkLoadProtos.SecureBulkLoadService service =
+          newServiceStub(HConstants.EMPTY_START_ROW);
+      ServerRpcController controller = new ServerRpcController();
+      BlockingRpcCallback<CleanupBulkLoadResponse> done =
+          new BlockingRpcCallback<CleanupBulkLoadResponse>();
+      CleanupBulkLoadRequest request =
+          CleanupBulkLoadRequest.newBuilder().setBulkToken(bulkToken).build();
+
+      service.cleanupBulkLoad(controller, request, done);
+
+      if (controller.failedOnException()) {
+        throw controller.getFailedOn();
+      }
+    } catch (Throwable throwable) {
+      throw new IOException(throwable);
+    }
+  }
+
+  /**
+   * Sends one secure bulk load request over a channel opened on {@code startRow}.
+   *
+   * @param familyPaths (family, path) pairs copied into the request
+   * @param userToken token copied into the request; an empty token is sent when it is null
+   * @param bulkToken bulk token placed in the request
+   * @param startRow row used to open the coprocessor channel
+   * @return the {@code loaded} flag of the response
+   * @throws IOException wrapping any failure, including one reported by the RPC controller
+   */
+  public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
+                         final Token<?> userToken,
+                         final String bulkToken,
+                         final byte[] startRow) throws IOException {
+    // we never want to send a batch of HFiles to all regions, thus cannot call
+    // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
+    try {
+      SecureBulkLoadProtos.SecureBulkLoadService service = newServiceStub(startRow);
+
+      // Token fields are only populated when the caller supplied a token.
+      DelegationToken.Builder fsToken = DelegationToken.newBuilder();
+      if (userToken != null) {
+        fsToken.setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
+            .setPassword(ByteStringer.wrap(userToken.getPassword()))
+            .setKind(userToken.getKind().toString())
+            .setService(userToken.getService().toString());
+      }
+
+      List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
+          new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>();
+      for (Pair<byte[], String> familyPath : familyPaths) {
+        protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
+            .setFamily(ByteStringer.wrap(familyPath.getFirst()))
+            .setPath(familyPath.getSecond()).build());
+      }
+
+      SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
+          SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
+              .setFsToken(fsToken.build())
+              .addAllFamilyPath(protoFamilyPaths)
+              .setBulkToken(bulkToken).build();
+
+      ServerRpcController controller = new ServerRpcController();
+      BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> done =
+          new BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>();
+      service.secureBulkLoadHFiles(controller, request, done);
+
+      SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = done.get();
+      if (controller.failedOnException()) {
+        throw controller.getFailedOn();
+      }
+      return response.getLoaded();
+    } catch (Throwable throwable) {
+      throw new IOException(throwable);
+    }
+  }
+
+  /**
+   * Delegates to {@code SecureBulkLoadUtil.getStagingPath} using the wrapped table's
+   * configuration.
+   *
+   * @param bulkToken bulk token passed through to the utility
+   * @param family family passed through to the utility
+   * @return the path returned by the utility
+   * @throws IOException propagated from the delegate, if it raises one
+   */
+  public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
+    return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
+  }
+
+  /** Opens a coprocessor channel on {@code row} and builds a service stub over it. */
+  private SecureBulkLoadProtos.SecureBulkLoadService newServiceStub(byte[] row) throws Exception {
+    CoprocessorRpcChannel channel = table.coprocessorService(row);
+    return ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index bd5c91e..6e68201 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -52,6 +51,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -91,15 +90,15 @@ import com.google.common.collect.Lists;
 @Category({RegionServerTests.class, LargeTests.class})
 public class TestHRegionServerBulkLoad {
   private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
-  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
-  private final static Configuration conf = UTIL.getConfiguration();
-  private final static byte[] QUAL = Bytes.toBytes("qual");
-  private final static int NUM_CFS = 10;
+  protected static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  protected final static Configuration conf = UTIL.getConfiguration();
+  protected final static byte[] QUAL = Bytes.toBytes("qual");
+  protected final static int NUM_CFS = 10;
   private int sleepDuration;
   public static int BLOCKSIZE = 64 * 1024;
   public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
 
-  private final static byte[][] families = new byte[NUM_CFS][];
+  protected final static byte[][] families = new byte[NUM_CFS][];
   static {
     for (int i = 0; i < NUM_CFS; i++) {
       families[i] = Bytes.toBytes(family(i));
@@ -200,16 +199,21 @@ public class TestHRegionServerBulkLoad {
 
       // bulk load HFiles
       final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
+      Table table = conn.getTable(tableName);
+      final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn);
       RegionServerCallable<Void> callable =
           new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
         @Override
         public Void call(int callTimeout) throws Exception {
           LOG.debug("Going to connect to server " + getLocation() + " for row "
               + Bytes.toStringBinary(getRow()));
+          SecureBulkLoadClient secureClient = null;
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
-          BulkLoadHFileRequest request =
-            RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
-          getStub().bulkLoadHFile(null, request);
+          try (Table table = conn.getTable(getTableName())) {
+            secureClient = new SecureBulkLoadClient(table);
+            secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+                  true, null, bulkToken);
+          }
           return null;
         }
       };
@@ -320,7 +324,7 @@ public class TestHRegionServerBulkLoad {
    * Creates a table with given table name and specified number of column
    * families if the table does not already exist.
    */
-  private void setupTable(TableName table, int cfs) throws IOException {
+  public void setupTable(TableName table, int cfs) throws IOException {
     try {
       LOG.info("Creating table " + table);
       HTableDescriptor htd = new HTableDescriptor(table);