You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2016/07/20 02:58:22 UTC
[2/5] hbase git commit: Consolidate SecureBulkLoadEndpoint into HBase
core as default for bulk load
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-protocol/src/main/protobuf/Client.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index 8a4d459..adb66f7 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -337,6 +337,8 @@ message BulkLoadHFileRequest {
required RegionSpecifier region = 1;
repeated FamilyPath family_path = 2;
optional bool assign_seq_num = 3;
+ optional DelegationToken fs_token = 4;
+ optional string bulk_token = 5;
message FamilyPath {
required bytes family = 1;
@@ -348,6 +350,30 @@ message BulkLoadHFileResponse {
required bool loaded = 1;
}
+message DelegationToken {
+ optional bytes identifier = 1;
+ optional bytes password = 2;
+ optional string kind = 3;
+ optional string service = 4;
+}
+
+message PrepareBulkLoadRequest {
+ required TableName table_name = 1;
+ optional RegionSpecifier region = 2;
+}
+
+message PrepareBulkLoadResponse {
+ required string bulk_token = 1;
+}
+
+message CleanupBulkLoadRequest {
+ required string bulk_token = 1;
+ optional RegionSpecifier region = 2;
+}
+
+message CleanupBulkLoadResponse {
+}
+
message CoprocessorServiceCall {
required bytes row = 1;
required string service_name = 2;
@@ -467,6 +493,12 @@ service ClientService {
rpc BulkLoadHFile(BulkLoadHFileRequest)
returns(BulkLoadHFileResponse);
+ rpc PrepareBulkLoad(PrepareBulkLoadRequest)
+ returns (PrepareBulkLoadResponse);
+
+ rpc CleanupBulkLoad(CleanupBulkLoadRequest)
+ returns (CleanupBulkLoadResponse);
+
rpc ExecService(CoprocessorServiceRequest)
returns(CoprocessorServiceResponse);
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto b/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto
index 814735b..290355e 100644
--- a/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto
+++ b/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto
@@ -37,29 +37,6 @@ message SecureBulkLoadHFilesResponse {
required bool loaded = 1;
}
-message DelegationToken {
- optional bytes identifier = 1;
- optional bytes password = 2;
- optional string kind = 3;
- optional string service = 4;
-}
-
-message PrepareBulkLoadRequest {
- required TableName table_name = 1;
-}
-
-message PrepareBulkLoadResponse {
- required string bulk_token = 1;
-}
-
-message CleanupBulkLoadRequest {
- required string bulk_token = 1;
-
-}
-
-message CleanupBulkLoadResponse {
-}
-
service SecureBulkLoadService {
rpc PrepareBulkLoad(PrepareBulkLoadRequest)
returns (PrepareBulkLoadResponse);
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java
index c7f0b90..1095d6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java
@@ -25,8 +25,8 @@ import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
/**
* Coprocessors implement this interface to observe and mediate bulk load operations.
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
index e937569..37c344b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
@@ -334,6 +334,26 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
}
/**
+ * Find list of CoprocessorEnvironment that extend/implement the given class/interface
+ * @param cls the class/interface to look for
+ * @return the list of matching CoprocessorEnvironment; empty (never null) if none is found
+ */
+ public List<CoprocessorEnvironment> findCoprocessorEnvironment(Class<?> cls) {
+ ArrayList<CoprocessorEnvironment> ret = new ArrayList<CoprocessorEnvironment>();
+
+ for (E env: coprocessors) {
+ Coprocessor cp = env.getInstance();
+
+ if(cp != null) {
+ if (cls.isAssignableFrom(cp.getClass())) {
+ ret.add(env);
+ }
+ }
+ }
+ return ret;
+ }
+
+ /**
* Find a coprocessor environment by class name
* @param className the class name
* @return the coprocessor, or null if not found
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index a23d739..c04794b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -74,8 +74,8 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
@@ -87,7 +87,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
@@ -323,6 +322,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
// LQI queue does not need to be threadsafe -- all operations on this queue
// happen in this thread
Deque<LoadQueueItem> queue = new LinkedList<>();
+ SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table);
+
try {
/*
* Checking hfile format is a time-consuming operation, we should have an option to skip
@@ -346,13 +347,16 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
return;
}
+ if(isSecureBulkLoadEndpointAvailable()) {
+ LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
+ LOG.warn("Secure bulk load has been integrated into HBase core.");
+ }
+
//If using secure bulk load, get source delegation token, and
//prepare staging directory and token
// fs is the source filesystem
fsDelegationToken.acquireDelegationToken(fs);
- if(isSecureBulkLoadEndpointAvailable()) {
- bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
- }
+ bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
// Assumes that region splits can happen while this occurs.
while (!queue.isEmpty()) {
@@ -391,7 +395,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
} finally {
fsDelegationToken.releaseDelegationToken();
if(bulkToken != null) {
- new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
+ secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
}
pool.shutdown();
if (queue != null && !queue.isEmpty()) {
@@ -789,21 +793,18 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
byte[] regionName = getLocation().getRegionInfo().getRegionName();
- if (!isSecureBulkLoadEndpointAvailable()) {
- success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
- } else {
- try (Table table = conn.getTable(getTableName())) {
- secureClient = new SecureBulkLoadClient(table);
- success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
- bulkToken, getLocation().getRegionInfo().getStartKey());
- }
+ try (Table table = conn.getTable(getTableName())) {
+ secureClient = new SecureBulkLoadClient(table);
+ success =
+ secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+ assignSeqIds, fsDelegationToken.getUserToken(), bulkToken);
}
return success;
} finally {
//Best effort copying of files that might not have been imported
//from the staging directory back to original location
//in user directory
- if(secureClient != null && !success) {
+ if (secureClient != null && !success) {
FileSystem targetFs = FileSystem.get(getConf());
// fs is the source filesystem
if(fs == null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 9567602..e03993f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -498,6 +498,8 @@ public class HRegionServer extends HasThread implements
private volatile ThroughputController flushThroughputController;
+ protected final SecureBulkLoadManager secureBulkLoadManager;
+
/**
* Starts a HRegionServer at the default location.
* @param conf
@@ -618,6 +620,9 @@ public class HRegionServer extends HasThread implements
}
this.configurationManager = new ConfigurationManager();
+ this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf);
+ this.secureBulkLoadManager.start();
+
rpcServices.start();
putUpWebUI();
this.walRoller = new LogRoller(this, this);
@@ -3431,4 +3436,9 @@ public class HRegionServer extends HasThread implements
public MetricsRegionServer getMetrics() {
return metricsRegionServer;
}
+
+ @Override
+ public SecureBulkLoadManager getSecureBulkLoadManager() {
+ return this.secureBulkLoadManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index d9ea186..9cfc5df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -134,6 +134,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
@@ -147,6 +149,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
@@ -2042,21 +2046,29 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
checkOpen();
requestCount.increment();
Region region = getRegion(request.getRegion());
- List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
- for (FamilyPath familyPath: request.getFamilyPathList()) {
- familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
- familyPath.getPath()));
- }
boolean bypass = false;
- if (region.getCoprocessorHost() != null) {
- bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
- }
boolean loaded = false;
- if (!bypass) {
- loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
- }
- if (region.getCoprocessorHost() != null) {
- loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
+
+ if (!request.hasBulkToken()) {
+ // Old style bulk load. This will not be supported in future releases
+ List<Pair<byte[], String>> familyPaths =
+ new ArrayList<Pair<byte[], String>>(request.getFamilyPathCount());
+ for (FamilyPath familyPath : request.getFamilyPathList()) {
+ familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(), familyPath
+ .getPath()));
+ }
+ if (region.getCoprocessorHost() != null) {
+ bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
+ }
+ if (!bypass) {
+ loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
+ }
+ if (region.getCoprocessorHost() != null) {
+ loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
+ }
+ } else {
+ // secure bulk load
+ loaded = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
}
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
builder.setLoaded(loaded);
@@ -2067,6 +2079,41 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
@Override
+ public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
+ PrepareBulkLoadRequest request) throws ServiceException {
+ try {
+ checkOpen();
+ requestCount.increment();
+
+ Region region = getRegion(request.getRegion());
+
+ String bulkToken = regionServer.secureBulkLoadManager.prepareBulkLoad(region, request);
+ PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
+ builder.setBulkToken(bulkToken);
+ return builder.build();
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
+ }
+
+ @Override
+ public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
+ CleanupBulkLoadRequest request) throws ServiceException {
+ try {
+ checkOpen();
+ requestCount.increment();
+
+ Region region = getRegion(request.getRegion());
+
+ regionServer.secureBulkLoadManager.cleanupBulkLoad(region, request);
+ CleanupBulkLoadResponse response = CleanupBulkLoadResponse.newBuilder().build();
+ return response;
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
+ }
+
+ @Override
public CoprocessorServiceResponse execService(final RpcController controller,
final CoprocessorServiceRequest request) throws ServiceException {
try {
@@ -2930,4 +2977,5 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
return UpdateConfigurationResponse.getDefaultInstance();
}
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index c6689a9..356a88b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -81,6 +81,11 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
RegionServerQuotaManager getRegionServerQuotaManager();
/**
+ * @return RegionServer's instance of {@link SecureBulkLoadManager}
+ */
+ SecureBulkLoadManager getSecureBulkLoadManager();
+
+ /**
* Context for postOpenDeployTasks().
*/
class PostOpenDeployContext {
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
new file mode 100644
index 0000000..b47b31d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.regionserver.Region.BulkLoadListener;
+import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.FsDelegationToken;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
+import org.apache.hadoop.hbase.util.Methods;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.security.PrivilegedAction;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Bulk loads in secure mode.
+ *
+ * This service addresses two issues:
+ * <ol>
+ * <li>Moving files in a secure filesystem wherein the HBase Client
+ * and HBase Server are different filesystem users.</li>
+ * <li>Performing the move in a secure manner, assuming that the filesystem
+ * is POSIX compliant.</li>
+ * </ol>
+ *
+ * The algorithm is as follows:
+ * <ol>
+ * <li>Create an hbase owned staging directory which is
+ * world traversable (711): {@code /hbase/staging}</li>
+ * <li>A user writes out data to his secure output directory: {@code /user/foo/data}</li>
+ * <li>A call is made to hbase to create a secret staging directory
+ * which is globally rwx (777): {@code /hbase/staging/averylongandrandomdirectoryname}</li>
+ * <li>The user moves the data into the random staging directory,
+ * then calls bulkLoadHFiles()</li>
+ * </ol>
+ *
+ * Like delegation tokens the strength of the security lies in the length
+ * and randomness of the secret directory.
+ *
+ */
+@InterfaceAudience.Private
+public class SecureBulkLoadManager {
+
+ public static final long VERSION = 0L;
+
+ //320/5 = 64 characters
+ private static final int RANDOM_WIDTH = 320;
+ private static final int RANDOM_RADIX = 32;
+
+ private static final Log LOG = LogFactory.getLog(SecureBulkLoadManager.class);
+
+ private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
+ private final static FsPermission PERM_HIDDEN = FsPermission.valueOf("-rwx--x--x");
+
+ private SecureRandom random;
+ private FileSystem fs;
+ private Configuration conf;
+
+ //two levels so it doesn't get deleted accidentally
+ //no sticky bit in Hadoop 1.0
+ private Path baseStagingDir;
+
+ private UserProvider userProvider;
+
+ SecureBulkLoadManager(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public void start() {
+ random = new SecureRandom();
+ baseStagingDir = SecureBulkLoadUtil.getBaseStagingDir(conf);
+ this.userProvider = UserProvider.instantiate(conf);
+
+ try {
+ fs = FileSystem.get(conf);
+ fs.mkdirs(baseStagingDir, PERM_HIDDEN);
+ fs.setPermission(baseStagingDir, PERM_HIDDEN);
+ FileStatus status = fs.getFileStatus(baseStagingDir);
+ //no sticky bit in hadoop-1.0, making directory nonempty so it never gets erased
+ fs.mkdirs(new Path(baseStagingDir,"DONOTERASE"), PERM_HIDDEN);
+ if (status == null) {
+ throw new IllegalStateException("Failed to create staging directory "
+ + baseStagingDir.toString());
+ }
+ if (!status.getPermission().equals(PERM_HIDDEN)) {
+ throw new IllegalStateException(
+ "Staging directory already exists but permissions aren't set to '-rwx--x--x' "
+ + baseStagingDir.toString());
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to create or set permission on staging directory "
+ + baseStagingDir.toString(), e);
+ throw new IllegalStateException("Failed to create or set permission on staging directory "
+ + baseStagingDir.toString(), e);
+ }
+ }
+
+ public void stop() throws IOException {
+ }
+
+ public String prepareBulkLoad(final Region region, final PrepareBulkLoadRequest request)
+ throws IOException {
+ List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers(region);
+
+ if (bulkLoadObservers != null && bulkLoadObservers.size() != 0) {
+ ObserverContext<RegionCoprocessorEnvironment> ctx =
+ new ObserverContext<RegionCoprocessorEnvironment>();
+ ctx.prepare((RegionCoprocessorEnvironment) region.getCoprocessorHost()
+ .findCoprocessorEnvironment(BulkLoadObserver.class).get(0));
+
+ for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
+ bulkLoadObserver.prePrepareBulkLoad(ctx, request);
+ }
+ }
+
+ String bulkToken =
+ createStagingDir(baseStagingDir, getActiveUser(), region.getTableDesc().getTableName())
+ .toString();
+
+ return bulkToken;
+ }
+
+ public void cleanupBulkLoad(final Region region, final CleanupBulkLoadRequest request)
+ throws IOException {
+ List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers(region);
+
+ if (bulkLoadObservers != null && bulkLoadObservers.size() != 0) {
+ ObserverContext<RegionCoprocessorEnvironment> ctx =
+ new ObserverContext<RegionCoprocessorEnvironment>();
+ ctx.prepare((RegionCoprocessorEnvironment) region.getCoprocessorHost()
+ .findCoprocessorEnvironment(BulkLoadObserver.class).get(0));
+
+ for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
+ bulkLoadObserver.preCleanupBulkLoad(ctx, request);
+ }
+ }
+
+ fs.delete(new Path(request.getBulkToken()), true);
+ }
+
+ public boolean secureBulkLoadHFiles(final Region region,
+ final BulkLoadHFileRequest request) throws IOException {
+ final List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>(request.getFamilyPathCount());
+ for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
+ familyPaths.add(new Pair<byte[], String>(el.getFamily().toByteArray(), el.getPath()));
+ }
+
+ Token userToken = null;
+ if (userProvider.isHadoopSecurityEnabled()) {
+ userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
+ .getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text(
+ request.getFsToken().getService()));
+ }
+ final String bulkToken = request.getBulkToken();
+ User user = getActiveUser();
+ final UserGroupInformation ugi = user.getUGI();
+ if(userToken != null) {
+ ugi.addToken(userToken);
+ } else if (userProvider.isHadoopSecurityEnabled()) {
+ //we allow this to pass through in "simple" security mode
+ //for mini cluster testing
+ throw new DoNotRetryIOException("User token cannot be null");
+ }
+
+ boolean bypass = false;
+ if (region.getCoprocessorHost() != null) {
+ bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
+ }
+ boolean loaded = false;
+ if (!bypass) {
+ // Get the target fs (HBase region server fs) delegation token
+ // Since we have checked the permission via 'preBulkLoadHFile', now let's give
+ // the 'request user' necessary token to operate on the target fs.
+ // After this point the 'doAs' user will hold two tokens, one for the source fs
+ // ('request user'), another for the target fs (HBase region server principal).
+ if (userProvider.isHadoopSecurityEnabled()) {
+ FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
+ targetfsDelegationToken.acquireDelegationToken(fs);
+
+ Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
+ if (targetFsToken != null
+ && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))) {
+ ugi.addToken(targetFsToken);
+ }
+ }
+
+ loaded = ugi.doAs(new PrivilegedAction<Boolean>() {
+ @Override
+ public Boolean run() {
+ FileSystem fs = null;
+ try {
+ fs = FileSystem.get(conf);
+ for(Pair<byte[], String> el: familyPaths) {
+ Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
+ if(!fs.exists(stageFamily)) {
+ fs.mkdirs(stageFamily);
+ fs.setPermission(stageFamily, PERM_ALL_ACCESS);
+ }
+ }
+ //We call bulkLoadHFiles as requesting user
+ //To enable access prior to staging
+ return region.bulkLoadHFiles(familyPaths, true,
+ new SecureBulkLoadListener(fs, bulkToken, conf));
+ } catch (Exception e) {
+ LOG.error("Failed to complete bulk load", e);
+ } finally {
+ if (fs != null) {
+ try {
+ if (!UserGroupInformation.getLoginUser().equals(ugi)) {
+ FileSystem.closeAllForUGI(ugi);
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to close FileSystem for " + ugi.getUserName(), e);
+ }
+ }
+ }
+ return false;
+ }
+ });
+ }
+ if (region.getCoprocessorHost() != null) {
+ loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
+ }
+ return loaded;
+ }
+
+ private List<BulkLoadObserver> getBulkLoadObservers(Region region) {
+ List<BulkLoadObserver> coprocessorList =
+ region.getCoprocessorHost().findCoprocessors(BulkLoadObserver.class);
+
+ return coprocessorList;
+ }
+
+ private Path createStagingDir(Path baseDir,
+ User user,
+ TableName tableName) throws IOException {
+ String tblName = tableName.getNameAsString().replace(":", "_");
+ String randomDir = user.getShortName()+"__"+ tblName +"__"+
+ (new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX));
+ return createStagingDir(baseDir, user, randomDir);
+ }
+
+ private Path createStagingDir(Path baseDir,
+ User user,
+ String randomDir) throws IOException {
+ Path p = new Path(baseDir, randomDir);
+ fs.mkdirs(p, PERM_ALL_ACCESS);
+ fs.setPermission(p, PERM_ALL_ACCESS);
+ return p;
+ }
+
+ private User getActiveUser() throws IOException {
+ User user = RpcServer.getRequestUser();
+ if (user == null) {
+ // for non-rpc handling, fallback to system user
+ user = userProvider.getCurrent();
+ }
+
+ //this is for testing
+ if (userProvider.isHadoopSecurityEnabled()
+ && "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) {
+ return User.createUserForTesting(conf, user.getShortName(), new String[]{});
+ }
+
+ return user;
+ }
+
+ private static class SecureBulkLoadListener implements BulkLoadListener {
+ // Target filesystem
+ private final FileSystem fs;
+ private final String stagingDir;
+ private final Configuration conf;
+ // Source filesystem
+ private FileSystem srcFs = null;
+ private Map<String, FsPermission> origPermissions = null;
+
+ public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
+ this.fs = fs;
+ this.stagingDir = stagingDir;
+ this.conf = conf;
+ this.origPermissions = new HashMap<String, FsPermission>();
+ }
+
+ @Override
+ public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
+ Path p = new Path(srcPath);
+ Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
+
+ // In case of Replication for bulk load files, hfiles are already copied in staging directory
+ if (p.equals(stageP)) {
+ LOG.debug(p.getName()
+ + " is already available in staging directory. Skipping copy or rename.");
+ return stageP.toString();
+ }
+
+ if (srcFs == null) {
+ srcFs = FileSystem.get(p.toUri(), conf);
+ }
+
+ if(!isFile(p)) {
+ throw new IOException("Path does not reference a file: " + p);
+ }
+
+ // Check to see if the source and target filesystems are the same
+ if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
+ LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +
+ "the destination filesystem. Copying file over to destination staging dir.");
+ FileUtil.copy(srcFs, p, fs, stageP, false, conf);
+ } else {
+ LOG.debug("Moving " + p + " to " + stageP);
+ FileStatus origFileStatus = fs.getFileStatus(p);
+ origPermissions.put(srcPath, origFileStatus.getPermission());
+ if(!fs.rename(p, stageP)) {
+ throw new IOException("Failed to move HFile: " + p + " to " + stageP);
+ }
+ }
+ fs.setPermission(stageP, PERM_ALL_ACCESS);
+ return stageP.toString();
+ }
+
+ @Override
+ public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
+ LOG.debug("Bulk Load done for: " + srcPath);
+ }
+
+ @Override
+ public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {
+ if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
+ // files are copied so no need to move them back
+ return;
+ }
+ Path p = new Path(srcPath);
+ Path stageP = new Path(stagingDir,
+ new Path(Bytes.toString(family), p.getName()));
+
+ // In case of Replication for bulk load files, hfiles are not renamed by end point during
+ // prepare stage, so no need of rename here again
+ if (p.equals(stageP)) {
+ LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
+ return;
+ }
+
+ LOG.debug("Moving " + stageP + " back to " + p);
+ if(!fs.rename(stageP, p))
+ throw new IOException("Failed to move HFile: " + stageP + " to " + p);
+
+ // restore original permission
+ if (origPermissions.containsKey(srcPath)) {
+ fs.setPermission(p, origPermissions.get(srcPath));
+ } else {
+ LOG.warn("Can't find previous permission for path=" + srcPath);
+ }
+ }
+
+ /**
+ * Check if the path is referencing a file.
+ * This is mainly needed to avoid symlinks.
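+ * @param p the path to check on the source filesystem
+ * @return true if p is not a directory and, when the symlink check can be made, not a symlink
+ * @throws IOException if the file status of p cannot be obtained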
+ */
+ private boolean isFile(Path p) throws IOException {
+ FileStatus status = srcFs.getFileStatus(p);
+ boolean isFile = !status.isDirectory();
+ try {
+ isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);
+ } catch (Exception e) {
+ }
+ return isFile;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index f21d8e2..7d5fc32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -95,10 +95,10 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.Region;
@@ -2145,7 +2145,7 @@ public class AccessController extends BaseMasterAndRegionObserver
*/
@Override
public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
- PrepareBulkLoadRequest request) throws IOException {
+ PrepareBulkLoadRequest request) throws IOException {
requireAccess("prePareBulkLoad",
ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
}
@@ -2159,7 +2159,7 @@ public class AccessController extends BaseMasterAndRegionObserver
*/
@Override
public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
- CleanupBulkLoadRequest request) throws IOException {
+ CleanupBulkLoadRequest request) throws IOException {
requireAccess("preCleanupBulkLoad",
ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
index c1f9251..cb143b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -18,140 +18,52 @@
package org.apache.hadoop.hbase.security.access;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
+import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadResponse;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesRequest;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesResponse;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.Region.BulkLoadListener;
-import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.token.FsDelegationToken;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSHDFSUtils;
-import org.apache.hadoop.hbase.util.Methods;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-
-import java.io.IOException;
-import java.math.BigInteger;
-import java.security.PrivilegedAction;
-import java.security.SecureRandom;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService;
+import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
/**
* Coprocessor service for bulk loads in secure mode.
- * This coprocessor has to be installed as part of enabling
- * security in HBase.
- *
- * This service addresses two issues:
- * <ol>
- * <li>Moving files in a secure filesystem wherein the HBase Client
- * and HBase Server are different filesystem users.</li>
- * <li>Does moving in a secure manner. Assuming that the filesystem
- * is POSIX compliant.</li>
- * </ol>
- *
- * The algorithm is as follows:
- * <ol>
- * <li>Create an hbase owned staging directory which is
- * world traversable (711): {@code /hbase/staging}</li>
- * <li>A user writes out data to his secure output directory: {@code /user/foo/data}</li>
- * <li>A call is made to hbase to create a secret staging directory
- * which globally rwx (777): {@code /user/staging/averylongandrandomdirectoryname}</li>
- * <li>The user moves the data into the random staging directory,
- * then calls bulkLoadHFiles()</li>
- * </ol>
- * Like delegation tokens the strength of the security lies in the length
- * and randomness of the secret directory.
- *
+ * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
*/
@InterfaceAudience.Private
+@Deprecated
public class SecureBulkLoadEndpoint extends SecureBulkLoadService
implements CoprocessorService, Coprocessor {
public static final long VERSION = 0L;
- //320/5 = 64 characters
- private static final int RANDOM_WIDTH = 320;
- private static final int RANDOM_RADIX = 32;
-
private static final Log LOG = LogFactory.getLog(SecureBulkLoadEndpoint.class);
- private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
- private final static FsPermission PERM_HIDDEN = FsPermission.valueOf("-rwx--x--x");
-
- private SecureRandom random;
- private FileSystem fs;
- private Configuration conf;
-
- //two levels so it doesn't get deleted accidentally
- //no sticky bit in Hadoop 1.0
- private Path baseStagingDir;
-
private RegionCoprocessorEnvironment env;
- private UserProvider userProvider;
-
@Override
public void start(CoprocessorEnvironment env) {
this.env = (RegionCoprocessorEnvironment)env;
- random = new SecureRandom();
- conf = env.getConfiguration();
- baseStagingDir = SecureBulkLoadUtil.getBaseStagingDir(conf);
- this.userProvider = UserProvider.instantiate(conf);
-
- try {
- fs = FileSystem.get(conf);
- fs.mkdirs(baseStagingDir, PERM_HIDDEN);
- fs.setPermission(baseStagingDir, PERM_HIDDEN);
- //no sticky bit in hadoop-1.0, making directory nonempty so it never gets erased
- fs.mkdirs(new Path(baseStagingDir,"DONOTERASE"), PERM_HIDDEN);
- FileStatus status = fs.getFileStatus(baseStagingDir);
- if(status == null) {
- throw new IllegalStateException("Failed to create staging directory");
- }
- if(!status.getPermission().equals(PERM_HIDDEN)) {
- throw new IllegalStateException(
- "Directory already exists but permissions aren't set to '-rwx--x--x' ");
- }
- } catch (IOException e) {
- throw new IllegalStateException("Failed to get FileSystem instance",e);
- }
+ LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
+ LOG.warn("Secure bulk load has been integrated into HBase core.");
}
@Override
@@ -159,24 +71,12 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
}
@Override
- public void prepareBulkLoad(RpcController controller,
- PrepareBulkLoadRequest request,
- RpcCallback<PrepareBulkLoadResponse> done){
+ public void prepareBulkLoad(RpcController controller, PrepareBulkLoadRequest request,
+ RpcCallback<PrepareBulkLoadResponse> done) {
try {
- List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers();
-
- if(bulkLoadObservers != null) {
- ObserverContext<RegionCoprocessorEnvironment> ctx =
- new ObserverContext<RegionCoprocessorEnvironment>();
- ctx.prepare(env);
-
- for(BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
- bulkLoadObserver.prePrepareBulkLoad(ctx, request);
- }
- }
-
- String bulkToken = createStagingDir(baseStagingDir,
- getActiveUser(), ProtobufUtil.toTableName(request.getTableName())).toString();
+ SecureBulkLoadManager secureBulkLoadManager =
+ this.env.getRegionServerServices().getSecureBulkLoadManager();
+ String bulkToken = secureBulkLoadManager.prepareBulkLoad(this.env.getRegion(), request);
done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build());
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
@@ -185,23 +85,12 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
}
@Override
- public void cleanupBulkLoad(RpcController controller,
- CleanupBulkLoadRequest request,
- RpcCallback<CleanupBulkLoadResponse> done) {
+ public void cleanupBulkLoad(RpcController controller, CleanupBulkLoadRequest request,
+ RpcCallback<CleanupBulkLoadResponse> done) {
try {
- List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers();
-
- if(bulkLoadObservers != null) {
- ObserverContext<RegionCoprocessorEnvironment> ctx =
- new ObserverContext<RegionCoprocessorEnvironment>();
- ctx.prepare(env);
-
- for(BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
- bulkLoadObserver.preCleanupBulkLoad(ctx, request);
- }
- }
-
- fs.delete(new Path(request.getBulkToken()), true);
+ SecureBulkLoadManager secureBulkLoadManager =
+ this.env.getRegionServerServices().getSecureBulkLoadManager();
+ secureBulkLoadManager.cleanupBulkLoad(this.env.getRegion(), request);
done.run(CleanupBulkLoadResponse.newBuilder().build());
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
@@ -210,262 +99,35 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
}
@Override
- public void secureBulkLoadHFiles(RpcController controller,
- SecureBulkLoadHFilesRequest request,
- RpcCallback<SecureBulkLoadHFilesResponse> done) {
- final List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
- for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
- familyPaths.add(new Pair(el.getFamily().toByteArray(),el.getPath()));
- }
-
- Token userToken = null;
- if (userProvider.isHadoopSecurityEnabled()) {
- userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
- .getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text(
- request.getFsToken().getService()));
- }
- final String bulkToken = request.getBulkToken();
- User user = getActiveUser();
- final UserGroupInformation ugi = user.getUGI();
- if(userToken != null) {
- ugi.addToken(userToken);
- } else if (userProvider.isHadoopSecurityEnabled()) {
- //we allow this to pass through in "simple" security mode
- //for mini cluster testing
- ResponseConverter.setControllerException(controller,
- new DoNotRetryIOException("User token cannot be null"));
- done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
- return;
- }
-
- Region region = env.getRegion();
- boolean bypass = false;
- if (region.getCoprocessorHost() != null) {
- try {
- bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
- } catch (IOException e) {
- ResponseConverter.setControllerException(controller, e);
- done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
- return;
- }
- }
+ public void secureBulkLoadHFiles(RpcController controller, SecureBulkLoadHFilesRequest request,
+ RpcCallback<SecureBulkLoadHFilesResponse> done) {
boolean loaded = false;
- if (!bypass) {
- // Get the target fs (HBase region server fs) delegation token
- // Since we have checked the permission via 'preBulkLoadHFile', now let's give
- // the 'request user' necessary token to operate on the target fs.
- // After this point the 'doAs' user will hold two tokens, one for the source fs
- // ('request user'), another for the target fs (HBase region server principal).
- if (userProvider.isHadoopSecurityEnabled()) {
- FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
- try {
- targetfsDelegationToken.acquireDelegationToken(fs);
- } catch (IOException e) {
- ResponseConverter.setControllerException(controller, e);
- done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
- return;
- }
- Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
- if (targetFsToken != null
- && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))) {
- ugi.addToken(targetFsToken);
- }
- }
-
- loaded = ugi.doAs(new PrivilegedAction<Boolean>() {
- @Override
- public Boolean run() {
- FileSystem fs = null;
- try {
- Configuration conf = env.getConfiguration();
- fs = FileSystem.get(conf);
- for(Pair<byte[], String> el: familyPaths) {
- Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
- if(!fs.exists(stageFamily)) {
- fs.mkdirs(stageFamily);
- fs.setPermission(stageFamily, PERM_ALL_ACCESS);
- }
- }
- //We call bulkLoadHFiles as requesting user
- //To enable access prior to staging
- return env.getRegion().bulkLoadHFiles(familyPaths, true,
- new SecureBulkLoadListener(fs, bulkToken, conf));
- } catch (Exception e) {
- LOG.error("Failed to complete bulk load", e);
- } finally {
- if (fs != null) {
- try {
- if (!UserGroupInformation.getLoginUser().equals(ugi)) {
- FileSystem.closeAllForUGI(ugi);
- }
- } catch (IOException e) {
- LOG.error("Failed to close FileSystem for " + ugi.getUserName(), e);
- }
- }
- }
- return false;
- }
- });
- }
- if (region.getCoprocessorHost() != null) {
- try {
- loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
- } catch (IOException e) {
- ResponseConverter.setControllerException(controller, e);
- done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
- return;
- }
+ try {
+ SecureBulkLoadManager secureBulkLoadManager =
+ this.env.getRegionServerServices().getSecureBulkLoadManager();
+ BulkLoadHFileRequest bulkLoadHFileRequest = ConvertSecureBulkLoadHFilesRequest(request);
+ loaded = secureBulkLoadManager.secureBulkLoadHFiles(this.env.getRegion(), bulkLoadHFileRequest);
+ } catch (IOException e) {
+ ResponseConverter.setControllerException(controller, e);
}
done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build());
}
- private List<BulkLoadObserver> getBulkLoadObservers() {
- List<BulkLoadObserver> coprocessorList =
- this.env.getRegion().getCoprocessorHost().findCoprocessors(BulkLoadObserver.class);
-
- return coprocessorList;
- }
-
- private Path createStagingDir(Path baseDir,
- User user,
- TableName tableName) throws IOException {
- String tblName = tableName.getNameAsString().replace(":", "_");
- String randomDir = user.getShortName()+"__"+ tblName +"__"+
- (new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX));
- return createStagingDir(baseDir, user, randomDir);
- }
-
- private Path createStagingDir(Path baseDir,
- User user,
- String randomDir) throws IOException {
- Path p = new Path(baseDir, randomDir);
- fs.mkdirs(p, PERM_ALL_ACCESS);
- fs.setPermission(p, PERM_ALL_ACCESS);
- return p;
- }
-
- private User getActiveUser() {
- User user = RpcServer.getRequestUser();
- if (user == null) {
- return null;
- }
-
- //this is for testing
- if (userProvider.isHadoopSecurityEnabled()
- && "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) {
- return User.createUserForTesting(conf, user.getShortName(), new String[]{});
- }
-
- return user;
+ /**
+  * Repackages an endpoint-style secure bulk load request as a {@link BulkLoadHFileRequest}
+  * addressed, by region name, to the region this coprocessor environment belongs to.
+  * The filesystem token, bulk token, assign-sequence-number flag and family paths are
+  * copied over from the incoming request.
+  * @param request the request received through the deprecated endpoint
+  * @return the equivalent request for the core bulk load path
+  */
+ private BulkLoadHFileRequest ConvertSecureBulkLoadHFilesRequest(
+     SecureBulkLoadHFilesRequest request) {
+   BulkLoadHFileRequest.Builder builder = BulkLoadHFileRequest.newBuilder();
+   RegionSpecifier target = RequestConverter.buildRegionSpecifier(
+       RegionSpecifierType.REGION_NAME, this.env.getRegionInfo().getRegionName());
+
+   builder.setRegion(target);
+   builder.setFsToken(request.getFsToken());
+   builder.setBulkToken(request.getBulkToken());
+   builder.setAssignSeqNum(request.getAssignSeqNum());
+   builder.addAllFamilyPath(request.getFamilyPathList());
+   return builder.build();
}
@Override
public Service getService() {
return this;
}
-
- private static class SecureBulkLoadListener implements BulkLoadListener {
- // Target filesystem
- private FileSystem fs;
- private String stagingDir;
- private Configuration conf;
- // Source filesystem
- private FileSystem srcFs = null;
- private Map<String, FsPermission> origPermissions = null;
-
- public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
- this.fs = fs;
- this.stagingDir = stagingDir;
- this.conf = conf;
- this.origPermissions = new HashMap<String, FsPermission>();
- }
-
- @Override
- public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
- Path p = new Path(srcPath);
- Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
-
- // In case of Replication for bulk load files, hfiles are already copied in staging directory
- if (p.equals(stageP)) {
- LOG.debug(p.getName()
- + " is already available in staging directory. Skipping copy or rename.");
- return stageP.toString();
- }
-
- if (srcFs == null) {
- srcFs = FileSystem.get(p.toUri(), conf);
- }
-
- if(!isFile(p)) {
- throw new IOException("Path does not reference a file: " + p);
- }
-
- // Check to see if the source and target filesystems are the same
- if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
- LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +
- "the destination filesystem. Copying file over to destination staging dir.");
- FileUtil.copy(srcFs, p, fs, stageP, false, conf);
- } else {
- LOG.debug("Moving " + p + " to " + stageP);
- FileStatus origFileStatus = fs.getFileStatus(p);
- origPermissions.put(srcPath, origFileStatus.getPermission());
- if(!fs.rename(p, stageP)) {
- throw new IOException("Failed to move HFile: " + p + " to " + stageP);
- }
- }
- fs.setPermission(stageP, PERM_ALL_ACCESS);
- return stageP.toString();
- }
-
- @Override
- public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
- LOG.debug("Bulk Load done for: " + srcPath);
- }
-
- @Override
- public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {
- if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
- // files are copied so no need to move them back
- return;
- }
- Path p = new Path(srcPath);
- Path stageP = new Path(stagingDir,
- new Path(Bytes.toString(family), p.getName()));
-
- // In case of Replication for bulk load files, hfiles are not renamed by end point during
- // prepare stage, so no need of rename here again
- if (p.equals(stageP)) {
- LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
- return;
- }
-
- LOG.debug("Moving " + stageP + " back to " + p);
- if(!fs.rename(stageP, p))
- throw new IOException("Failed to move HFile: " + stageP + " to " + p);
-
- // restore original permission
- if (origPermissions.containsKey(srcPath)) {
- fs.setPermission(p, origPermissions.get(srcPath));
- } else {
- LOG.warn("Can't find previous permission for path=" + srcPath);
- }
- }
-
- /**
- * Check if the path is referencing a file.
- * This is mainly needed to avoid symlinks.
- * @param p
- * @return true if the p is a file
- * @throws IOException
- */
- private boolean isFile(Path p) throws IOException {
- FileStatus status = srcFs.getFileStatus(p);
- boolean isFile = !status.isDirectory();
- try {
- isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);
- } catch (Exception e) {
- }
- return isFile;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index bfa14cb..a6dc59f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1080,6 +1080,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
// Now do the mini hbase cluster. Set the hbase.rootdir in config.
createRootDir(create);
+ // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
+ // for tests that do not read hbase-defaults.xml
+ setHBaseFsTmpDir();
+
// These settings will make the server waits until this exact number of
// regions servers are connected.
if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
@@ -1104,10 +1108,6 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
getAdmin(); // create immediately the hbaseAdmin
LOG.info("Minicluster is up");
- // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
- // for tests that do not read hbase-defaults.xml
- setHBaseFsTmpDir();
-
return (MiniHBaseCluster)this.hbaseCluster;
}
@@ -1278,6 +1278,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
} else {
LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
}
+ this.conf.set("hbase.bulkload.staging.dir", this.conf.get("hbase.fs.tmp.dir"));
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 6cd1963..6f225d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
@@ -316,7 +317,6 @@ public class MockRegionServerServices implements RegionServerServices {
@Override
public ClusterConnection getClusterConnection() {
- // TODO Auto-generated method stub
return null;
}
@@ -334,4 +334,9 @@ public class MockRegionServerServices implements RegionServerServices {
public MetricsRegionServer getMetrics() {
return null;
}
+
+ @Override
+ public SecureBulkLoadManager getSecureBulkLoadManager() {
+ // This mock supplies no bulk load manager; callers always receive null.
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index 48d7efc..354f0a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -35,8 +35,6 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -336,16 +334,21 @@ public class TestReplicaWithCluster {
LOG.debug("Loading test data");
@SuppressWarnings("deprecation")
final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
+ table = conn.getTable(hdt.getTableName());
+ final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn);
RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
@Override
public Void call(int timeout) throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
+ SecureBulkLoadClient secureClient = null;
byte[] regionName = getLocation().getRegionInfo().getRegionName();
- BulkLoadHFileRequest request =
- RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
- getStub().bulkLoadHFile(null, request);
+ try (Table table = conn.getTable(getTableName())) {
+ secureClient = new SecureBulkLoadClient(table);
+ secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+ true, null, bulkToken);
+ }
return null;
}
};
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesUseSecurityEndPoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesUseSecurityEndPoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesUseSecurityEndPoint.java
deleted file mode 100644
index 11627a1..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesUseSecurityEndPoint.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.junit.BeforeClass;
-import org.junit.experimental.categories.Category;
-
-@Category(LargeTests.class)
-public class TestLoadIncrementalHFilesUseSecurityEndPoint extends TestLoadIncrementalHFiles {
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
- MAX_FILES_PER_REGION_PER_FAMILY);
- util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
- "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
- // change default behavior so that tag values are returned with normal rpcs
- util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
- KeyValueCodecWithTags.class.getCanonicalName());
-
- util.startMiniCluster();
- setupNamespace();
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 69f2e35..2927023 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -83,6 +83,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodes
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
@@ -90,6 +92,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
@@ -103,6 +107,7 @@ import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
@@ -661,7 +666,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
@Override
public ClusterConnection getClusterConnection() {
- // TODO Auto-generated method stub
return null;
}
@@ -679,4 +683,21 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
public MetricsRegionServer getMetrics() {
return null;
}
-}
\ No newline at end of file
+
+ @Override
+ public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
+ PrepareBulkLoadRequest request) throws ServiceException {
+ return null; // mock stub: ignores the request and produces no response
+ }
+
+ @Override
+ public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
+ CleanupBulkLoadRequest request) throws ServiceException {
+ return null; // mock stub: ignores the request and produces no response
+ }
+
+ @Override
+ public SecureBulkLoadManager getSecureBulkLoadManager() {
+ return null; // mock stub: no SecureBulkLoadManager is provided
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
new file mode 100644
index 0000000..9ecc5d6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
+import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Client proxy for SecureBulkLoadProtocol used in conjunction with SecureBulkLoadEndpoint.
+ * Every request goes through a coprocessor RPC channel obtained from the wrapped table.
+ * @deprecated Use for backward compatibility testing only. Will be removed when
+ * SecureBulkLoadEndpoint is not supported.
+ */
+@InterfaceAudience.Private
+public class SecureBulkLoadEndpointClient {
+  private final Table table;
+
+  public SecureBulkLoadEndpointClient(Table table) {
+    this.table = table;
+  }
+
+  /**
+   * Asks the endpoint to prepare a bulk load, using the channel for the empty start row.
+   * @param tableName table the HFiles will be loaded into
+   * @return the bulk token carried by the endpoint's response
+   * @throws IOException wrapping any failure raised locally or recorded on the controller
+   */
+  public String prepareBulkLoad(final TableName tableName) throws IOException {
+    try {
+      SecureBulkLoadProtos.SecureBulkLoadService instance = newStub(HConstants.EMPTY_START_ROW);
+      ServerRpcController controller = new ServerRpcController();
+      BlockingRpcCallback<PrepareBulkLoadResponse> rpcCallback =
+          new BlockingRpcCallback<PrepareBulkLoadResponse>();
+      PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder()
+          .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
+      instance.prepareBulkLoad(controller, request, rpcCallback);
+      PrepareBulkLoadResponse response = rpcCallback.get();
+      failIfErrored(controller);
+      return response.getBulkToken();
+    } catch (Throwable throwable) {
+      throw new IOException(throwable);
+    }
+  }
+
+  /**
+   * Asks the endpoint to clean up the bulk load identified by the given token.
+   * @param bulkToken token obtained from {@link #prepareBulkLoad(TableName)}
+   * @throws IOException wrapping any failure raised locally or recorded on the controller
+   */
+  public void cleanupBulkLoad(final String bulkToken) throws IOException {
+    try {
+      SecureBulkLoadProtos.SecureBulkLoadService instance = newStub(HConstants.EMPTY_START_ROW);
+      ServerRpcController controller = new ServerRpcController();
+      BlockingRpcCallback<CleanupBulkLoadResponse> rpcCallback =
+          new BlockingRpcCallback<CleanupBulkLoadResponse>();
+      CleanupBulkLoadRequest request =
+          CleanupBulkLoadRequest.newBuilder().setBulkToken(bulkToken).build();
+      instance.cleanupBulkLoad(controller, request, rpcCallback);
+      // Wait on the callback, as the other two calls do, before reading the controller state.
+      rpcCallback.get();
+      failIfErrored(controller);
+    } catch (Throwable throwable) {
+      throw new IOException(throwable);
+    }
+  }
+
+  /**
+   * Sends one batch of HFiles to the endpoint through the channel opened for {@code startRow}.
+   * @param familyPaths (column family, HFile path) pairs to load
+   * @param userToken delegation token to forward; an empty proto token is sent when null
+   * @param bulkToken token obtained from {@link #prepareBulkLoad(TableName)}
+   * @param startRow row used to obtain the coprocessor channel
+   * @return the {@code loaded} flag of the endpoint's response
+   * @throws IOException wrapping any failure raised locally or recorded on the controller
+   */
+  public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
+      final Token<?> userToken, final String bulkToken,
+      final byte[] startRow) throws IOException {
+    // we never want to send a batch of HFiles to all regions, thus cannot call
+    // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
+    try {
+      SecureBulkLoadProtos.SecureBulkLoadService instance = newStub(startRow);
+      // Start from an empty token and only populate it when the caller supplied one.
+      DelegationToken.Builder tokenBuilder = DelegationToken.newBuilder();
+      if (userToken != null) {
+        tokenBuilder.setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
+            .setPassword(ByteStringer.wrap(userToken.getPassword()))
+            .setKind(userToken.getKind().toString())
+            .setService(userToken.getService().toString());
+      }
+
+      List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
+          new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>(familyPaths.size());
+      for (Pair<byte[], String> el : familyPaths) {
+        protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
+            .setFamily(ByteStringer.wrap(el.getFirst()))
+            .setPath(el.getSecond()).build());
+      }
+
+      SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
+          SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
+              .setFsToken(tokenBuilder.build())
+              .addAllFamilyPath(protoFamilyPaths)
+              .setBulkToken(bulkToken).build();
+      ServerRpcController controller = new ServerRpcController();
+      BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> rpcCallback =
+          new BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>();
+      instance.secureBulkLoadHFiles(controller, request, rpcCallback);
+      SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
+      failIfErrored(controller);
+      return response.getLoaded();
+    } catch (Throwable throwable) {
+      throw new IOException(throwable);
+    }
+  }
+
+  /** Delegates to SecureBulkLoadUtil.getStagingPath using the table's configuration. */
+  public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
+    return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
+  }
+
+  /** Creates a SecureBulkLoadService stub on the coprocessor channel obtained for the row. */
+  private SecureBulkLoadProtos.SecureBulkLoadService newStub(byte[] row) throws Exception {
+    CoprocessorRpcChannel channel = table.coprocessorService(row);
+    return ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
+  }
+
+  /** Rethrows the exception recorded on the controller, if the call reported one. */
+  private static void failIfErrored(ServerRpcController controller) throws IOException {
+    if (controller.failedOnException()) {
+      throw controller.getFailedOn();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index bd5c91e..6e68201 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.NavigableMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@@ -52,6 +51,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -91,15 +90,15 @@ import com.google.common.collect.Lists;
@Category({RegionServerTests.class, LargeTests.class})
public class TestHRegionServerBulkLoad {
private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
- private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
- private final static Configuration conf = UTIL.getConfiguration();
- private final static byte[] QUAL = Bytes.toBytes("qual");
- private final static int NUM_CFS = 10;
+ protected static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ protected final static Configuration conf = UTIL.getConfiguration();
+ protected final static byte[] QUAL = Bytes.toBytes("qual");
+ protected final static int NUM_CFS = 10;
private int sleepDuration;
public static int BLOCKSIZE = 64 * 1024;
public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
- private final static byte[][] families = new byte[NUM_CFS][];
+ protected final static byte[][] families = new byte[NUM_CFS][];
static {
for (int i = 0; i < NUM_CFS; i++) {
families[i] = Bytes.toBytes(family(i));
@@ -200,16 +199,21 @@ public class TestHRegionServerBulkLoad {
// bulk load HFiles
final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
+ Table table = conn.getTable(tableName);
+ final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn);
RegionServerCallable<Void> callable =
new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
@Override
public Void call(int callTimeout) throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
+ SecureBulkLoadClient secureClient = null;
byte[] regionName = getLocation().getRegionInfo().getRegionName();
- BulkLoadHFileRequest request =
- RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
- getStub().bulkLoadHFile(null, request);
+ try (Table table = conn.getTable(getTableName())) {
+ secureClient = new SecureBulkLoadClient(table);
+ secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+ true, null, bulkToken);
+ }
return null;
}
};
@@ -320,7 +324,7 @@ public class TestHRegionServerBulkLoad {
* Creates a table with given table name and specified number of column
* families if the table does not already exist.
*/
- private void setupTable(TableName table, int cfs) throws IOException {
+ public void setupTable(TableName table, int cfs) throws IOException {
try {
LOG.info("Creating table " + table);
HTableDescriptor htd = new HTableDescriptor(table);