You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/01/15 18:51:38 UTC
[2/4] accumulo git commit: ACCUMULO-2815 Support for Kerberos client
authentication.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthorizor.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthorizor.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthorizor.java
new file mode 100644
index 0000000..b047f1a
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthorizor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.util.Base64;
+
+/**
+ * {@link Authorizor} for Kerberos principals that wraps a {@link ZKAuthorizor}.
+ * <p>
+ * Kerberos principals might contain identifiers that are not valid ZNodes ('/'). Base64-encodes the principals before interacting with ZooKeeper.
+ * <p>
+ * Every method that takes a user name (or the root user) encodes the UTF-8 bytes of that name with {@code Base64.encodeBase64String} and hands the encoded
+ * form to the wrapped ZKAuthorizor in place of the plain name; all other arguments are forwarded unchanged. {@code initialize} is forwarded as-is.
+ * <p>
+ * {@code validSecurityHandlers} accepts only the pairing of a {@link KerberosAuthenticator} with a {@link KerberosPermissionHandler}.
+ * <p>
+ * {@code getInstance} lazily creates one shared instance inside a synchronized static method. The public constructor can still be used to build further
+ * instances, each with its own ZKAuthorizor.
+ */
+public class KerberosAuthorizor implements Authorizor {
+
+ private static KerberosAuthorizor INST;
+
+ public static synchronized KerberosAuthorizor getInstance() {
+ if (INST == null)
+ INST = new KerberosAuthorizor();
+ return INST;
+ }
+
+ private final ZKAuthorizor zkAuthorizor;
+
+ public KerberosAuthorizor() {
+ zkAuthorizor = new ZKAuthorizor();
+ }
+
+ @Override
+ public void initialize(String instanceId, boolean initialize) {
+ zkAuthorizor.initialize(instanceId, initialize);
+ }
+
+ @Override
+ public boolean validSecurityHandlers(Authenticator auth, PermissionHandler pm) {
+ return auth instanceof KerberosAuthenticator && pm instanceof KerberosPermissionHandler;
+ }
+
+ @Override
+ public void initializeSecurity(TCredentials credentials, String rootuser) throws AccumuloSecurityException, ThriftSecurityException {
+ zkAuthorizor.initializeSecurity(credentials, Base64.encodeBase64String(rootuser.getBytes(UTF_8)));
+ }
+
+ @Override
+ public void changeAuthorizations(String user, Authorizations authorizations) throws AccumuloSecurityException {
+ zkAuthorizor.changeAuthorizations(Base64.encodeBase64String(user.getBytes(UTF_8)), authorizations);
+ }
+
+ @Override
+ public Authorizations getCachedUserAuthorizations(String user) throws AccumuloSecurityException {
+ return zkAuthorizor.getCachedUserAuthorizations(Base64.encodeBase64String(user.getBytes(UTF_8)));
+ }
+
+ @Override
+ public boolean isValidAuthorizations(String user, List<ByteBuffer> list) throws AccumuloSecurityException {
+ return zkAuthorizor.isValidAuthorizations(Base64.encodeBase64String(user.getBytes(UTF_8)), list);
+ }
+
+ @Override
+ public void initUser(String user) throws AccumuloSecurityException {
+ zkAuthorizor.initUser(Base64.encodeBase64String(user.getBytes(UTF_8)));
+ }
+
+ @Override
+ public void dropUser(String user) throws AccumuloSecurityException {
+ user = Base64.encodeBase64String(user.getBytes(UTF_8));
+ zkAuthorizor.dropUser(user);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosPermissionHandler.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosPermissionHandler.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosPermissionHandler.java
new file mode 100644
index 0000000..691c555
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosPermissionHandler.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.NamespaceNotFoundException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.security.NamespacePermission;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.util.Base64;
+
+/**
+ * {@link PermissionHandler} for Kerberos principals that wraps a {@link ZKPermHandler}.
+ * <p>
+ * Kerberos principals might contain identifiers that are not valid ZNodes ('/'). Base64-encodes the principals before interacting with ZooKeeper.
+ * <p>
+ * Every method that takes a user name (or the root user) encodes the UTF-8 bytes of that name with {@code Base64.encodeBase64String} and hands the encoded
+ * form to the wrapped ZKPermHandler in place of the plain name. Table names, namespace names and permissions are never encoded, so {@code initialize},
+ * {@code initTable}, {@code cleanTablePermissions} and {@code cleanNamespacePermissions} are forwarded with their arguments unchanged.
+ * <p>
+ * {@code validSecurityHandlers} accepts only the pairing of a {@link KerberosAuthenticator} with a {@link KerberosAuthorizor}.
+ * <p>
+ * {@code getInstance} lazily creates one shared instance inside a synchronized static method. The public constructor can still be used to build further
+ * instances, each with its own ZKPermHandler.
+ */
+public class KerberosPermissionHandler implements PermissionHandler {
+
+ private static KerberosPermissionHandler INST;
+
+ public static synchronized KerberosPermissionHandler getInstance() {
+ if (INST == null)
+ INST = new KerberosPermissionHandler();
+ return INST;
+ }
+
+ private final ZKPermHandler zkPermissionHandler;
+
+ public KerberosPermissionHandler() {
+ zkPermissionHandler = new ZKPermHandler();
+ }
+
+ @Override
+ public void initialize(String instanceId, boolean initialize) {
+ zkPermissionHandler.initialize(instanceId, initialize);
+ }
+
+ @Override
+ public boolean validSecurityHandlers(Authenticator authent, Authorizor author) {
+ return authent instanceof KerberosAuthenticator && author instanceof KerberosAuthorizor;
+ }
+
+ @Override
+ public void initializeSecurity(TCredentials credentials, String rootuser) throws AccumuloSecurityException, ThriftSecurityException {
+ zkPermissionHandler.initializeSecurity(credentials, Base64.encodeBase64String(rootuser.getBytes(UTF_8)));
+ }
+
+ @Override
+ public boolean hasSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+ return zkPermissionHandler.hasSystemPermission(Base64.encodeBase64String(user.getBytes(UTF_8)), permission);
+ }
+
+ @Override
+ public boolean hasCachedSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+ return zkPermissionHandler.hasCachedSystemPermission(Base64.encodeBase64String(user.getBytes(UTF_8)), permission);
+ }
+
+ @Override
+ public boolean hasTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+ return zkPermissionHandler.hasTablePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), table, permission);
+ }
+
+ @Override
+ public boolean hasCachedTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+ return zkPermissionHandler.hasCachedTablePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), table, permission);
+ }
+
+ @Override
+ public boolean hasNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException,
+ NamespaceNotFoundException {
+ return zkPermissionHandler.hasNamespacePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), namespace, permission);
+ }
+
+ @Override
+ public boolean hasCachedNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException,
+ NamespaceNotFoundException {
+ return zkPermissionHandler.hasCachedNamespacePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), namespace, permission);
+ }
+
+ @Override
+ public void grantSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+ zkPermissionHandler.grantSystemPermission(Base64.encodeBase64String(user.getBytes(UTF_8)), permission);
+ }
+
+ @Override
+ public void revokeSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+ zkPermissionHandler.revokeSystemPermission(Base64.encodeBase64String(user.getBytes(UTF_8)), permission);
+ }
+
+ @Override
+ public void grantTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+ zkPermissionHandler.grantTablePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), table, permission);
+ }
+
+ @Override
+ public void revokeTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+ zkPermissionHandler.revokeTablePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), table, permission);
+ }
+
+ @Override
+ public void grantNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException,
+ NamespaceNotFoundException {
+ zkPermissionHandler.grantNamespacePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), namespace, permission);
+ }
+
+ @Override
+ public void revokeNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException,
+ NamespaceNotFoundException {
+ zkPermissionHandler.revokeNamespacePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), namespace, permission);
+ }
+
+ @Override
+ public void cleanTablePermissions(String table) throws AccumuloSecurityException, TableNotFoundException {
+ zkPermissionHandler.cleanTablePermissions(table);
+ }
+
+ @Override
+ public void cleanNamespacePermissions(String namespace) throws AccumuloSecurityException, NamespaceNotFoundException {
+ zkPermissionHandler.cleanNamespacePermissions(namespace);
+ }
+
+ @Override
+ public void initUser(String user) throws AccumuloSecurityException {
+ zkPermissionHandler.initUser(Base64.encodeBase64String(user.getBytes(UTF_8)));
+ }
+
+ @Override
+ public void initTable(String table) throws AccumuloSecurityException {
+ zkPermissionHandler.initTable(table);
+ }
+
+ @Override
+ public void cleanUser(String user) throws AccumuloSecurityException {
+ zkPermissionHandler.cleanUser(Base64.encodeBase64String(user.getBytes(UTF_8)));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/thrift/UGIAssumingProcessor.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/UGIAssumingProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/UGIAssumingProcessor.java
new file mode 100644
index 0000000..4e4f8ce
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/UGIAssumingProcessor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.thrift;
+
+import java.io.IOException;
+
+import javax.security.sasl.SaslServer;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Processor that pulls the SaslServer object out of the transport, and assumes the remote user's UGI before calling through to the original processor.
+ *
+ * This is used on the server side to set the UGI for each specific call.
+ *
+ * As written here, {@code process} builds a proxy-user UGI for the SASL authorization ID on top of the login user captured in the constructor, but only
+ * uses it to obtain the user name. That name is stored in {@link #principal} while the wrapped processor runs and is reset to null afterwards; the wrapped
+ * processor is called directly rather than inside a {@code doAs}.
+ *
+ * A transport that is not a {@code TSaslServerTransport} is rejected with a {@code TException}. The constructor logs and rethrows, as a
+ * {@code RuntimeException}, any {@code IOException} raised while obtaining the login user.
+ *
+ * Lifted from Apache Hive 0.14
+ */
+public class UGIAssumingProcessor implements TProcessor {
+ private static final Logger log = LoggerFactory.getLogger(UGIAssumingProcessor.class);
+
+ public static final ThreadLocal<String> principal = new ThreadLocal<String>();
+ private final TProcessor wrapped;
+ private final UserGroupInformation loginUser;
+
+ public UGIAssumingProcessor(TProcessor wrapped) {
+ this.wrapped = wrapped;
+ try {
+ this.loginUser = UserGroupInformation.getLoginUser();
+ } catch (IOException e) {
+ log.error("Failed to obtain login user", e);
+ throw new RuntimeException("Failed to obtain login user", e);
+ }
+ }
+
+ /**
+ * The principal of the user who authenticated over SASL.
+ *
+ * @return the value of {@link #principal} for the calling thread; {@code process} sets it for the duration of the wrapped call and resets it to null in
+ * its finally block, so this is null on a thread where nothing has set it
+ */
+ public static String currentPrincipal() {
+ return principal.get();
+ }
+
+ @Override
+ public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+ TTransport trans = inProt.getTransport();
+ if (!(trans instanceof TSaslServerTransport)) {
+ throw new TException("Unexpected non-SASL transport " + trans.getClass() + ": " + trans);
+ }
+ TSaslServerTransport saslTrans = (TSaslServerTransport) trans;
+ SaslServer saslServer = saslTrans.getSaslServer();
+ String authId = saslServer.getAuthorizationID();
+ String endUser = authId;
+
+ log.trace("Received SASL RPC from {}", endUser);
+
+ UserGroupInformation clientUgi = UserGroupInformation.createProxyUser(endUser, loginUser);
+ final String remoteUser = clientUgi.getUserName();
+
+ try {
+ // Set the principal in the ThreadLocal for access to get authorizations
+ principal.set(remoteUser);
+
+ return wrapped.process(inProt, outProt);
+ } finally {
+ // Unset the principal after we're done using it just to be sure that it's not incorrectly
+ // used in the same thread down the line.
+ principal.set(null);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index 7d247f7..8407c15 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.client.impl.MasterClient;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.security.Authorizations;
@@ -56,6 +57,7 @@ import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.cli.ClientOpts;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
@@ -174,6 +176,13 @@ public class Admin {
cl.usage();
return;
}
+
+ AccumuloConfiguration siteConf = SiteConfiguration.getInstance();
+ // Login as the server on secure HDFS
+ if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+ SecurityUtil.serverLogin(siteConf);
+ }
+
Instance instance = opts.getInstance();
ServerConfigurationFactory confFactory = new ServerConfigurationFactory(instance);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
index ef182f1..759d898 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
@@ -20,9 +20,13 @@ import java.util.List;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.log4j.Logger;
@@ -64,6 +68,12 @@ public class ZooZap {
return;
}
+ AccumuloConfiguration siteConf = SiteConfiguration.getInstance();
+ // Login as the server on secure HDFS
+ if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+ SecurityUtil.serverLogin(siteConf);
+ }
+
String iid = opts.getInstance().getInstanceID();
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java b/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java
new file mode 100644
index 0000000..56f3933
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AccumuloServerContextTest { // checks the SASL-related getters of AccumuloServerContext against mocked configuration sources
+
+ private String user; // user name of the current Hadoop UGI, captured in setup()
+
+ @Before
+ public void setup() throws Exception {
+ Configuration conf = new Configuration(false);
+ conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); // Hadoop security is switched to kerberos before the user is read
+ UserGroupInformation.setConfiguration(conf);
+ user = UserGroupInformation.getCurrentUser().getUserName();
+ }
+
+ @Test
+ public void testSasl() throws Exception {
+ MockInstance instance = new MockInstance();
+
+ ClientConfiguration clientConf = ClientConfiguration.loadDefault();
+ clientConf.setProperty(ClientProperty.INSTANCE_RPC_SASL_ENABLED, "true");
+ clientConf.setProperty(ClientProperty.KERBEROS_SERVER_PRIMARY, "accumulo");
+ final AccumuloConfiguration conf = ClientContext.convertClientConfig(clientConf);
+ SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class);
+
+ ServerConfigurationFactory factory = EasyMock.createMock(ServerConfigurationFactory.class);
+ EasyMock.expect(factory.getConfiguration()).andReturn(conf).anyTimes();
+ EasyMock.expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
+ EasyMock.expect(factory.getInstance()).andReturn(instance).anyTimes();
+
+ AccumuloServerContext context = EasyMock.createMockBuilder(AccumuloServerContext.class).addMockedMethod("enforceKerberosLogin")
+ .addMockedMethod("getConfiguration").addMockedMethod("getServerConfigurationFactory").createMock(); // only the three named methods are mocked
+ context.enforceKerberosLogin(); // recorded call on the mock; the next line allows it any number of times
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.expect(context.getConfiguration()).andReturn(conf).anyTimes();
+ EasyMock.expect(context.getServerConfigurationFactory()).andReturn(factory).anyTimes();
+
+ // Just make the SiteConfiguration delegate to our ClientConfiguration (by way of the AccumuloConfiguration)
+ // Presently, we only need get(Property) and iterator().
+ EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() {
+ @Override
+ public String answer() {
+ Object[] args = EasyMock.getCurrentArguments();
+ return conf.get((Property) args[0]);
+ }
+ }).anyTimes();
+
+ EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() {
+ @Override
+ public Iterator<Entry<String,String>> answer() {
+ return conf.iterator();
+ }
+ }).anyTimes();
+
+ EasyMock.replay(factory, context, siteConfig);
+
+ Assert.assertEquals(ThriftServerType.SASL, context.getThriftServerType());
+ SaslConnectionParams saslParams = context.getServerSaslParams();
+ Assert.assertEquals(SaslConnectionParams.forConfig(conf), saslParams);
+ Assert.assertEquals(user, saslParams.getPrincipal()); // the principal is expected to be the current UGI user captured in setup()
+
+ EasyMock.verify(factory, context, siteConfig);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java
new file mode 100644
index 0000000..aba1aa0
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.server.thrift.UGIAssumingProcessor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TCredentialsUpdatingInvocationHandlerTest { // exercises updateArgs(Object[]) with different values of UGIAssumingProcessor.principal
+
+ TCredentialsUpdatingInvocationHandler<Object> proxy;
+
+ @Before
+ public void setup() {
+ proxy = new TCredentialsUpdatingInvocationHandler<Object>(new Object());
+ }
+
+ @After
+ public void teardown() {
+ UGIAssumingProcessor.principal.set(null); // clear the thread-local so a principal set by one test is not seen by the next
+ }
+
+ @Test
+ public void testNoArgsAreIgnored() throws Exception {
+ proxy.updateArgs(new Object[] {}); // passes as long as no exception is thrown
+ }
+
+ @Test
+ public void testNoTCredsInArgsAreIgnored() throws Exception {
+ proxy.updateArgs(new Object[] {new Object(), new Object()}); // passes as long as no exception is thrown
+ }
+
+ @Test
+ public void testCachedTokenClass() throws Exception {
+ final String principal = "root";
+ ConcurrentHashMap<String,Class<? extends AuthenticationToken>> cache = proxy.getTokenCache();
+ cache.clear(); // start empty so the size assertion below only counts what this call adds
+ TCredentials tcreds = new TCredentials(principal, KerberosToken.CLASS_NAME, ByteBuffer.allocate(0), UUID.randomUUID().toString());
+ UGIAssumingProcessor.principal.set(principal);
+ proxy.updateArgs(new Object[] {new Object(), tcreds});
+ Assert.assertEquals(1, cache.size());
+ Assert.assertEquals(KerberosToken.class, cache.get(KerberosToken.CLASS_NAME));
+ }
+
+ @Test(expected = ThriftSecurityException.class)
+ public void testMissingPrincipal() throws Exception {
+ final String principal = "root";
+ TCredentials tcreds = new TCredentials(principal, KerberosToken.CLASS_NAME, ByteBuffer.allocate(0), UUID.randomUUID().toString());
+ UGIAssumingProcessor.principal.set(null); // no principal on this thread
+ proxy.updateArgs(new Object[] {new Object(), tcreds});
+ }
+
+ @Test(expected = ThriftSecurityException.class)
+ public void testMismatchedPrincipal() throws Exception {
+ final String principal = "root";
+ TCredentials tcreds = new TCredentials(principal, KerberosToken.CLASS_NAME, ByteBuffer.allocate(0), UUID.randomUUID().toString());
+ UGIAssumingProcessor.principal.set(principal + "foobar"); // thread principal differs from the one in the credentials
+ proxy.updateArgs(new Object[] {new Object(), tcreds});
+ }
+
+ @Test(expected = ThriftSecurityException.class)
+ public void testWrongTokenType() throws Exception {
+ final String principal = "root";
+ TCredentials tcreds = new TCredentials(principal, PasswordToken.class.getName(), ByteBuffer.allocate(0), UUID.randomUUID().toString()); // PasswordToken, not KerberosToken
+ UGIAssumingProcessor.principal.set(principal);
+ proxy.updateArgs(new Object[] {new Object(), tcreds});
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java
new file mode 100644
index 0000000..f3f1bdd
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.accumulo.core.conf.Property;
+import org.junit.Test;
+
+public class ThriftServerTypeTest { // checks ThriftServerType.get(String) lookups
+
+ @Test
+ public void testDefaultServer() { // the default value of GENERAL_RPC_SERVER_TYPE is expected to resolve to CUSTOM_HS_HA
+ assertEquals(ThriftServerType.CUSTOM_HS_HA, ThriftServerType.get(Property.GENERAL_RPC_SERVER_TYPE.getDefaultValue()));
+ }
+
+ @Test
+ public void testSpecialServer() { // the literal "threadpool" is expected to resolve to THREADPOOL
+ assertEquals(ThriftServerType.THREADPOOL, ThriftServerType.get("threadpool"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index c380eb7..7efabb6 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -91,7 +91,9 @@ import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.fs.VolumeUtil;
import org.apache.accumulo.server.rpc.RpcWrapper;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.server.tables.TableManager;
import org.apache.accumulo.server.util.Halt;
@@ -707,14 +709,21 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
}
private HostAndPort startStatsService() throws UnknownHostException {
- Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(this));
+ Iface rpcProxy = RpcWrapper.service(this);
+ final Processor<Iface> processor;
+ if (ThriftServerType.SASL == getThriftServerType()) {
+ Iface tcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, getClass());
+ processor = new Processor<Iface>(tcProxy);
+ } else {
+ processor = new Processor<Iface>(rpcProxy);
+ }
int port = getConfiguration().getPort(Property.GC_PORT);
long maxMessageSize = getConfiguration().getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port);
log.debug("Starting garbage collector listening on " + result);
try {
- return TServerUtils.startTServer(getConfiguration(), result, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, getConfiguration()
- .getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 1000, maxMessageSize, getServerSslParams(), 0).address;
+ return TServerUtils.startTServer(getConfiguration(), result, getThriftServerType(), processor, this.getClass().getSimpleName(), "GC Monitor Service", 2,
+ getConfiguration().getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 1000, maxMessageSize, getServerSslParams(), getServerSaslParams(), 0).address;
} catch (Exception ex) {
log.fatal(ex, ex);
throw new RuntimeException(ex);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index f98721f..1d7f90f 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -41,6 +42,9 @@ import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
@@ -59,6 +63,8 @@ import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -90,14 +96,36 @@ public class GarbageCollectWriteAheadLogsTest {
@Before
public void setUp() throws Exception {
+ SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class);
instance = createMock(Instance.class);
expect(instance.getInstanceID()).andReturn("mock").anyTimes();
- systemConfig = createMock(AccumuloConfiguration.class);
+ expect(instance.getZooKeepers()).andReturn("localhost").anyTimes();
+ expect(instance.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes();
+ systemConfig = new ConfigurationCopy(new HashMap<String,String>());
volMgr = createMock(VolumeManager.class);
ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class);
expect(factory.getConfiguration()).andReturn(systemConfig).anyTimes();
expect(factory.getInstance()).andReturn(instance).anyTimes();
- replay(instance, factory);
+ expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
+
+ // Just make the SiteConfiguration delegate to our AccumuloConfiguration
+ // Presently, we only need get(Property) and iterator().
+ EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() {
+ @Override
+ public String answer() {
+ Object[] args = EasyMock.getCurrentArguments();
+ return systemConfig.get((Property) args[0]);
+ }
+ }).anyTimes();
+
+ EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() {
+ @Override
+ public Iterator<Entry<String,String>> answer() {
+ return systemConfig.iterator();
+ }
+ }).anyTimes();
+
+ replay(instance, factory, siteConfig);
AccumuloServerContext context = new AccumuloServerContext(factory);
gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false);
modTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
index 99558b8..6fcdd37 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
@@ -29,10 +29,15 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.trace.thrift.TInfo;
@@ -42,6 +47,7 @@ import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.security.SystemCredentials;
import org.apache.hadoop.fs.Path;
import org.easymock.EasyMock;
+import org.easymock.IAnswer;
import org.junit.Before;
import org.junit.Test;
@@ -51,20 +57,42 @@ public class SimpleGarbageCollectorTest {
private Credentials credentials;
private Opts opts;
private SimpleGarbageCollector gc;
- private AccumuloConfiguration systemConfig;
+ private ConfigurationCopy systemConfig;
@Before
public void setUp() {
volMgr = createMock(VolumeManager.class);
instance = createMock(Instance.class);
+ SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class);
expect(instance.getInstanceID()).andReturn("mock").anyTimes();
+ expect(instance.getZooKeepers()).andReturn("localhost").anyTimes();
+ expect(instance.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes();
opts = new Opts();
- systemConfig = mockSystemConfig();
+ systemConfig = createSystemConfig();
ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class);
expect(factory.getInstance()).andReturn(instance).anyTimes();
- expect(factory.getConfiguration()).andReturn(mockSystemConfig()).anyTimes();
- replay(instance, factory);
+ expect(factory.getConfiguration()).andReturn(systemConfig).anyTimes();
+ expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
+
+ // Just make the SiteConfiguration delegate to our AccumuloConfiguration
+ // Presently, we only need get(Property) and iterator().
+ EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() {
+ @Override
+ public String answer() {
+ Object[] args = EasyMock.getCurrentArguments();
+ return systemConfig.get((Property) args[0]);
+ }
+ }).anyTimes();
+
+ EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() {
+ @Override
+ public Iterator<Entry<String,String>> answer() {
+ return systemConfig.iterator();
+ }
+ }).anyTimes();
+
+ replay(instance, factory, siteConfig);
credentials = SystemCredentials.get(instance);
gc = new SimpleGarbageCollector(opts, volMgr, factory);
@@ -76,26 +104,20 @@ public class SimpleGarbageCollectorTest {
assertNotNull(gc.getStatus(createMock(TInfo.class), createMock(TCredentials.class)));
}
- private AccumuloConfiguration mockSystemConfig() {
- AccumuloConfiguration systemConfig = createMock(AccumuloConfiguration.class);
- expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_START)).andReturn(1000L);
- expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_START)).andReturn(1000L);
- expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_DELAY)).andReturn(20000L);
- expect(systemConfig.getCount(Property.GC_DELETE_THREADS)).andReturn(2).times(2);
- expect(systemConfig.getBoolean(Property.GC_TRASH_IGNORE)).andReturn(false);
- expect(systemConfig.getBoolean(Property.GC_FILE_ARCHIVE)).andReturn(false);
- replay(systemConfig);
- return systemConfig;
+ private ConfigurationCopy createSystemConfig() {
+ Map<String,String> conf = new HashMap<>();
+ conf.put(Property.INSTANCE_RPC_SASL_ENABLED.getKey(), "false");
+ conf.put(Property.GC_CYCLE_START.getKey(), "1");
+ conf.put(Property.GC_CYCLE_DELAY.getKey(), "20");
+ conf.put(Property.GC_DELETE_THREADS.getKey(), "2");
+ conf.put(Property.GC_TRASH_IGNORE.getKey(), "false");
+ conf.put(Property.GC_FILE_ARCHIVE.getKey(), "false");
+
+ return new ConfigurationCopy(conf);
}
@Test
public void testInit() throws Exception {
- EasyMock.reset(systemConfig);
- expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_START)).andReturn(1000L).times(2);
- expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_DELAY)).andReturn(20000L);
- expect(systemConfig.getCount(Property.GC_DELETE_THREADS)).andReturn(2).times(2);
- expect(systemConfig.getBoolean(Property.GC_TRASH_IGNORE)).andReturn(false);
- replay(systemConfig);
assertSame(volMgr, gc.getVolumeManager());
assertSame(instance, gc.getInstance());
assertEquals(credentials, gc.getCredentials());
@@ -124,13 +146,7 @@ public class SimpleGarbageCollectorTest {
@Test
public void testMoveToTrash_NotUsingTrash() throws Exception {
- AccumuloConfiguration systemConfig = createMock(AccumuloConfiguration.class);
- expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_START)).andReturn(1000L);
- expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_DELAY)).andReturn(20000L);
- expect(systemConfig.getCount(Property.GC_DELETE_THREADS)).andReturn(2);
- expect(systemConfig.getBoolean(Property.GC_FILE_ARCHIVE)).andReturn(false);
- expect(systemConfig.getBoolean(Property.GC_TRASH_IGNORE)).andReturn(true);
- replay(systemConfig);
+ systemConfig.set(Property.GC_TRASH_IGNORE.getKey(), "true");
Path path = createMock(Path.class);
assertFalse(gc.archiveOrMoveToTrash(path));
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index cad1e01..120692a 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -25,6 +25,7 @@ import static org.easymock.EasyMock.verify;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -41,6 +42,9 @@ import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
@@ -84,12 +88,34 @@ public class CloseWriteAheadLogReferencesTest {
@Before
public void setup() {
inst = createMock(Instance.class);
+ SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class);
expect(inst.getInstanceID()).andReturn(testName.getMethodName()).anyTimes();
- AccumuloConfiguration systemConf = createMock(AccumuloConfiguration.class);
+ expect(inst.getZooKeepers()).andReturn("localhost").anyTimes();
+ expect(inst.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes();
+ final AccumuloConfiguration systemConf = new ConfigurationCopy(new HashMap<String,String>());
ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class);
expect(factory.getConfiguration()).andReturn(systemConf).anyTimes();
expect(factory.getInstance()).andReturn(inst).anyTimes();
- replay(inst, factory);
+ expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
+
+ // Just make the SiteConfiguration delegate to our AccumuloConfiguration
+ // Presently, we only need get(Property) and iterator().
+ EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() {
+ @Override
+ public String answer() {
+ Object[] args = EasyMock.getCurrentArguments();
+ return systemConf.get((Property) args[0]);
+ }
+ }).anyTimes();
+
+ EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() {
+ @Override
+ public Iterator<Entry<String,String>> answer() {
+ return systemConf.iterator();
+ }
+ }).anyTimes();
+
+ replay(inst, factory, siteConfig);
refs = new CloseWriteAheadLogReferences(new AccumuloServerContext(factory));
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 12195fa..a6ea6ea 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -118,7 +118,9 @@ import org.apache.accumulo.server.metrics.Metrics;
import org.apache.accumulo.server.replication.ZooKeeperInitialization;
import org.apache.accumulo.server.rpc.RpcWrapper;
import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.security.SecurityUtil;
@@ -1090,7 +1092,14 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zroot);
clientHandler = new MasterClientServiceHandler(this);
- Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(clientHandler));
+ Iface rpcProxy = RpcWrapper.service(clientHandler);
+ final Processor<Iface> processor;
+ if (ThriftServerType.SASL == getThriftServerType()) {
+ Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clientHandler.getClass());
+ processor = new Processor<Iface>(tcredsProxy);
+ } else {
+ processor = new Processor<Iface>(rpcProxy);
+ }
ServerAddress sa = TServerUtils.startServer(this, hostname, Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null,
Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
clientService = sa.server;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
index 580852d..e8dacaf 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
@@ -221,6 +221,8 @@ public class CompactRange extends MasterRepo {
if (iterators.size() > 0 || !compactionStrategy.equals(CompactionStrategyConfigUtil.DEFAULT_STRATEGY)) {
this.config = WritableUtils.toByteArray(new UserCompactionConfig(this.startRow, this.endRow, iterators, compactionStrategy));
+ } else {
+ log.info("No iterators or compaction strategy");
}
if (this.startRow != null && this.endRow != null && new Text(startRow).compareTo(new Text(endRow)) >= 0)
@@ -256,6 +258,9 @@ public class CompactRange extends MasterRepo {
if (tokens[i].startsWith(txidString))
continue; // skip self
+ log.debug("txidString : " + txidString);
+ log.debug("tokens[" + i + "] : " + tokens[i]);
+
throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER,
"Another compaction with iterators and/or a compaction strategy is running");
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java
index 2d98fed..1a098c2 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.monitor.Monitor;
import org.apache.accumulo.monitor.servlets.BasicServlet;
import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.tracer.TraceFormatter;
abstract class Basic extends BasicServlet {
@@ -88,6 +89,10 @@ abstract class Basic extends BasicServlet {
at = token;
}
+ if (conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+ principal = SecurityUtil.getServerPrincipal(principal);
+ }
+
String table = conf.get(Property.TRACE_TABLE);
try {
Connector conn = HdfsZooInstance.getInstance().getConnector(principal, at);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index 3063cdc..f855d9c 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@ -176,10 +176,16 @@ public class TraceServer implements Watcher {
Connector connector = null;
while (true) {
try {
+ final boolean isDefaultTokenType = conf.get(Property.TRACE_TOKEN_TYPE).equals(Property.TRACE_TOKEN_TYPE.getDefaultValue());
String principal = conf.get(Property.TRACE_USER);
+ if (conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+ // Make sure that we replace _HOST if it exists in the principal
+ principal = SecurityUtil.getServerPrincipal(principal);
+ }
AuthenticationToken at;
Map<String,String> loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);
- if (loginMap.isEmpty()) {
+ if (loginMap.isEmpty() && isDefaultTokenType) {
+ // Assume the old type of user/password specification
Property p = Property.TRACE_PASSWORD;
at = new PasswordToken(conf.get(p).getBytes(UTF_8));
} else {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 2bfa5a0..b08340f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -172,7 +172,9 @@ import org.apache.accumulo.server.problems.ProblemReports;
import org.apache.accumulo.server.replication.ZooKeeperInitialization;
import org.apache.accumulo.server.rpc.RpcWrapper;
import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.security.SecurityUtil;
@@ -315,7 +317,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
Instance instance = getInstance();
this.sessionManager = new SessionManager(aconf);
this.logSorter = new LogSorter(instance, fs, aconf);
- this.replWorker = new ReplicationWorker(instance, fs, aconf);
+ this.replWorker = new ReplicationWorker(this, fs);
this.statsKeeper = new TabletStatsKeeper();
SimpleTimer.getInstance(aconf).schedule(new Runnable() {
@Override
@@ -2272,8 +2274,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
private HostAndPort startTabletClientService() throws UnknownHostException {
// start listening for client connection last
- Iface tch = RpcWrapper.service(new ThriftClientHandler());
- Processor<Iface> processor = new Processor<Iface>(tch);
+ Iface rpcProxy = RpcWrapper.service(new ThriftClientHandler());
+ final Processor<Iface> processor;
+ if (ThriftServerType.SASL == getThriftServerType()) {
+ Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, ThriftClientHandler.class);
+ processor = new Processor<Iface>(tcredProxy);
+ } else {
+ processor = new Processor<Iface>(rpcProxy);
+ }
HostAndPort address = startServer(getServerConfigurationFactory().getConfiguration(), clientAddress.getHostText(), Property.TSERV_CLIENTPORT, processor,
"Thrift Client Server");
log.info("address = " + address);
@@ -2281,7 +2289,9 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
private HostAndPort startReplicationService() throws UnknownHostException {
- ReplicationServicer.Iface repl = RpcWrapper.service(new ReplicationServicerHandler(this));
+ final ReplicationServicerHandler handler = new ReplicationServicerHandler(this);
+ ReplicationServicer.Iface rpcProxy = RpcWrapper.service(handler);
+ ReplicationServicer.Iface repl = TCredentialsUpdatingWrapper.service(rpcProxy, handler.getClass());
ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl);
AccumuloConfiguration conf = getServerConfigurationFactory().getConfiguration();
Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
index bd6bcd3..de99029 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
@@ -19,13 +19,14 @@ package org.apache.accumulo.tserver.replication;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.ClientContext;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.replication.ReplicationConstants;
+import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.security.SystemCredentials;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -39,13 +40,15 @@ public class ReplicationWorker implements Runnable {
private Instance inst;
private VolumeManager fs;
+ private Credentials creds;
private AccumuloConfiguration conf;
private ThreadPoolExecutor executor;
- public ReplicationWorker(Instance inst, VolumeManager fs, AccumuloConfiguration conf) {
- this.inst = inst;
+ public ReplicationWorker(ClientContext clientCtx, VolumeManager fs) {
+ this.inst = clientCtx.getInstance();
this.fs = fs;
- this.conf = conf;
+ this.conf = clientCtx.getConfiguration();
+ this.creds = clientCtx.getCredentials();
}
public void setExecutor(ThreadPoolExecutor executor) {
@@ -69,7 +72,7 @@ public class ReplicationWorker implements Runnable {
workQueue = new DistributedWorkQueue(ZooUtil.getRoot(inst) + ReplicationConstants.ZOO_WORK_QUEUE, conf);
}
- workQueue.startProcessing(new ReplicationProcessor(inst, conf, fs, SystemCredentials.get(inst)), executor);
+ workQueue.startProcessing(new ReplicationProcessor(inst, conf, fs, creds), executor);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/shell/src/main/java/org/apache/accumulo/shell/Shell.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/accumulo/shell/Shell.java b/shell/src/main/java/org/apache/accumulo/shell/Shell.java
index d897fc3..a64ff45 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/Shell.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/Shell.java
@@ -199,7 +199,6 @@ public class Shell extends ShellOptions {
protected Instance instance;
private Connector connector;
protected ConsoleReader reader;
- private String principal;
private AuthenticationToken token;
private final Class<? extends Formatter> defaultFormatterClass = DefaultFormatter.class;
private final Class<? extends Formatter> binaryFormatterClass = BinaryFormatter.class;
@@ -275,8 +274,22 @@ public class Shell extends ShellOptions {
authTimeout = TimeUnit.MINUTES.toNanos(options.getAuthTimeout());
disableAuthTimeout = options.isAuthTimeoutDisabled();
+ ClientConfiguration clientConf;
+ try {
+ clientConf = options.getClientConfiguration();
+ } catch (Exception e) {
+ printException(e);
+ return true;
+ }
+
// get the options that were parsed
- String user = options.getUsername();
+ final String user;
+ try {
+ user = options.getUsername();
+ } catch (Exception e) {
+ printException(e);
+ return true;
+ }
String password = options.getPassword();
tabCompletion = !options.isTabCompletionDisabled();
@@ -285,7 +298,13 @@ public class Shell extends ShellOptions {
setInstance(options);
// AuthenticationToken options
- token = options.getAuthenticationToken();
+ try {
+ token = options.getAuthenticationToken();
+ } catch (Exception e) {
+ printException(e);
+ return true;
+ }
+
Map<String,String> loginOptions = options.getTokenProperties();
// process default parameters if unspecified
@@ -328,12 +347,11 @@ public class Shell extends ShellOptions {
}
if (!options.isFake()) {
- DistributedTrace.enable(InetAddress.getLocalHost().getHostName(), "shell", options.getClientConfiguration());
+ DistributedTrace.enable(InetAddress.getLocalHost().getHostName(), "shell", clientConf);
}
this.setTableName("");
- this.principal = user;
- connector = instance.getConnector(this.principal, token);
+ connector = instance.getConnector(user, token);
} catch (Exception e) {
printException(e);
@@ -1157,12 +1175,11 @@ public class Shell extends ShellOptions {
public void updateUser(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
connector = instance.getConnector(principal, token);
- this.principal = principal;
this.token = token;
}
public String getPrincipal() {
- return principal;
+ return connector.whoami();
}
public AuthenticationToken getToken() {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java b/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java
index 875367d..be53d5d 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java
@@ -27,8 +27,10 @@ import java.util.TreeMap;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,10 +41,10 @@ import com.beust.jcommander.ParameterException;
import com.beust.jcommander.converters.FileConverter;
public class ShellOptionsJC {
- private static final Logger log = LoggerFactory.getLogger(Shell.class);
+ private static final Logger log = LoggerFactory.getLogger(ShellOptionsJC.class);
@Parameter(names = {"-u", "--user"}, description = "username (defaults to your OS user)")
- private String username = System.getProperty("user.name", "root");
+ private String username = null;
public static class PasswordConverter implements IStringConverter<String> {
public static final String STDIN = "stdin";
@@ -126,7 +128,7 @@ public class ShellOptionsJC {
return Class.forName(value).asSubclass(AuthenticationToken.class).newInstance();
} catch (Exception e) {
// Catching ClassNotFoundException, ClassCastException, InstantiationException and IllegalAccessException
- log.error("Could not instantiate AuthenticationToken " + value, e);
+ log.error("Could not instantiate AuthenticationToken {}", value, e);
throw new ParameterException(e);
}
}
@@ -169,6 +171,9 @@ public class ShellOptionsJC {
@Parameter(names = {"--ssl"}, description = "use ssl to connect to accumulo")
private boolean useSsl = false;
+ @Parameter(names = "--sasl", description = "use SASL to connect to Accumulo (Kerberos)")
+ private boolean useSasl = false;
+
@Parameter(names = "--config-file", description = "read the given client config file. "
+ "If omitted, the path searched can be specified with $ACCUMULO_CLIENT_CONF_PATH, "
+ "which defaults to ~/.accumulo/config:$ACCUMULO_CONF_DIR/client.conf:/etc/accumulo/client.conf")
@@ -189,7 +194,19 @@ public class ShellOptionsJC {
@Parameter(hidden = true)
private List<String> unrecognizedOptions;
- public String getUsername() {
+ public String getUsername() throws Exception {
+ if (null == username) {
+ final ClientConfiguration clientConf = getClientConfiguration();
+ if (Boolean.parseBoolean(clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED))) {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ throw new RuntimeException("Kerberos security is not enabled");
+ }
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ username = ugi.getUserName();
+ } else {
+ username = System.getProperty("user.name", "root");
+ }
+ }
return username;
}
@@ -197,7 +214,15 @@ public class ShellOptionsJC {
return password;
}
- public AuthenticationToken getAuthenticationToken() {
+ public AuthenticationToken getAuthenticationToken() throws Exception {
+ if (null == authenticationToken) {
+ final ClientConfiguration clientConf = getClientConfiguration();
+ // Automatically use a KerberosToken if the client conf is configured for SASL
+ final boolean saslEnabled = Boolean.parseBoolean(clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+ if (saslEnabled) {
+ authenticationToken = new KerberosToken();
+ }
+ }
return authenticationToken;
}
@@ -275,7 +300,13 @@ public class ShellOptionsJC {
if (useSsl()) {
clientConfig.setProperty(ClientProperty.INSTANCE_RPC_SSL_ENABLED, "true");
}
+ if (useSasl()) {
+ clientConfig.setProperty(ClientProperty.INSTANCE_RPC_SASL_ENABLED, "true");
+ }
return clientConfig;
}
+ public boolean useSasl() {
+ return useSasl;
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/shell/src/test/java/org/apache/accumulo/shell/ShellOptionsJCTest.java
----------------------------------------------------------------------
diff --git a/shell/src/test/java/org/apache/accumulo/shell/ShellOptionsJCTest.java b/shell/src/test/java/org/apache/accumulo/shell/ShellOptionsJCTest.java
new file mode 100644
index 0000000..0c4e4c7
--- /dev/null
+++ b/shell/src/test/java/org/apache/accumulo/shell/ShellOptionsJCTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.shell;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.beust.jcommander.JCommander;
+
+/**
+ * Checks that {@link ShellOptionsJC} command-line flags are carried into the client configuration.
+ */
+public class ShellOptionsJCTest {
+
+ ShellOptionsJC options;
+
+ @Before
+ public void setup() {
+ options = new ShellOptionsJC();
+ }
+
+ @Test
+ public void testSasl() throws Exception {
+ JCommander jc = new JCommander();
+
+ jc.setProgramName("accumulo shell");
+ jc.addObject(options);
+ jc.parse(new String[] {"--sasl"});
+ ClientConfiguration clientConf = options.getClientConfiguration();
+ assertEquals("true", clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/pom.xml
----------------------------------------------------------------------
diff --git a/test/pom.xml b/test/pom.xml
index 16f4125..b58df3c 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -156,6 +156,21 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minikdc</artifactId>
+ <!-- Specifically depend on this version of minikdc to avoid having
+ to increase our normal hadoop dependency -->
+ <version>2.3.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <!-- Pulls in an older bouncycastle version -->
+ <exclusion>
+ <groupId>bouncycastle</groupId>
+ <artifactId>bcprov-jdk15</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<scope>test</scope>
@@ -202,6 +217,7 @@
<timeout.factor>${timeout.factor}</timeout.factor>
<org.apache.accumulo.test.functional.useCredProviderForIT>${useCredProviderForIT}</org.apache.accumulo.test.functional.useCredProviderForIT>
<org.apache.accumulo.test.functional.useSslForIT>${useSslForIT}</org.apache.accumulo.test.functional.useSslForIT>
+ <org.apache.accumulo.test.functional.useKrbForIT>${useKrbForIT}</org.apache.accumulo.test.functional.useKrbForIT>
</systemPropertyVariables>
</configuration>
</plugin>
@@ -212,6 +228,7 @@
<systemPropertyVariables>
<org.apache.accumulo.test.functional.useCredProviderForIT>${useCredProviderForIT}</org.apache.accumulo.test.functional.useCredProviderForIT>
<org.apache.accumulo.test.functional.useSslForIT>${useSslForIT}</org.apache.accumulo.test.functional.useSslForIT>
+ <org.apache.accumulo.test.functional.useKrbForIT>${useKrbForIT}</org.apache.accumulo.test.functional.useKrbForIT>
</systemPropertyVariables>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index 3bb44ff..0b047cb 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -42,6 +42,7 @@ import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -102,8 +103,8 @@ public class ZombieTServer {
TransactionWatcher watcher = new TransactionWatcher();
final ThriftClientHandler tch = new ThriftClientHandler(context, watcher);
Processor<Iface> processor = new Processor<Iface>(tch);
- ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", port), processor, "ZombieTServer",
- "walking dead", 2, 1, 1000, 10 * 1024 * 1024, null, -1);
+ ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", port), ThriftServerType.CUSTOM_HS_HA,
+ processor, "ZombieTServer", "walking dead", 2, 1, 1000, 10 * 1024 * 1024, null, null, -1);
String addressString = serverPort.address.toString();
String zPath = ZooUtil.getRoot(context.getInstance()) + Constants.ZTSERVERS + "/" + addressString;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index 0afa243..b429607 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -70,6 +70,7 @@ import org.apache.accumulo.server.master.state.MetaDataTableScanner;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletLocationState;
import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
@@ -254,8 +255,8 @@ public class NullTserver {
TransactionWatcher watcher = new TransactionWatcher();
ThriftClientHandler tch = new ThriftClientHandler(new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance())), watcher);
Processor<Iface> processor = new Processor<Iface>(tch);
- TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", opts.port), processor, "NullTServer", "null tserver", 2, 1, 1000,
- 10 * 1024 * 1024, null, -1);
+ TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", opts.port), ThriftServerType.CUSTOM_HS_HA, processor, "NullTServer",
+ "null tserver", 2, 1, 1000, 10 * 1024 * 1024, null, null, -1);
HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java b/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
index 8f7e1b7..c1ad17b 100644
--- a/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
+++ b/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
@@ -34,7 +34,9 @@ import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -48,6 +50,7 @@ import com.google.common.base.Preconditions;
*/
public abstract class AccumuloClusterIT extends AccumuloIT implements MiniClusterConfigurationCallback {
private static final Logger log = LoggerFactory.getLogger(AccumuloClusterIT.class);
+ private static final String TRUE = Boolean.toString(true);
public static enum ClusterType {
MINI, STANDALONE;
@@ -62,15 +65,68 @@ public abstract class AccumuloClusterIT extends AccumuloIT implements MiniCluste
protected static AccumuloCluster cluster;
protected static ClusterType type;
protected static AccumuloClusterPropertyConfiguration clusterConf;
+ protected static TestingKdc krb;
@BeforeClass
public static void setUp() throws Exception {
clusterConf = AccumuloClusterPropertyConfiguration.get();
type = clusterConf.getClusterType();
+ if (ClusterType.MINI == type && TRUE.equals(System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION))) {
+ krb = new TestingKdc();
+ krb.start();
+ }
+
initialized = true;
}
+ @AfterClass
+ public static void tearDownKdc() throws Exception {
+ if (null != krb) {
+ krb.stop();
+ }
+ }
+
+ /**
+ * Delegates to {@link TestingKdc#getAccumuloKeytab()}; throws a RuntimeException when {@code krb} is null (KDC not enabled).
+ */
+ public static File getAccumuloKeytab() {
+ if (null == krb) {
+ throw new RuntimeException("KDC not enabled");
+ }
+ return krb.getAccumuloKeytab();
+ }
+
+ /**
+ * Delegates to {@link TestingKdc#getAccumuloPrincipal()}; throws a RuntimeException when {@code krb} is null (KDC not enabled).
+ */
+ public static String getAccumuloPrincipal() {
+ if (null == krb) {
+ throw new RuntimeException("KDC not enabled");
+ }
+ return krb.getAccumuloPrincipal();
+ }
+
+ /**
+ * Delegates to {@link TestingKdc#getClientKeytab()}; throws a RuntimeException when {@code krb} is null (KDC not enabled).
+ */
+ public static File getClientKeytab() {
+ if (null == krb) {
+ throw new RuntimeException("KDC not enabled");
+ }
+ return krb.getClientKeytab();
+ }
+
+ /**
+ * Delegates to {@link TestingKdc#getClientPrincipal()}; throws a RuntimeException when {@code krb} is null (KDC not enabled).
+ */
+ public static String getClientPrincipal() {
+ if (null == krb) {
+ throw new RuntimeException("KDC not enabled");
+ }
+ return krb.getClientPrincipal();
+ }
+
@Before
public void setupCluster() throws Exception {
// Before we try to instantiate the cluster, check to see if the test even wants to run against this type of cluster
@@ -80,7 +136,7 @@ public abstract class AccumuloClusterIT extends AccumuloIT implements MiniCluste
case MINI:
MiniClusterHarness miniClusterHarness = new MiniClusterHarness();
// Intrinsically performs the callback to let tests alter MiniAccumuloConfig and core-site.xml
- cluster = miniClusterHarness.create(this, getToken());
+ cluster = miniClusterHarness.create(this, getToken(), krb);
break;
case STANDALONE:
StandaloneAccumuloClusterConfiguration conf = (StandaloneAccumuloClusterConfiguration) clusterConf;
@@ -98,6 +154,10 @@ public abstract class AccumuloClusterIT extends AccumuloIT implements MiniCluste
if (type.isDynamic()) {
cluster.start();
+ if (null != krb) {
+ // Log in the 'client' user
+ UserGroupInformation.loginUserFromKeytab(getClientPrincipal(), getClientKeytab().getAbsolutePath());
+ }
} else {
log.info("Removing tables which appear to be from a previous test run");
cleanupTables();