You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/12/11 03:34:38 UTC
[15/16] hbase git commit: HBASE-21516 Use AsyncConnection instead of Connection in SecureBulkLoadManager
HBASE-21516 Use AsyncConnection instead of Connection in SecureBulkLoadManager
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/47810209
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/47810209
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/47810209
Branch: refs/heads/HBASE-21512
Commit: 47810209598db31c4ded5986d892eda774f2951f
Parents: 054b4ba
Author: zhangduo <zh...@apache.org>
Authored: Sat Dec 1 21:15:48 2018 +0800
Committer: Duo Zhang <zh...@apache.org>
Committed: Tue Dec 11 11:17:11 2018 +0800
----------------------------------------------------------------------
.../hadoop/hbase/protobuf/ProtobufUtil.java | 5 +-
.../hbase/shaded/protobuf/ProtobufUtil.java | 7 ++-
.../hbase/regionserver/HRegionServer.java | 2 +-
.../regionserver/SecureBulkLoadManager.java | 24 +++++----
.../hadoop/hbase/security/token/TokenUtil.java | 57 +++++++++++++++-----
.../hbase/security/token/TestTokenUtil.java | 42 +++++++++++----
6 files changed, 96 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/47810209/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 4d54528..a2c7687 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -261,13 +261,12 @@ public final class ProtobufUtil {
* just {@link ServiceException}. Prefer this method to
* {@link #getRemoteException(ServiceException)} because trying to
* contain direct protobuf references.
- * @param e
*/
- public static IOException handleRemoteException(Exception e) {
+ public static IOException handleRemoteException(Throwable e) {
return makeIOExceptionOfException(e);
}
- private static IOException makeIOExceptionOfException(Exception e) {
+ private static IOException makeIOExceptionOfException(Throwable e) {
Throwable t = e;
if (e instanceof ServiceException ||
e instanceof org.apache.hbase.thirdparty.com.google.protobuf.ServiceException) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/47810209/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index cf4c831..d604013 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
@@ -123,6 +122,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -343,13 +343,12 @@ public final class ProtobufUtil {
* just {@link ServiceException}. Prefer this method to
* {@link #getRemoteException(ServiceException)} because trying to
* contain direct protobuf references.
- * @param e
*/
- public static IOException handleRemoteException(Exception e) {
+ public static IOException handleRemoteException(Throwable e) {
return makeIOExceptionOfException(e);
}
- private static IOException makeIOExceptionOfException(Exception e) {
+ private static IOException makeIOExceptionOfException(Throwable e) {
Throwable t = e;
if (e instanceof ServiceException) {
t = e.getCause();
http://git-wip-us.apache.org/repos/asf/hbase/blob/47810209/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3085469..0d0a34e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1927,7 +1927,7 @@ public class HRegionServer extends HasThread implements
if (!isStopped() && !isAborted()) {
initializeThreads();
}
- this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection);
+ this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
this.secureBulkLoadManager.start();
// Health checker thread.
http://git-wip-us.apache.org/repos/asf/hbase/blob/47810209/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 566a6b6..add6519 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Consumer;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -38,11 +37,12 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.HRegion.BulkLoadListener;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Bytes;
@@ -56,7 +56,9 @@ import org.apache.hadoop.security.token.Token;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
@@ -111,9 +113,9 @@ public class SecureBulkLoadManager {
private UserProvider userProvider;
private ConcurrentHashMap<UserGroupInformation, Integer> ugiReferenceCounter;
- private Connection conn;
+ private AsyncConnection conn;
- SecureBulkLoadManager(Configuration conf, Connection conn) {
+ SecureBulkLoadManager(Configuration conf, AsyncConnection conn) {
this.conf = conf;
this.conn = conn;
}
@@ -212,23 +214,23 @@ public class SecureBulkLoadManager {
familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
}
- Token userToken = null;
+ Token<AuthenticationTokenIdentifier> userToken = null;
if (userProvider.isHadoopSecurityEnabled()) {
- userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
- .getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text(
- request.getFsToken().getService()));
+ userToken = new Token<>(request.getFsToken().getIdentifier().toByteArray(),
+ request.getFsToken().getPassword().toByteArray(), new Text(request.getFsToken().getKind()),
+ new Text(request.getFsToken().getService()));
}
final String bulkToken = request.getBulkToken();
User user = getActiveUser();
final UserGroupInformation ugi = user.getUGI();
if (userProvider.isHadoopSecurityEnabled()) {
try {
- Token tok = TokenUtil.obtainToken(conn);
+ Token<AuthenticationTokenIdentifier> tok = TokenUtil.obtainToken(conn).get();
if (tok != null) {
boolean b = ugi.addToken(tok);
LOG.debug("token added " + tok + " for user " + ugi + " return=" + b);
}
- } catch (IOException ioe) {
+ } catch (Exception ioe) {
LOG.warn("unable to add token", ioe);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/47810209/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
index c54d905..28efb84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,27 +15,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.security.token;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ServiceException;
-
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.AuthenticationService;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.GetAuthenticationTokenRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.GetAuthenticationTokenResponse;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
@@ -45,6 +47,8 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+
/**
* Utility methods for obtaining authentication tokens.
*/
@@ -64,12 +68,39 @@ public class TokenUtil {
/**
* Obtain and return an authentication token for the current user.
+ * @param conn The async HBase cluster connection
+ * @return the authentication token instance, wrapped by a {@link CompletableFuture}.
+ */
+ public static CompletableFuture<Token<AuthenticationTokenIdentifier>> obtainToken(
+ AsyncConnection conn) {
+ CompletableFuture<Token<AuthenticationTokenIdentifier>> future = new CompletableFuture<>();
+ if (injectedException != null) {
+ future.completeExceptionally(injectedException);
+ return future;
+ }
+ AsyncTable<?> table = conn.getTable(TableName.META_TABLE_NAME);
+ table.<AuthenticationService.Interface, GetAuthenticationTokenResponse> coprocessorService(
+ AuthenticationProtos.AuthenticationService::newStub,
+ (s, c, r) -> s.getAuthenticationToken(c,
+ AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance(), r),
+ HConstants.EMPTY_START_ROW).whenComplete((resp, error) -> {
+ if (error != null) {
+ future.completeExceptionally(ProtobufUtil.handleRemoteException(error));
+ } else {
+ future.complete(toToken(resp.getToken()));
+ }
+ });
+ return future;
+ }
+
+ /**
+ * Obtain and return an authentication token for the current user.
* @param conn The HBase cluster connection
* @throws IOException if a remote error or serialization problem occurs.
* @return the authentication token instance
*/
- public static Token<AuthenticationTokenIdentifier> obtainToken(
- Connection conn) throws IOException {
+ public static Token<AuthenticationTokenIdentifier> obtainToken(Connection conn)
+ throws IOException {
Table meta = null;
try {
injectFault();
@@ -77,9 +108,9 @@ public class TokenUtil {
meta = conn.getTable(TableName.META_TABLE_NAME);
CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
AuthenticationProtos.AuthenticationService.BlockingInterface service =
- AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
- AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
- AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
+ AuthenticationService.newBlockingStub(rpcChannel);
+ GetAuthenticationTokenResponse response =
+ service.getAuthenticationToken(null, GetAuthenticationTokenRequest.getDefaultInstance());
return toToken(response.getToken());
} catch (ServiceException se) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/47810209/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java
index 32fcddb..585a3ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java
@@ -18,35 +18,53 @@
package org.apache.hadoop.hbase.security.token;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
+import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.net.URLClassLoader;
-
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@Category(SmallTests.class)
public class TestTokenUtil {
+
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestTokenUtil.class);
+ HBaseClassTestRule.forClass(TestTokenUtil.class);
- @Test
- public void testObtainToken() throws Exception {
+ private URLClassLoader cl;
+
+ @Before
+ public void setUp() {
URL urlPU = ProtobufUtil.class.getProtectionDomain().getCodeSource().getLocation();
URL urlTU = TokenUtil.class.getProtectionDomain().getCodeSource().getLocation();
+ cl = new URLClassLoader(new URL[] { urlPU, urlTU }, getClass().getClassLoader());
+ }
- ClassLoader cl = new URLClassLoader(new URL[] { urlPU, urlTU }, getClass().getClassLoader());
+ @After
+ public void tearDown() throws IOException {
+ Closeables.close(cl, true);
+ }
+ @Test
+ public void testObtainToken() throws Exception {
Throwable injected = new com.google.protobuf.ServiceException("injected");
Class<?> tokenUtil = cl.loadClass(TokenUtil.class.getCanonicalName());
@@ -55,8 +73,7 @@ public class TestTokenUtil {
shouldInjectFault.set(null, injected);
try {
- tokenUtil.getMethod("obtainToken", Connection.class)
- .invoke(null, new Object[] { null });
+ tokenUtil.getMethod("obtainToken", Connection.class).invoke(null, new Object[] { null });
fail("Should have injected exception.");
} catch (InvocationTargetException e) {
Throwable t = e;
@@ -72,9 +89,16 @@ public class TestTokenUtil {
}
}
+ CompletableFuture<?> future = (CompletableFuture<?>) tokenUtil
+ .getMethod("obtainToken", AsyncConnection.class).invoke(null, new Object[] { null });
+ try {
+ future.get();
+ fail("Should have injected exception.");
+ } catch (ExecutionException e) {
+ assertSame(injected, e.getCause());
+ }
Boolean loaded = (Boolean) cl.loadClass(ProtobufUtil.class.getCanonicalName())
- .getDeclaredMethod("isClassLoaderLoaded")
- .invoke(null);
+ .getDeclaredMethod("isClassLoaderLoaded").invoke(null);
assertFalse("Should not have loaded DynamicClassLoader", loaded);
}
}