You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/06/07 16:22:14 UTC
hbase git commit: HBASE-15913 Sasl encryption doesn't work with
AsyncRpcChannel
Repository: hbase
Updated Branches:
refs/heads/branch-1 fad99a3f6 -> 3ff082cb8
HBASE-15913 Sasl encryption doesn't work with AsyncRpcChannel
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3ff082cb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3ff082cb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3ff082cb
Branch: refs/heads/branch-1
Commit: 3ff082cb877a6401463eb3cbcd05e8736b74ee22
Parents: fad99a3
Author: stack <st...@apache.org>
Authored: Tue Jun 7 09:21:58 2016 -0700
Committer: stack <st...@apache.org>
Committed: Tue Jun 7 09:22:09 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/ipc/AsyncRpcChannel.java | 61 +++-
.../hbase/security/SaslClientHandler.java | 22 +-
.../hbase/security/AbstractTestSecureIPC.java | 305 +++++++++++++++++++
.../hbase/security/TestAsyncSecureIPC.java | 33 ++
.../hadoop/hbase/security/TestSecureIPC.java | 33 ++
.../hadoop/hbase/security/TestSecureRPC.java | 291 ------------------
6 files changed, 441 insertions(+), 304 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/3ff082cb/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index e24050b..878d8b8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -241,6 +241,30 @@ public class AsyncRpcChannel {
}
/**
+ * Start HBase connection with sasl encryption
+ * @param ch channel to start connection on
+ */
+ private void startConnectionWithEncryption(Channel ch) {
+ // for rpc encryption, the order of ChannelInboundHandler should be:
+ // LengthFieldBasedFrameDecoder->SaslClientHandler->LengthFieldBasedFrameDecoder
+ // Don't skip the first 4 bytes for length in beforeUnwrapDecoder,
+ // SaslClientHandler will handle this
+ ch.pipeline().addFirst("beforeUnwrapDecoder",
+ new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0));
+ ch.pipeline().addLast("afterUnwrapDecoder",
+ new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
+ ch.pipeline().addLast(new AsyncServerResponseHandler(this));
+ List<AsyncCall> callsToWrite;
+ synchronized (pendingCalls) {
+ connected = true;
+ callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
+ }
+ for (AsyncCall call : callsToWrite) {
+ writeRequest(call);
+ }
+ }
+
+ /**
* Get SASL handler
* @param bootstrap to reconnect to
* @return new SASL handler
@@ -252,6 +276,7 @@ public class AsyncRpcChannel {
client.fallbackAllowed,
client.conf.get("hbase.rpc.protection",
SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
+ getChannelHeaderBytes(authMethod),
new SaslClientHandler.SaslExceptionHandler() {
@Override
public void handle(int retryCount, Random random, Throwable cause) {
@@ -275,6 +300,11 @@ public class AsyncRpcChannel {
public void onSuccess(Channel channel) {
startHBaseConnection(channel);
}
+
+ @Override
+ public void onSaslProtectionSucess(Channel channel) {
+ startConnectionWithEncryption(channel);
+ }
});
}
@@ -358,6 +388,26 @@ public class AsyncRpcChannel {
* @throws java.io.IOException on failure to write
*/
private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
+ RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
+ int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
+
+ ByteBuf b = channel.alloc().directBuffer(totalSize);
+
+ b.writeInt(header.getSerializedSize());
+ b.writeBytes(header.toByteArray());
+
+ return channel.writeAndFlush(b);
+ }
+
+ private byte[] getChannelHeaderBytes(AuthMethod authMethod) {
+ RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
+ ByteBuffer b = ByteBuffer.allocate(header.getSerializedSize() + 4);
+ b.putInt(header.getSerializedSize());
+ b.put(header.toByteArray());
+ return b.array();
+ }
+
+ private RPCProtos.ConnectionHeader getChannelHeader(AuthMethod authMethod) {
RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder()
.setServiceName(serviceName);
@@ -374,16 +424,7 @@ public class AsyncRpcChannel {
}
headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
- RPCProtos.ConnectionHeader header = headerBuilder.build();
-
- int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
-
- ByteBuf b = channel.alloc().directBuffer(totalSize);
-
- b.writeInt(header.getSerializedSize());
- b.writeBytes(header.toByteArray());
-
- return channel.writeAndFlush(b);
+ return headerBuilder.build();
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/3ff082cb/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
index f52987b..c79cde7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
@@ -61,6 +61,7 @@ public class SaslClientHandler extends ChannelDuplexHandler {
private final SaslExceptionHandler exceptionHandler;
private final SaslSuccessfulConnectHandler successfulConnectHandler;
private byte[] saslToken;
+ private byte[] connectionHeader;
private boolean firstRead = true;
private int retryCount = 0;
@@ -82,10 +83,11 @@ public class SaslClientHandler extends ChannelDuplexHandler {
*/
public SaslClientHandler(UserGroupInformation ticket, AuthMethod method,
Token<? extends TokenIdentifier> token, String serverPrincipal, boolean fallbackAllowed,
- String rpcProtection, SaslExceptionHandler exceptionHandler,
+ String rpcProtection, byte[] connectionHeader, SaslExceptionHandler exceptionHandler,
SaslSuccessfulConnectHandler successfulConnectHandler) throws IOException {
this.ticket = ticket;
this.fallbackAllowed = fallbackAllowed;
+ this.connectionHeader = connectionHeader;
this.exceptionHandler = exceptionHandler;
this.successfulConnectHandler = successfulConnectHandler;
@@ -236,8 +238,13 @@ public class SaslClientHandler extends ChannelDuplexHandler {
if (!useWrap) {
ctx.pipeline().remove(this);
+ successfulConnectHandler.onSuccess(ctx.channel());
+ } else {
+ byte[] wrappedCH = saslClient.wrap(connectionHeader, 0, connectionHeader.length);
+ // write connection header
+ writeSaslToken(ctx, wrappedCH);
+ successfulConnectHandler.onSaslProtectionSucess(ctx.channel());
}
- successfulConnectHandler.onSuccess(ctx.channel());
}
}
// Normal wrapped reading
@@ -322,9 +329,11 @@ public class SaslClientHandler extends ChannelDuplexHandler {
super.write(ctx, msg, promise);
} else {
ByteBuf in = (ByteBuf) msg;
+ byte[] unwrapped = new byte[in.readableBytes()];
+ in.readBytes(unwrapped);
try {
- saslToken = saslClient.wrap(in.array(), in.readerIndex(), in.readableBytes());
+ saslToken = saslClient.wrap(unwrapped, 0, unwrapped.length);
} catch (SaslException se) {
try {
saslClient.dispose();
@@ -375,5 +384,12 @@ public class SaslClientHandler extends ChannelDuplexHandler {
* @param channel which is successfully authenticated
*/
public void onSuccess(Channel channel);
+
+ /**
+ * Runs on success if data protection is used in Sasl
+ *
+ * @param channel which is successfully authenticated
+ */
+ public void onSaslProtectionSucess(Channel channel);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3ff082cb/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
new file mode 100644
index 0000000..e2eb84b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.BlockingService;
+
+import javax.security.sasl.SaslException;
+
+public abstract class AbstractTestSecureIPC {
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
+ .getPath());
+
+ static final BlockingService SERVICE =
+ TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(
+ new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
+
+ @Override
+ public TestProtos.EmptyResponseProto ping(RpcController controller,
+ TestProtos.EmptyRequestProto request)
+ throws ServiceException {
+ return null;
+ }
+
+ @Override
+ public TestProtos.EmptyResponseProto error(RpcController controller,
+ TestProtos.EmptyRequestProto request)
+ throws ServiceException {
+ return null;
+ }
+
+ @Override
+ public TestProtos.EchoResponseProto echo(RpcController controller,
+ TestProtos.EchoRequestProto request)
+ throws ServiceException {
+ if (controller instanceof PayloadCarryingRpcController) {
+ PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
+ // If cells, scan them to check we are able to iterate what we were given and since
+ // this is
+ // an echo, just put them back on the controller creating a new block. Tests our
+ // block
+ // building.
+ CellScanner cellScanner = pcrc.cellScanner();
+ List<Cell> list = null;
+ if (cellScanner != null) {
+ list = new ArrayList<Cell>();
+ try {
+ while (cellScanner.advance()) {
+ list.add(cellScanner.current());
+ }
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+ cellScanner = CellUtil.createCellScanner(list);
+ ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
+ }
+ return TestProtos.EchoResponseProto.newBuilder()
+ .setMessage(request.getMessage()).build();
+ }
+ });
+
+ private static MiniKdc KDC;
+ private static String HOST = "localhost";
+ private static String PRINCIPAL;
+
+ String krbKeytab;
+ String krbPrincipal;
+ UserGroupInformation ugi;
+ Configuration clientConf;
+ Configuration serverConf;
+
+ abstract Class<? extends RpcClient> getRpcClientClass();
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ Properties conf = MiniKdc.createConf();
+ conf.put(MiniKdc.DEBUG, true);
+ KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath()));
+ KDC.start();
+ PRINCIPAL = "hbase/" + HOST;
+ KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
+ HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
+ HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ if (KDC != null) {
+ KDC.stop();
+ }
+ TEST_UTIL.cleanupTestDir();
+ }
+
+ @Before
+ public void setUpTest() throws Exception {
+ krbKeytab = getKeytabFileForTesting();
+ krbPrincipal = getPrincipalForTesting();
+ ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
+ clientConf = getSecuredConfiguration();
+ clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, getRpcClientClass().getName());
+ serverConf = getSecuredConfiguration();
+ }
+
+ @Test
+ public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception {
+ UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
+
+ // check that the login user is okay:
+ assertSame(ugi, ugi2);
+ assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
+ assertEquals(krbPrincipal, ugi.getUserName());
+
+ callRpcService(User.create(ugi2));
+ }
+
+ @Test
+ public void testRpcFallbackToSimpleAuth() throws Exception {
+ String clientUsername = "testuser";
+ UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
+ new String[]{clientUsername});
+
+ // check that the client user is insecure
+ assertNotSame(ugi, clientUgi);
+ assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
+ assertEquals(clientUsername, clientUgi.getUserName());
+
+ clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
+ serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
+ callRpcService(User.create(clientUgi));
+ }
+
+ void setRpcProtection(String clientProtection, String serverProtection) {
+ clientConf.set("hbase.rpc.protection", clientProtection);
+ serverConf.set("hbase.rpc.protection", serverProtection);
+ }
+
+ /**
+ * Test various qops (quality of protection levels) of Server and Client.
+ * @throws Exception
+ */
+ @Test
+ public void testSaslWithCommonQop() throws Exception {
+ setRpcProtection("authentication", "authentication");
+ callRpcService(User.create(ugi));
+
+ setRpcProtection("integrity", "integrity");
+ callRpcService(User.create(ugi));
+
+ setRpcProtection("privacy", "privacy");
+ callRpcService(User.create(ugi));
+ }
+
+ private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
+ throws Exception {
+ Configuration cnf = new Configuration();
+ cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(cnf);
+ UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
+ return UserGroupInformation.getLoginUser();
+ }
+
+ /**
+ * Sets up an RPC Server and a Client. Does an RPC and checks the result. If an exception is thrown
+ * from the stub, this function will throw root cause of that exception.
+ */
+ private void callRpcService(User clientUser) throws Exception {
+ SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
+ Mockito.when(securityInfoMock.getServerPrincipal())
+ .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
+ SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
+
+ InetSocketAddress isa = new InetSocketAddress(HOST, 0);
+
+ RpcServerInterface rpcServer =
+ new RpcServer(null, "AbstractTestSecureIPC",
+ Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), isa,
+ serverConf, new FifoRpcScheduler(serverConf, 1));
+ rpcServer.start();
+ try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf,
+ HConstants.DEFAULT_CLUSTER_ID.toString())) {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ BlockingRpcChannel channel =
+ rpcClient.createBlockingRpcChannel(
+ ServerName.valueOf(address.getHostName(), address.getPort(),
+ System.currentTimeMillis()), clientUser, 0);
+ TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
+ TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
+ TestThread th1 = new TestThread(stub);
+ final Throwable exception[] = new Throwable[1];
+ Collections.synchronizedList(new ArrayList<Throwable>());
+ Thread.UncaughtExceptionHandler exceptionHandler =
+ new Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(Thread th, Throwable ex) {
+ exception[0] = ex;
+ }
+ };
+ th1.setUncaughtExceptionHandler(exceptionHandler);
+ th1.start();
+ th1.join();
+ if (exception[0] != null) {
+ // throw root cause.
+ while (exception[0].getCause() != null) {
+ exception[0] = exception[0].getCause();
+ }
+ throw (Exception) exception[0];
+ }
+ } finally {
+ rpcServer.stop();
+ }
+ }
+
+ public static class TestThread extends Thread {
+ private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub;
+
+ public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) {
+ this.stub = stub;
+ }
+
+ @Override
+ public void run() {
+ try {
+ int[] messageSize = new int[] {100, 1000, 10000};
+ for (int i = 0; i < messageSize.length; i++) {
+ String input = RandomStringUtils.random(messageSize[i]);
+ String result = stub.echo(null, TestProtos.EchoRequestProto.newBuilder()
+ .setMessage(input).build()).getMessage();
+ assertEquals(input, result);
+ }
+ } catch (ServiceException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/3ff082cb/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java
new file mode 100644
index 0000000..2fc270d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.experimental.categories.Category;
+
+@Category({ SecurityTests.class, SmallTests.class })
+public class TestAsyncSecureIPC extends AbstractTestSecureIPC {
+
+ Class<? extends RpcClient> getRpcClientClass() {
+ return AsyncRpcClient.class;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/3ff082cb/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
new file mode 100644
index 0000000..baaa985
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientImpl;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.experimental.categories.Category;
+
+@Category({ SecurityTests.class, SmallTests.class })
+public class TestSecureIPC extends AbstractTestSecureIPC {
+
+ Class<? extends RpcClient> getRpcClientClass() {
+ return RpcClientImpl.class;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/3ff082cb/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
deleted file mode 100644
index a5700d0..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.security;
-
-import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
-import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
-import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ThreadLocalRandom;
-
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
-import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcClient;
-import org.apache.hadoop.hbase.ipc.RpcClientFactory;
-import org.apache.hadoop.hbase.ipc.RpcClientImpl;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.ipc.RpcServerInterface;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.minikdc.MiniKdc;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.BlockingService;
-
-@Category(SmallTests.class)
-public class TestSecureRPC {
-
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
- private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
- .getPath());
-
- static final BlockingService SERVICE =
- TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(
- new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
-
- @Override
- public TestProtos.EmptyResponseProto ping(RpcController controller,
- TestProtos.EmptyRequestProto request)
- throws ServiceException {
- return null;
- }
-
- @Override
- public TestProtos.EmptyResponseProto error(RpcController controller,
- TestProtos.EmptyRequestProto request)
- throws ServiceException {
- return null;
- }
-
- @Override
- public TestProtos.EchoResponseProto echo(RpcController controller,
- TestProtos.EchoRequestProto request)
- throws ServiceException {
- if (controller instanceof PayloadCarryingRpcController) {
- PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
- // If cells, scan them to check we are able to iterate what we were given and since
- // this is
- // an echo, just put them back on the controller creating a new block. Tests our
- // block
- // building.
- CellScanner cellScanner = pcrc.cellScanner();
- List<Cell> list = null;
- if (cellScanner != null) {
- list = new ArrayList<Cell>();
- try {
- while (cellScanner.advance()) {
- list.add(cellScanner.current());
- }
- } catch (IOException e) {
- throw new ServiceException(e);
- }
- }
- cellScanner = CellUtil.createCellScanner(list);
- ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
- }
- return TestProtos.EchoResponseProto.newBuilder()
- .setMessage(request.getMessage()).build();
- }
- });
-
- private static MiniKdc KDC;
-
- private static String HOST = "localhost";
-
- private static String PRINCIPAL;
-
- @BeforeClass
- public static void setUp() throws Exception {
- Properties conf = MiniKdc.createConf();
- conf.put(MiniKdc.DEBUG, true);
- KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath()));
- KDC.start();
- PRINCIPAL = "hbase/" + HOST;
- KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
- HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
- HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
- }
-
- @AfterClass
- public static void tearDown() throws IOException {
- if (KDC != null) {
- KDC.stop();
- }
- TEST_UTIL.cleanupTestDir();
- }
-
- @Test
- public void testRpc() throws Exception {
- testRpcCallWithEnabledKerberosSaslAuth(RpcClientImpl.class);
- }
-
- @Test
- public void testRpcWithInsecureFallback() throws Exception {
- testRpcFallbackToSimpleAuth(RpcClientImpl.class);
- }
-
- @Test
- public void testAsyncRpc() throws Exception {
- testRpcCallWithEnabledKerberosSaslAuth(AsyncRpcClient.class);
- }
-
- @Test
- public void testAsyncRpcWithInsecureFallback() throws Exception {
- testRpcFallbackToSimpleAuth(AsyncRpcClient.class);
- }
-
- private void testRpcCallWithEnabledKerberosSaslAuth(Class<? extends RpcClient> rpcImplClass)
- throws Exception {
- String krbKeytab = getKeytabFileForTesting();
- String krbPrincipal = getPrincipalForTesting();
-
- UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
- UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
-
- // check that the login user is okay:
- assertSame(ugi, ugi2);
- assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
- assertEquals(krbPrincipal, ugi.getUserName());
-
- Configuration clientConf = getSecuredConfiguration();
- callRpcService(rpcImplClass, User.create(ugi2), clientConf, false);
- }
-
- private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
- throws Exception {
- Configuration cnf = new Configuration();
- cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
- UserGroupInformation.setConfiguration(cnf);
- UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
- return UserGroupInformation.getLoginUser();
- }
-
- private void callRpcService(Class<? extends RpcClient> rpcImplClass, User clientUser,
- Configuration clientConf, boolean allowInsecureFallback)
- throws Exception {
- Configuration clientConfCopy = new Configuration(clientConf);
- clientConfCopy.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcImplClass.getName());
-
- Configuration conf = getSecuredConfiguration();
- conf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, allowInsecureFallback);
-
- SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
- Mockito.when(securityInfoMock.getServerPrincipal())
- .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
- SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
-
- InetSocketAddress isa = new InetSocketAddress(HOST, 0);
-
- RpcServerInterface rpcServer =
- new RpcServer(null, "AbstractTestSecureIPC",
- Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), isa,
- conf, new FifoRpcScheduler(conf, 1));
- rpcServer.start();
- try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf,
- HConstants.DEFAULT_CLUSTER_ID.toString())) {
- InetSocketAddress address = rpcServer.getListenerAddress();
- if (address == null) {
- throw new IOException("Listener channel is closed");
- }
- BlockingRpcChannel channel =
- rpcClient.createBlockingRpcChannel(
-
- ServerName.valueOf(address.getHostName(), address.getPort(),
- System.currentTimeMillis()), clientUser, 5000);
- TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
- TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
- List<String> results = new ArrayList<String>();
- TestThread th1 = new TestThread(stub, results);
- th1.start();
- th1.join();
-
- } finally {
- rpcServer.stop();
- }
- }
-
- public void testRpcFallbackToSimpleAuth(Class<? extends RpcClient> rpcImplClass) throws Exception {
- String krbKeytab = getKeytabFileForTesting();
- String krbPrincipal = getPrincipalForTesting();
-
- UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
- assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
- assertEquals(krbPrincipal, ugi.getUserName());
-
- String clientUsername = "testuser";
- UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
- new String[]{clientUsername});
-
- // check that the client user is insecure
- assertNotSame(ugi, clientUgi);
- assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
- assertEquals(clientUsername, clientUgi.getUserName());
-
- Configuration clientConf = new Configuration();
- clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
- callRpcService(rpcImplClass, User.create(clientUgi), clientConf, true);
- }
-
- public static class TestThread extends Thread {
- private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub;
-
- private final List<String> results;
-
- public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub, List<String> results) {
- this.stub = stub;
- this.results = results;
- }
-
- @Override
- public void run() {
- String result;
- try {
- result = stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(String.valueOf(
- ThreadLocalRandom.current().nextInt())).build()).getMessage();
- } catch (ServiceException e) {
- throw new RuntimeException(e);
- }
- if (results != null) {
- synchronized (results) {
- results.add(result);
- }
- }
- }
- }
-}