You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/01/15 18:51:40 UTC
[4/4] accumulo git commit: ACCUMULO-2815 Support for Kerberos client
authentication.
ACCUMULO-2815 Support for Kerberos client authentication.
Leverage SASL transport provided by Thrift which can speak GSSAPI,
which Kerberos implements. Introduced...
* An Accumulo KerberosToken which is an AuthenticationToken to
validate users.
* Custom thrift processor and invocation handler to ensure server
RPCs have a valid KRB identity and Accumulo authentication.
* Authenticator, Authorizor and PermissionHandler for kerberos
* New ClientConf variables to use SASL transport and pass KRB
server primary (from principal)
* Updated ClientOpts and Shell opts to transparently use a
KerberosToken when SASL is enabled (no extra client work).
* Ensure existing unit tests still function.
* Throw ThriftSecurityExceptions on bad authentication to ensure
proper client action is taken.
* Fall back to krb principal before local OS user
* Initialize accepts a "root" user and defaults to not prompting
for a password to that user acct w/ SASL enabled.
* Use properties specific to server primary and realm for
clients to connect to servers (required for SASL handshake).
* Basic KerberosIT testing basic functionality (MiniKdc)
* Introduction of useKrbForIT option to run AccumuloClusterITs
with Kerberos (not 100% coverage) (MiniKdc)
* Ensure system user doesn't get a "real" user acct.
* Ensure that start-all.sh and stop-all.sh don't require krb creds
* Add user manual documentation
* Use the full krb principal as the accumulo principal
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4f19aa1f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4f19aa1f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4f19aa1f
Branch: refs/heads/master
Commit: 4f19aa1f8a629ba76e9c7b517b3356ba21865ec9
Parents: 8dc68b9
Author: Josh Elser <el...@apache.org>
Authored: Tue Dec 9 00:03:05 2014 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jan 15 11:47:59 2015 -0500
----------------------------------------------------------------------
README | 42 +--
.../apache/accumulo/core/cli/ClientOpts.java | 64 +++-
.../core/client/ClientConfiguration.java | 46 ++-
.../core/client/impl/ClientContext.java | 37 +-
.../core/client/impl/ConnectorImpl.java | 6 +-
.../accumulo/core/client/impl/MasterClient.java | 3 +-
.../core/client/impl/ThriftTransportKey.java | 39 +-
.../core/client/impl/ThriftTransportPool.java | 4 +-
.../client/security/tokens/KerberosToken.java | 136 +++++++
.../org/apache/accumulo/core/conf/Property.java | 16 +-
.../accumulo/core/rpc/FilterTransport.java | 105 ++++++
.../accumulo/core/rpc/SaslConnectionParams.java | 244 +++++++++++++
.../apache/accumulo/core/rpc/ThriftUtil.java | 81 ++++-
.../accumulo/core/rpc/UGIAssumingTransport.java | 70 ++++
.../core/rpc/UGIAssumingTransportFactory.java | 55 +++
.../accumulo/core/cli/TestClientOpts.java | 114 +++++-
.../core/client/ClientConfigurationTest.java | 81 +++++
.../client/impl/ThriftTransportKeyTest.java | 84 +++++
.../core/conf/ClientConfigurationTest.java | 66 ----
.../core/rpc/SaslConnectionParamsTest.java | 103 ++++++
.../main/asciidoc/accumulo_user_manual.asciidoc | 2 +
docs/src/main/asciidoc/chapters/clients.txt | 11 +
docs/src/main/asciidoc/chapters/kerberos.txt | 355 +++++++++++++++++++
.../impl/MiniAccumuloClusterImpl.java | 17 +-
.../impl/MiniAccumuloConfigImpl.java | 20 ++
pom.xml | 24 +-
.../java/org/apache/accumulo/proxy/Proxy.java | 3 +-
.../accumulo/server/AccumuloServerContext.java | 66 ++++
.../apache/accumulo/server/init/Initialize.java | 102 +++++-
.../TCredentialsUpdatingInvocationHandler.java | 133 +++++++
.../server/rpc/TCredentialsUpdatingWrapper.java | 38 ++
.../accumulo/server/rpc/TServerUtils.java | 176 ++++++++-
.../accumulo/server/rpc/ThriftServerType.java | 49 +++
.../server/security/SecurityOperation.java | 48 ++-
.../accumulo/server/security/SecurityUtil.java | 16 +-
.../server/security/SystemCredentials.java | 17 +-
.../security/handler/KerberosAuthenticator.java | 181 ++++++++++
.../security/handler/KerberosAuthorizor.java | 90 +++++
.../handler/KerberosPermissionHandler.java | 154 ++++++++
.../server/thrift/UGIAssumingProcessor.java | 90 +++++
.../org/apache/accumulo/server/util/Admin.java | 9 +
.../org/apache/accumulo/server/util/ZooZap.java | 10 +
.../server/AccumuloServerContextTest.java | 102 ++++++
...redentialsUpdatingInvocationHandlerTest.java | 93 +++++
.../server/rpc/ThriftServerTypeTest.java | 36 ++
.../accumulo/gc/SimpleGarbageCollector.java | 15 +-
.../gc/GarbageCollectWriteAheadLogsTest.java | 32 +-
.../accumulo/gc/SimpleGarbageCollectorTest.java | 72 ++--
.../CloseWriteAheadLogReferencesTest.java | 30 +-
.../java/org/apache/accumulo/master/Master.java | 11 +-
.../accumulo/master/tableOps/CompactRange.java | 5 +
.../accumulo/monitor/servlets/trace/Basic.java | 5 +
.../org/apache/accumulo/tracer/TraceServer.java | 8 +-
.../apache/accumulo/tserver/TabletServer.java | 18 +-
.../tserver/replication/ReplicationWorker.java | 13 +-
.../java/org/apache/accumulo/shell/Shell.java | 33 +-
.../apache/accumulo/shell/ShellOptionsJC.java | 41 ++-
.../accumulo/shell/ShellOptionsJCTest.java | 51 +++
test/pom.xml | 17 +
.../accumulo/test/functional/ZombieTServer.java | 5 +-
.../test/performance/thrift/NullTserver.java | 5 +-
.../accumulo/harness/AccumuloClusterIT.java | 62 +++-
.../accumulo/harness/MiniClusterHarness.java | 106 +++++-
.../accumulo/harness/SharedMiniClusterIT.java | 45 ++-
.../org/apache/accumulo/harness/TestingKdc.java | 165 +++++++++
.../conf/AccumuloMiniClusterConfiguration.java | 62 +++-
.../server/security/SystemCredentialsIT.java | 76 +++-
.../test/ArbitraryTablePropertiesIT.java | 8 +
.../org/apache/accumulo/test/CleanWalIT.java | 10 +-
.../test/functional/BatchScanSplitIT.java | 3 +-
.../accumulo/test/functional/KerberosIT.java | 316 +++++++++++++++++
.../accumulo/test/functional/MetadataIT.java | 7 +
.../test/security/KerberosTokenTest.java | 108 ++++++
test/src/test/resources/log4j.properties | 9 +
74 files changed, 4290 insertions(+), 286 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/README
----------------------------------------------------------------------
diff --git a/README b/README
index 27b3c66..4ebb078 100644
--- a/README
+++ b/README
@@ -364,45 +364,9 @@ certain column.
row1 colf1:colq2 [] val2
-If you are running on top of hdfs with kerberos enabled, then you need to do
-some extra work. First, create an Accumulo principal
-
- kadmin.local -q "addprinc -randkey accumulo/<host.domain.name>"
-
-where <host.domain.name> is replaced by a fully qualified domain name. Export
-the principals to a keytab file. It is safer to create a unique keytab file for each
-server, but you can also glob them if you wish.
-
- kadmin.local -q "xst -k accumulo.keytab -glob accumulo*"
-
-Place this file in $ACCUMULO_CONF_DIR for every host. It should be owned by
-the accumulo user and chmodded to 400. Add the following to the accumulo-env.sh
-
- kinit -kt $ACCUMULO_HOME/conf/accumulo.keytab accumulo/`hostname -f`
-
-In the accumulo-site.xml file on each node, add settings for general.kerberos.keytab
-and general.kerberos.principal, where the keytab setting is the absolute path
-to the keytab file ($ACCUMULO_HOME is valid to use) and principal is set to
-accumulo/_HOST@<REALM>, where REALM is set to your kerberos realm. You may use
-_HOST in lieu of your individual host names.
-
- <property>
- <name>general.kerberos.keytab</name>
- <value>$ACCUMULO_CONF_DIR/accumulo.keytab</value>
- </property>
-
- <property>
- <name>general.kerberos.principal</name>
- <value>accumulo/_HOST@MYREALM</value>
- </property>
-
-You can then start up Accumulo as you would with the accumulo user, and it will
-automatically handle the kerberos keys needed to access hdfs.
-
-Please Note: You may have issues initializing Accumulo while running kerberos HDFS.
-You can resolve this by temporarily granting the accumulo user write access to the
-hdfs root directory, running init, and then revoking write permission in the root
-directory (be sure to maintain access to the /accumulo directory).
+For information on how to configure Accumulo to run on top of Secure HDFS with
+Kerberos, please consult the Accumulo user manual section specifically devoted
+to client and server configuration with Kerberos.
******************************************************************************
6. Monitoring Apache Accumulo
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
index eb020eb..f1a0393 100644
--- a/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
+++ b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
@@ -35,6 +35,7 @@ import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
@@ -105,7 +106,7 @@ public class ClientOpts extends Help {
}
@Parameter(names = {"-u", "--user"}, description = "Connection user")
- public String principal = System.getProperty("user.name");
+ public String principal = null;
@Parameter(names = "-p", converter = PasswordConverter.class, description = "Connection password")
public Password password = null;
@@ -114,17 +115,19 @@ public class ClientOpts extends Help {
public Password securePassword = null;
@Parameter(names = {"-tc", "--tokenClass"}, description = "Token class")
- public String tokenClassName = PasswordToken.class.getName();
+ public String tokenClassName = null;
@DynamicParameter(names = "-l",
description = "login properties in the format key=value. Reuse -l for each property (prompt for properties if this option is missing")
public Map<String,String> loginProps = new LinkedHashMap<String,String>();
public AuthenticationToken getToken() {
- if (!loginProps.isEmpty()) {
- Properties props = new Properties();
- for (Entry<String,String> loginOption : loginProps.entrySet())
- props.put(loginOption.getKey(), loginOption.getValue());
+ if (null != tokenClassName) {
+ final Properties props = new Properties();
+ if (!loginProps.isEmpty()) {
+ for (Entry<String,String> loginOption : loginProps.entrySet())
+ props.put(loginOption.getKey(), loginOption.getValue());
+ }
try {
AuthenticationToken token = Class.forName(tokenClassName).asSubclass(AuthenticationToken.class).newInstance();
@@ -166,6 +169,9 @@ public class ClientOpts extends Help {
@Parameter(names = "--ssl", description = "Connect to accumulo over SSL")
public boolean sslEnabled = false;
+ @Parameter(names = "--sasl", description = "Connecto to Accumulo using SASL (supports Kerberos)")
+ public boolean saslEnabled = false;
+
@Parameter(names = "--config-file", description = "Read the given client config file. "
+ "If omitted, the path searched can be specified with $ACCUMULO_CLIENT_CONF_PATH, "
+ "which defaults to ~/.accumulo/config:$ACCUMULO_CONF_DIR/client.conf:/etc/accumulo/client.conf")
@@ -189,11 +195,32 @@ public class ClientOpts extends Help {
Trace.off();
}
+ /**
+ * Automatically update the options to use a KerberosToken when SASL is enabled for RPCs. Don't overwrite the options if the user has provided something
+ * specifically.
+ */
+ protected void updateKerberosCredentials() {
+ ClientConfiguration clientConfig;
+ try {
+ if (clientConfigFile == null)
+ clientConfig = ClientConfiguration.loadDefault();
+ else
+ clientConfig = new ClientConfiguration(new PropertiesConfiguration(clientConfigFile));
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e);
+ }
+ final boolean clientConfSaslEnabled = Boolean.parseBoolean(clientConfig.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+ if ((saslEnabled || clientConfSaslEnabled) && null == tokenClassName) {
+ tokenClassName = KerberosToken.CLASS_NAME;
+ }
+ }
+
@Override
public void parseArgs(String programName, String[] args, Object... others) {
super.parseArgs(programName, args, others);
startDebugLogging();
startTracing(programName);
+ updateKerberosCredentials();
}
protected Instance cachedInstance = null;
@@ -207,10 +234,25 @@ public class ClientOpts extends Help {
return cachedInstance = new ZooKeeperInstance(this.getClientConfiguration());
}
+ public String getPrincipal() throws AccumuloSecurityException {
+ if (null == principal) {
+ AuthenticationToken token = getToken();
+ if (null == token) {
+ throw new AccumuloSecurityException("No principal or authentication token was provided", SecurityErrorCode.BAD_CREDENTIALS);
+ }
+
+ // Try to extract the principal automatically from Kerberos
+ if (token instanceof KerberosToken) {
+ principal = ((KerberosToken) token).getPrincipal();
+ } else {
+ principal = System.getProperty("user.name");
+ }
+ }
+ return principal;
+ }
+
public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
- if (this.principal == null || this.getToken() == null)
- throw new AccumuloSecurityException("You must provide a user (-u) and password (-p)", SecurityErrorCode.BAD_CREDENTIALS);
- return getInstance().getConnector(principal, getToken());
+ return getInstance().getConnector(getPrincipal(), getToken());
}
public ClientConfiguration getClientConfiguration() throws IllegalArgumentException {
@@ -228,6 +270,10 @@ public class ClientOpts extends Help {
}
if (sslEnabled)
clientConfig.setProperty(ClientProperty.INSTANCE_RPC_SSL_ENABLED, "true");
+
+ if (saslEnabled)
+ clientConfig.setProperty(ClientProperty.INSTANCE_RPC_SASL_ENABLED, "true");
+
if (siteFile != null) {
AccumuloConfiguration config = new AccumuloConfiguration() {
Configuration xml = new Configuration();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java b/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java
index df53645..d37d471 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java
@@ -47,6 +47,7 @@ public class ClientConfiguration extends CompositeConfiguration {
public static final String GLOBAL_CONF_FILENAME = "client.conf";
public enum ClientProperty {
+ // SSL
RPC_SSL_TRUSTSTORE_PATH(Property.RPC_SSL_TRUSTSTORE_PATH),
RPC_SSL_TRUSTSTORE_PASSWORD(Property.RPC_SSL_TRUSTSTORE_PASSWORD),
RPC_SSL_TRUSTSTORE_TYPE(Property.RPC_SSL_TRUSTSTORE_TYPE),
@@ -57,13 +58,34 @@ public class ClientConfiguration extends CompositeConfiguration {
GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS),
INSTANCE_RPC_SSL_CLIENT_AUTH(Property.INSTANCE_RPC_SSL_CLIENT_AUTH),
INSTANCE_RPC_SSL_ENABLED(Property.INSTANCE_RPC_SSL_ENABLED),
+
+ // ZooKeeper
INSTANCE_ZK_HOST(Property.INSTANCE_ZK_HOST),
INSTANCE_ZK_TIMEOUT(Property.INSTANCE_ZK_TIMEOUT),
+
+ // Instance information
INSTANCE_NAME("instance.name", null, PropertyType.STRING, "Name of Accumulo instance to connect to"),
INSTANCE_ID("instance.id", null, PropertyType.STRING, "UUID of Accumulo instance to connect to"),
+
+ // Tracing
TRACE_SPAN_RECEIVERS(Property.TRACE_SPAN_RECEIVERS),
TRACE_SPAN_RECEIVER_PREFIX(Property.TRACE_SPAN_RECEIVER_PREFIX),
- TRACE_ZK_PATH(Property.TRACE_ZK_PATH);
+ TRACE_ZK_PATH(Property.TRACE_ZK_PATH),
+
+ // SASL / GSSAPI(Kerberos)
+ /**
+ * @since 1.7.0
+ */
+ INSTANCE_RPC_SASL_ENABLED(Property.INSTANCE_RPC_SASL_ENABLED),
+ /**
+ * @since 1.7.0
+ */
+ RPC_SASL_QOP(Property.RPC_SASL_QOP),
+ /**
+ * @since 1.7.0
+ */
+ KERBEROS_SERVER_PRIMARY("kerberos.server.primary", "accumulo", PropertyType.STRING,
+ "The first component of the Kerberos principal, the 'primary', that Accumulo servers use to login");
private String key;
private String defaultValue;
@@ -356,4 +378,26 @@ public class ClientConfiguration extends CompositeConfiguration {
setProperty(ClientProperty.RPC_SSL_KEYSTORE_TYPE, type);
return this;
}
+
+ /**
+ * Same as {@link #with(ClientProperty, String)} for ClientProperty.INSTANCE_RPC_SASL_ENABLED.
+ *
+ * @since 1.7.0
+ */
+ public ClientConfiguration withSasl(boolean saslEnabled) {
+ return with(ClientProperty.INSTANCE_RPC_SASL_ENABLED, String.valueOf(saslEnabled));
+ }
+
+ /**
+ * Same as {@link #with(ClientProperty, String)} for ClientProperty.INSTANCE_RPC_SASL_ENABLED and ClientProperty.KERBEROS_SERVER_PRIMARY.
+ *
+ * @param saslEnabled
+ * Should SASL(kerberos) be enabled
+ * @param kerberosServerPrimary
+ * The 'primary' component of the Kerberos principal Accumulo servers use to login (e.g. 'accumulo' in 'accumulo/_HOST@REALM')
+ * @since 1.7.0
+ */
+ public ClientConfiguration withSasl(boolean saslEnabled, String kerberosServerPrimary) {
+ return withSasl(saslEnabled).with(ClientProperty.KERBEROS_SERVER_PRIMARY, kerberosServerPrimary);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
index e75bec6..8470da4 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
@@ -33,6 +34,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.CredentialProviderFactoryShim;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.security.thrift.TCredentials;
@@ -52,6 +54,7 @@ public class ClientContext {
private final Instance inst;
private Credentials creds;
+ private ClientConfiguration clientConf;
private final AccumuloConfiguration rpcConf;
private Connector conn;
@@ -60,6 +63,7 @@ public class ClientContext {
*/
public ClientContext(Instance instance, Credentials credentials, ClientConfiguration clientConf) {
this(instance, credentials, convertClientConfig(checkNotNull(clientConf, "clientConf is null")));
+ this.clientConf = clientConf;
}
/**
@@ -69,6 +73,7 @@ public class ClientContext {
inst = checkNotNull(instance, "instance is null");
creds = checkNotNull(credentials, "credentials is null");
rpcConf = checkNotNull(serverConf, "serverConf is null");
+ clientConf = null;
}
/**
@@ -115,6 +120,17 @@ public class ClientContext {
}
/**
+ * Retrieve SASL configuration to initiate an RPC connection to a server
+ */
+ public SaslConnectionParams getClientSaslParams() {
+ // Use the clientConf if we have it
+ if (null != clientConf) {
+ return SaslConnectionParams.forConfig(clientConf);
+ }
+ return SaslConnectionParams.forConfig(getConfiguration());
+ }
+
+ /**
* Retrieve a connector
*/
public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
@@ -171,10 +187,19 @@ public class ClientContext {
}
}
}
+
if (config.containsKey(key))
return config.getString(key);
- else
+ else {
+ // Reconstitute the server kerberos property from the client config
+ if (Property.GENERAL_KERBEROS_PRINCIPAL == property) {
+ if (config.containsKey(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey())) {
+ // We don't know the server's actual realm here, so fall back to the default realm
+ return config.getString(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey()) + "/_HOST@" + SaslConnectionParams.getDefaultRealm();
+ }
+ }
return defaults.get(property);
+ }
}
@Override
@@ -188,6 +213,16 @@ public class ClientContext {
props.put(key, config.getString(key));
}
+ // Two client props that don't exist on the server config. Client doesn't need to know about the Kerberos instance from the principal, but servers do
+ // Automatically reconstruct the server property when converting a client config.
+ if (props.containsKey(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey())) {
+ final String serverPrimary = props.remove(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey());
+ if (filter.accept(Property.GENERAL_KERBEROS_PRINCIPAL.getKey())) {
+ // Use the _HOST expansion. It should be unnecessary in "client land".
+ props.put(Property.GENERAL_KERBEROS_PRINCIPAL.getKey(), serverPrimary + "/_HOST@" + SaslConnectionParams.getDefaultRealm());
+ }
+ }
+
// Attempt to load sensitive properties from a CredentialProvider, if configured
org.apache.hadoop.conf.Configuration hadoopConf = getHadoopConfiguration();
if (null != hadoopConf) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
index f481cc3..443e548 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.trace.Tracer;
public class ConnectorImpl extends Connector {
+ private static final String SYSTEM_TOKEN_NAME = "org.apache.accumulo.server.security.SystemCredentials$SystemToken";
private final ClientContext context;
private SecurityOperations secops = null;
private TableOperationsImpl tableops = null;
@@ -60,8 +61,9 @@ public class ConnectorImpl extends Connector {
this.context = context;
- // Skip fail fast for system services; string literal for class name, to avoid
- if (!"org.apache.accumulo.server.security.SystemCredentials$SystemToken".equals(context.getCredentials().getToken().getClass().getName())) {
+ // Skip fail fast for system services; string literal for class name, to avoid dependency on server jar
+ final String tokenClassName = context.getCredentials().getToken().getClass().getName();
+ if (!SYSTEM_TOKEN_NAME.equals(tokenClassName)) {
ServerClient.execute(context, new ClientExec<ClientService.Client>() {
@Override
public void execute(ClientService.Client iface) throws Exception {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
index fcbf9f9..9dad794 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
@@ -68,7 +68,8 @@ public class MasterClient {
MasterClientService.Client client = ThriftUtil.getClientNoTimeout(new MasterClientService.Client.Factory(), master, context);
return client;
} catch (TTransportException tte) {
- if (tte.getCause().getClass().equals(UnknownHostException.class)) {
+ Throwable cause = tte.getCause();
+ if (null != cause && cause instanceof UnknownHostException) {
// do not expect to recover from this
throw new RuntimeException(tte);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
index 6dc846f..072724b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.core.client.impl;
import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import com.google.common.net.HostAndPort;
@@ -26,6 +27,7 @@ class ThriftTransportKey {
private final HostAndPort server;
private final long timeout;
private final SslConnectionParams sslParams;
+ private final SaslConnectionParams saslParams;
private int hash = -1;
@@ -34,6 +36,24 @@ class ThriftTransportKey {
this.server = server;
this.timeout = timeout;
this.sslParams = context.getClientSslParams();
+ this.saslParams = context.getClientSaslParams();
+ if (null != saslParams) {
+ // TSasl and TSSL transport factories don't play nicely together
+ if (null != sslParams) {
+ throw new RuntimeException("Cannot use both SSL and SASL thrift transports");
+ }
+ }
+ }
+
+ /**
+ * Visible only for testing
+ */
+ ThriftTransportKey(HostAndPort server, long timeout, SslConnectionParams sslParams, SaslConnectionParams saslParams) {
+ checkNotNull(server, "location is null");
+ this.server = server;
+ this.timeout = timeout;
+ this.sslParams = sslParams;
+ this.saslParams = saslParams;
}
HostAndPort getServer() {
@@ -48,12 +68,17 @@ class ThriftTransportKey {
return sslParams != null;
}
+ public boolean isSasl() {
+ return saslParams != null;
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof ThriftTransportKey))
return false;
ThriftTransportKey ttk = (ThriftTransportKey) o;
- return server.equals(ttk.server) && timeout == ttk.timeout && (!isSsl() || (ttk.isSsl() && sslParams.equals(ttk.sslParams)));
+ return server.equals(ttk.server) && timeout == ttk.timeout && (!isSsl() || (ttk.isSsl() && sslParams.equals(ttk.sslParams)))
+ && (!isSasl() || (ttk.isSasl() && saslParams.equals(ttk.saslParams)));
}
@Override
@@ -65,10 +90,20 @@ class ThriftTransportKey {
@Override
public String toString() {
- return (isSsl() ? "ssl:" : "") + server + " (" + Long.toString(timeout) + ")";
+ String prefix = "";
+ if (isSsl()) {
+ prefix = "ssl:";
+ } else if (isSasl()) {
+ prefix = "sasl:" + saslParams.getPrincipal() + "@";
+ }
+ return prefix + server + " (" + Long.toString(timeout) + ")";
}
public SslConnectionParams getSslParams() {
return sslParams;
}
+
+ public SaslConnectionParams getSaslParams() {
+ return saslParams;
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index 5da803b..bc1cdbb 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -465,7 +465,7 @@ public class ThriftTransportPool {
try {
return new Pair<String,TTransport>(ttk.getServer().toString(), createNewTransport(ttk));
} catch (TTransportException tte) {
- log.debug("Failed to connect to " + servers.get(index), tte);
+ log.debug("Failed to connect to {}", servers.get(index), tte);
servers.remove(index);
retryCount++;
}
@@ -475,7 +475,7 @@ public class ThriftTransportPool {
}
private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException {
- TTransport transport = ThriftUtil.createClientTransport(cacheKey.getServer(), (int) cacheKey.getTimeout(), cacheKey.getSslParams());
+ TTransport transport = ThriftUtil.createClientTransport(cacheKey.getServer(), (int) cacheKey.getTimeout(), cacheKey.getSslParams(), cacheKey.getSaslParams());
log.trace("Creating new connection to connection to {}", cacheKey.getServer());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/client/security/tokens/KerberosToken.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/security/tokens/KerberosToken.java b/core/src/main/java/org/apache/accumulo/core/client/security/tokens/KerberosToken.java
new file mode 100644
index 0000000..d7d2e15
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/security/tokens/KerberosToken.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.security.tokens;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+
+import javax.security.auth.DestroyFailedException;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Authentication token for Kerberos authenticated clients
+ *
+ * @since 1.7.0
+ */
+public class KerberosToken implements AuthenticationToken {
+
+  public static final String CLASS_NAME = KerberosToken.class.getName();
+
+  // Version marker written by write(DataOutput) and verified by readFields(DataInput)
+  private static final int VERSION = 1;
+
+  // Set to null by destroy(); a null value is what isDestroyed() reports on
+  private String principal;
+
+  /**
+   * Creates a token using the provided principal and the currently logged-in user via {@link UserGroupInformation}.
+   *
+   * @param principal
+   *          The user that is logged in
+   * @throws IOException
+   *           If the current user cannot be obtained from {@link UserGroupInformation}
+   * @throws NullPointerException
+   *           If the provided principal is null
+   * @throws IllegalArgumentException
+   *           If the current user has no Kerberos credentials, or its user name differs from the provided principal
+   */
+  public KerberosToken(String principal) throws IOException {
+    Preconditions.checkNotNull(principal);
+    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    Preconditions.checkArgument(ugi.hasKerberosCredentials(), "Subject is not logged in via Kerberos");
+    Preconditions.checkArgument(principal.equals(ugi.getUserName()), "Provided principal does not match currently logged-in user");
+    this.principal = ugi.getUserName();
+  }
+
+  /**
+   * Creates a token using the login user as returned by {@link UserGroupInformation#getCurrentUser()}
+   *
+   * @throws IOException
+   *           If the current logged in user cannot be computed.
+   */
+  public KerberosToken() throws IOException {
+    this(UserGroupInformation.getCurrentUser().getUserName());
+  }
+
+  /**
+   * Creates a new token for the same principal. The copy is built through {@link #KerberosToken(String)}, so the checks performed by that constructor are
+   * applied again against the current user.
+   *
+   * @return A new token for this token's principal
+   * @throws RuntimeException
+   *           Wrapping the {@link IOException} if the current user cannot be obtained
+   */
+  @Override
+  public KerberosToken clone() {
+    try {
+      return new KerberosToken(principal);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Two tokens are equal when their principals are equal. Tokens whose principal has been cleared by {@link #destroy()} only equal other destroyed tokens.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (!(obj instanceof KerberosToken))
+      return false;
+    KerberosToken other = (KerberosToken) obj;
+
+    // principal is null once destroy() has been called; compare without dereferencing it blindly
+    return null == principal ? null == other.principal : principal.equals(other.principal);
+  }
+
+  /**
+   * The identity of the user to which this token belongs to according to Kerberos
+   *
+   * @return The principal, or null if this token has been destroyed
+   */
+  public String getPrincipal() {
+    return principal;
+  }
+
+  /**
+   * Writes only the serialization version; the principal is not part of the serialized form.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(VERSION);
+  }
+
+  /**
+   * Reads and verifies the serialization version. The principal of this token is left untouched.
+   *
+   * @throws IOException
+   *           If the version read does not match the expected version
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int actualVersion = in.readInt();
+    if (VERSION != actualVersion) {
+      throw new IOException("Did not find expected version in serialized KerberosToken");
+    }
+  }
+
+  /**
+   * Clears the principal held by this token.
+   */
+  @Override
+  public synchronized void destroy() throws DestroyFailedException {
+    principal = null;
+  }
+
+  @Override
+  public boolean isDestroyed() {
+    return null == principal;
+  }
+
+  /**
+   * No-op: this token declares no properties (see {@link #getProperties()}).
+   */
+  @Override
+  public void init(Properties properties) {
+
+  }
+
+  @Override
+  public Set<TokenProperty> getProperties() {
+    return Collections.emptySet();
+  }
+
+  /**
+   * Hash of the principal, consistent with {@link #equals(Object)}; a destroyed token hashes to 0.
+   */
+  @Override
+  public int hashCode() {
+    return null == principal ? 0 : principal.hashCode();
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index ce5de85..ad96680 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -105,6 +105,11 @@ public enum Property {
// TLSv1.2 should be used as the default when JDK6 support is dropped
RPC_SSL_CLIENT_PROTOCOL("rpc.ssl.client.protocol", "TLSv1", PropertyType.STRING,
"The protocol used to connect to a secure server, must be in the list of enabled protocols on the server side (rpc.ssl.server.enabled.protocols)"),
+ /**
+ * @since 1.7.0
+ */
+ RPC_SASL_QOP("rpc.sasl.qop", "auth", PropertyType.STRING,
+ "The quality of protection to be used with SASL. Valid values are 'auth', 'auth-int', and 'auth-conf'"),
// instance properties (must be the same for every node in an instance)
INSTANCE_PREFIX("instance.", null, PropertyType.PREFIX,
@@ -145,8 +150,14 @@ public enum Property {
"The authorizor class that accumulo will use to determine what labels a user has privilege to see"),
INSTANCE_SECURITY_PERMISSION_HANDLER("instance.security.permissionHandler", "org.apache.accumulo.server.security.handler.ZKPermHandler",
PropertyType.CLASSNAME, "The permission handler class that accumulo will use to determine if a user has privilege to perform an action"),
- INSTANCE_RPC_SSL_ENABLED("instance.rpc.ssl.enabled", "false", PropertyType.BOOLEAN, "Use SSL for socket connections from clients and among accumulo services"),
+ INSTANCE_RPC_SSL_ENABLED("instance.rpc.ssl.enabled", "false", PropertyType.BOOLEAN,
+ "Use SSL for socket connections from clients and among accumulo services. Mutually exclusive with SASL RPC configuration."),
INSTANCE_RPC_SSL_CLIENT_AUTH("instance.rpc.ssl.clientAuth", "false", PropertyType.BOOLEAN, "Require clients to present certs signed by a trusted root"),
+ /**
+ * @since 1.7.0
+ */
+ INSTANCE_RPC_SASL_ENABLED("instance.rpc.sasl.enabled", "false", PropertyType.BOOLEAN,
+ "Configures Thrift RPCs to require SASL with GSSAPI which supports Kerberos authentication. Mutually exclusive with SSL RPC configuration."),
// general properties
GENERAL_PREFIX("general.", null, PropertyType.PREFIX,
@@ -158,6 +169,9 @@ public enum Property {
GENERAL_DYNAMIC_CLASSPATHS(AccumuloVFSClassLoader.DYNAMIC_CLASSPATH_PROPERTY_NAME, AccumuloVFSClassLoader.DEFAULT_DYNAMIC_CLASSPATH_VALUE,
PropertyType.STRING, "A list of all of the places where changes in jars or classes will force a reload of the classloader."),
GENERAL_RPC_TIMEOUT("general.rpc.timeout", "120s", PropertyType.TIMEDURATION, "Time to wait on I/O for simple, short RPC calls"),
+ @Experimental
+ GENERAL_RPC_SERVER_TYPE("general.rpc.server.type", "", PropertyType.STRING,
+ "Type of Thrift server to instantiate, see org.apache.accumulo.server.rpc.ThriftServerType for more information. Only useful for benchmarking thrift servers"),
GENERAL_KERBEROS_KEYTAB("general.kerberos.keytab", "", PropertyType.PATH, "Path to the kerberos keytab to use. Leave blank if not using kerberoized hdfs"),
GENERAL_KERBEROS_PRINCIPAL("general.kerberos.principal", "", PropertyType.STRING, "Name of the kerberos principal to use. _HOST will automatically be "
+ "replaced by the machines hostname in the hostname portion of the principal. Leave blank if not using kerberoized hdfs"),
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/rpc/FilterTransport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/FilterTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/FilterTransport.java
new file mode 100644
index 0000000..a50944b
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/FilterTransport.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.rpc;
+
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Transport that simply wraps another transport. This is the equivalent of FilterInputStream for Thrift transports.
+ */
+public class FilterTransport extends TTransport {
+  // The transport every call is forwarded to; never null
+  private final TTransport delegate;
+
+  /**
+   * @param wrapped
+   *          The transport to forward all calls to; must not be null
+   */
+  public FilterTransport(TTransport wrapped) {
+    this.delegate = Preconditions.checkNotNull(wrapped);
+  }
+
+  /**
+   * @return The transport this instance forwards to
+   */
+  protected TTransport getWrapped() {
+    return delegate;
+  }
+
+  // Lifecycle calls, forwarded unchanged
+
+  @Override
+  public void open() throws TTransportException {
+    delegate.open();
+  }
+
+  @Override
+  public boolean isOpen() {
+    return delegate.isOpen();
+  }
+
+  @Override
+  public boolean peek() {
+    return delegate.peek();
+  }
+
+  @Override
+  public void close() {
+    delegate.close();
+  }
+
+  // Read and write calls, forwarded unchanged
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    return delegate.read(buf, off, len);
+  }
+
+  @Override
+  public int readAll(byte[] buf, int off, int len) throws TTransportException {
+    return delegate.readAll(buf, off, len);
+  }
+
+  @Override
+  public void write(byte[] buf) throws TTransportException {
+    delegate.write(buf);
+  }
+
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    delegate.write(buf, off, len);
+  }
+
+  @Override
+  public void flush() throws TTransportException {
+    delegate.flush();
+  }
+
+  // Direct buffer access, forwarded unchanged
+
+  @Override
+  public byte[] getBuffer() {
+    return delegate.getBuffer();
+  }
+
+  @Override
+  public int getBufferPosition() {
+    return delegate.getBufferPosition();
+  }
+
+  @Override
+  public int getBytesRemainingInBuffer() {
+    return delegate.getBytesRemainingInBuffer();
+  }
+
+  @Override
+  public void consumeBuffer(int len) {
+    delegate.consumeBuffer(len);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/rpc/SaslConnectionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/SaslConnectionParams.java b/core/src/main/java/org/apache/accumulo/core/rpc/SaslConnectionParams.java
new file mode 100644
index 0000000..e067e23
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/SaslConnectionParams.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.security.sasl.Sasl;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.commons.configuration.MapConfiguration;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Connection parameters for setting up a TSaslTransportFactory
+ */
+public class SaslConnectionParams {
+  private static final Logger log = LoggerFactory.getLogger(SaslConnectionParams.class);
+
+  /**
+   * Enumeration around {@link Sasl#QOP}
+   */
+  public enum QualityOfProtection {
+    AUTH("auth"),
+    AUTH_INT("auth-int"),
+    AUTH_CONF("auth-conf");
+
+    private final String quality;
+
+    private QualityOfProtection(String quality) {
+      this.quality = quality;
+    }
+
+    public String getQuality() {
+      return quality;
+    }
+
+    /**
+     * Looks up the constant whose SASL value (e.g. "auth-int") exactly matches the given name.
+     *
+     * @param name
+     *          The SASL quality of protection value
+     * @return The matching constant
+     * @throws IllegalArgumentException
+     *           If no constant has the given value (including a null name)
+     */
+    public static QualityOfProtection get(String name) {
+      for (QualityOfProtection candidate : values()) {
+        if (candidate.quality.equals(name)) {
+          return candidate;
+        }
+      }
+
+      throw new IllegalArgumentException("No value for " + name);
+    }
+
+    @Override
+    public String toString() {
+      return quality;
+    }
+  }
+
+  // Value reported by getDefaultRealm() when the Kerberos default realm could not be determined
+  private static String defaultRealm;
+
+  static {
+    try {
+      defaultRealm = KerberosUtil.getDefaultRealm();
+    } catch (Exception ke) {
+      // Keep class initialization from failing; fall back to a placeholder realm
+      log.debug("Kerberos krb5 configuration not found, setting default realm to UNKNOWN", ke);
+      defaultRealm = "UNKNOWN";
+    }
+  }
+
+  private String principal;
+  private QualityOfProtection qop;
+  private String kerberosServerPrimary;
+  private final Map<String,String> saslProperties;
+
+  // Instances are only created through the forConfig factory methods
+  private SaslConnectionParams() {
+    saslProperties = new HashMap<>();
+  }
+
+  /**
+   * Generate an {@link SaslConnectionParams} instance given the provided {@link AccumuloConfiguration}. The provided configuration is converted into a
+   * {@link ClientConfiguration}, ignoring any properties which are not {@link ClientProperty}s. If SASL is not being used, a null object will be returned.
+   * Callers should strive to use {@link #forConfig(ClientConfiguration)}; server processes are the only intended consumers of this method.
+   *
+   * @param conf
+   *          The configuration for clients to communicate with Accumulo
+   * @return An {@link SaslConnectionParams} instance or null if SASL is not enabled
+   */
+  public static SaslConnectionParams forConfig(AccumuloConfiguration conf) {
+    final Map<String,String> clientProperties = new HashMap<>();
+
+    // Servers will only have the full principal in their configuration -- parse the
+    // primary and realm from it.
+    final String serverPrincipal = conf.get(Property.GENERAL_KERBEROS_PRINCIPAL);
+
+    final KerberosName krbName;
+    try {
+      krbName = new KerberosName(serverPrincipal);
+      clientProperties.put(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey(), krbName.getServiceName());
+    } catch (Exception e) {
+      // bad value or empty, assume we're not using kerberos
+      log.debug("Could not parse Kerberos principal '{}', not deriving the server primary from it", serverPrincipal, e);
+    }
+
+    HashSet<String> clientKeys = new HashSet<>();
+    for (ClientProperty prop : ClientProperty.values()) {
+      clientKeys.add(prop.getKey());
+    }
+
+    // Copied after the derived primary above, so a client property present in conf replaces the derived value
+    String key;
+    for (Entry<String,String> entry : conf) {
+      key = entry.getKey();
+      if (clientKeys.contains(key)) {
+        clientProperties.put(key, entry.getValue());
+      }
+    }
+
+    ClientConfiguration clientConf = new ClientConfiguration(new MapConfiguration(clientProperties));
+    return forConfig(clientConf);
+  }
+
+  /**
+   * Generate an {@link SaslConnectionParams} instance given the provided {@link ClientConfiguration}. If SASL is not being used, a null object will be
+   * returned.
+   *
+   * @param conf
+   *          The configuration for clients to communicate with Accumulo
+   * @return An {@link SaslConnectionParams} instance or null if SASL is not enabled
+   * @throws RuntimeException
+   *           If Hadoop security is not enabled, the current user cannot be obtained, or the current user has no user name
+   * @throws IllegalArgumentException
+   *           If the configured quality of protection is not a recognized value
+   */
+  public static SaslConnectionParams forConfig(ClientConfiguration conf) {
+    if (!Boolean.parseBoolean(conf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED))) {
+      return null;
+    }
+
+    SaslConnectionParams params = new SaslConnectionParams();
+
+    // Ensure we're using Kerberos auth for Hadoop UGI
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      throw new RuntimeException("Cannot use SASL if Hadoop security is not enabled");
+    }
+
+    // Get the current user
+    UserGroupInformation currentUser;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to get current user", e);
+    }
+
+    // The full name is our principal
+    params.principal = currentUser.getUserName();
+    if (null == params.principal) {
+      throw new RuntimeException("Got null username from " + currentUser);
+    }
+
+    // Get the quality of protection to use
+    final String qopValue = conf.get(ClientProperty.RPC_SASL_QOP);
+    params.qop = QualityOfProtection.get(qopValue);
+
+    // Add in the SASL properties to a map so we don't have to repeatedly construct this map
+    params.saslProperties.put(Sasl.QOP, params.qop.getQuality());
+
+    // The primary from the KRB principal on each server (e.g. primary/instance@realm)
+    params.kerberosServerPrimary = conf.get(ClientProperty.KERBEROS_SERVER_PRIMARY);
+
+    return params;
+  }
+
+  /**
+   * The SASL properties (currently the quality of protection keyed by {@link Sasl#QOP}) as an unmodifiable view.
+   */
+  public Map<String,String> getSaslProperties() {
+    return Collections.unmodifiableMap(saslProperties);
+  }
+
+  /**
+   * The quality of protection used with SASL. See {@link Sasl#QOP} for more information.
+   */
+  public QualityOfProtection getQualityOfProtection() {
+    return qop;
+  }
+
+  /**
+   * The 'primary' component from the Kerberos principals that servers are configured to use.
+   */
+  public String getKerberosServerPrimary() {
+    return kerberosServerPrimary;
+  }
+
+  /**
+   * The principal of the logged in user for SASL
+   */
+  public String getPrincipal() {
+    return principal;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder(23,29);
+    // Let the builder handle every field so that null values are tolerated uniformly
+    hcb.append(kerberosServerPrimary).append(saslProperties).append(qop).append(principal);
+    return hcb.toHashCode();
+  }
+
+  /**
+   * Instances are equal when the server primary, quality of protection, principal and SASL properties all match. Null fields are tolerated, matching
+   * {@link #hashCode()}.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof SaslConnectionParams)) {
+      return false;
+    }
+
+    SaslConnectionParams other = (SaslConnectionParams) o;
+    if (!nullSafeEquals(kerberosServerPrimary, other.kerberosServerPrimary)) {
+      return false;
+    }
+    if (qop != other.qop) {
+      return false;
+    }
+    if (!nullSafeEquals(principal, other.principal)) {
+      return false;
+    }
+
+    return saslProperties.equals(other.saslProperties);
+  }
+
+  // Compares two possibly-null strings without risking a NullPointerException
+  private static boolean nullSafeEquals(String left, String right) {
+    return null == left ? null == right : left.equals(right);
+  }
+
+  /**
+   * The default Kerberos realm resolved when this class was loaded, or "UNKNOWN" if it could not be determined.
+   */
+  public static String getDefaultRealm() {
+    return defaultRealm;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index 09bd6c4..d880fb3 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.core.rpc;
import java.io.FileInputStream;
import java.io.IOException;
+import java.net.InetAddress;
import java.security.KeyStore;
import java.util.HashMap;
import java.util.Map;
@@ -37,17 +38,20 @@ import org.apache.accumulo.core.client.impl.ThriftTransportPool;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.log4j.Logger;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.net.HostAndPort;
@@ -55,12 +59,14 @@ import com.google.common.net.HostAndPort;
* Factory methods for creating Thrift client objects
*/
public class ThriftUtil {
- private static final Logger log = Logger.getLogger(ThriftUtil.class);
+ private static final Logger log = LoggerFactory.getLogger(ThriftUtil.class);
private static final TraceProtocolFactory protocolFactory = new TraceProtocolFactory();
private static final TFramedTransport.Factory transportFactory = new TFramedTransport.Factory(Integer.MAX_VALUE);
private static final Map<Integer,TTransportFactory> factoryCache = new HashMap<Integer,TTransportFactory>();
+ public static final String GSSAPI = "GSSAPI";
+
/**
* An instance of {@link TraceProtocolFactory}
*
@@ -246,7 +252,7 @@ public class ThriftUtil {
* RPC options
*/
public static TTransport createTransport(HostAndPort address, ClientContext context) throws TException {
- return createClientTransport(address, (int) context.getClientTimeoutInMillis(), context.getClientSslParams());
+ return createClientTransport(address, (int) context.getClientTimeoutInMillis(), context.getClientSslParams(), context.getClientSaslParams());
}
/**
@@ -283,13 +289,23 @@ public class ThriftUtil {
* Client socket timeout
* @param sslParams
* RPC options for SSL servers
+ * @param saslParams
+ * RPC options for SASL servers
* @return An open TTransport which must be closed when finished
*/
- public static TTransport createClientTransport(HostAndPort address, int timeout, SslConnectionParams sslParams) throws TTransportException {
+ public static TTransport createClientTransport(HostAndPort address, int timeout, SslConnectionParams sslParams, SaslConnectionParams saslParams)
+ throws TTransportException {
boolean success = false;
TTransport transport = null;
try {
if (sslParams != null) {
+ // The check in AccumuloServerContext ensures that servers are brought up with sane configurations, but we also want to validate clients
+ if (null != saslParams) {
+ throw new IllegalStateException("Cannot use both SSL and SASL");
+ }
+
+ log.trace("Creating SSL client transport");
+
// TSSLTransportFactory handles timeout 0 -> forever natively
if (sslParams.useJsse()) {
transport = TSSLTransportFactory.getClientSocket(address.getHostText(), address.getPort(), timeout);
@@ -309,20 +325,59 @@ public class ThriftUtil {
// Create the TSocket from that
transport = createClient(wrappingSslSockFactory, address.getHostText(), address.getPort(), timeout);
+ // TSSLTransportFactory leaves transports open, so no need to open here
}
- // TSSLTransportFactory leaves transports open, so no need to open here
- } else if (timeout == 0) {
+
+ transport = ThriftUtil.transportFactory().getTransport(transport);
+ } else if (null != saslParams) {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ throw new IllegalStateException("Expected Kerberos security to be enabled if SASL is in use");
+ }
+
+ log.trace("Creating SASL connection to {}:{}", address.getHostText(), address.getPort());
+
transport = new TSocket(address.getHostText(), address.getPort());
- transport.open();
- } else {
+
try {
- transport = TTimeoutTransport.create(address, timeout);
- } catch (IOException ex) {
- throw new TTransportException(ex);
+ // Log in via UGI, ensures we have logged in with our KRB credentials
+ final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+ // Is this pricey enough that we want to cache it?
+ final String hostname = InetAddress.getByName(address.getHostText()).getCanonicalHostName();
+
+ log.trace("Opening transport to server as {} to {}/{}", currentUser, saslParams.getKerberosServerPrimary(), hostname);
+
+ // Create the client SASL transport using the information for the server
+ // Despite the 'protocol' argument seeming to be useless, it *must* be the primary of the server being connected to
+ transport = new TSaslClientTransport(GSSAPI, null, saslParams.getKerberosServerPrimary(), hostname, saslParams.getSaslProperties(), null, transport);
+
+ // Wrap it all in a processor which will run with a doAs the current user
+ transport = new UGIAssumingTransport(transport, currentUser);
+
+ // Open the transport
+ transport.open();
+ } catch (IOException e) {
+ log.warn("Failed to open SASL transport", e);
+ throw new TTransportException(e);
+ }
+ } else {
+ log.trace("Opening normal transport");
+ if (timeout == 0) {
+ transport = new TSocket(address.getHostText(), address.getPort());
+ transport.open();
+ } else {
+ try {
+ transport = TTimeoutTransport.create(address, timeout);
+ } catch (IOException ex) {
+ log.warn("Failed to open transport to " + address);
+ throw new TTransportException(ex);
+ }
+
+ // Open the transport
+ transport.open();
}
- transport.open();
+ transport = ThriftUtil.transportFactory().getTransport(transport);
}
- transport = ThriftUtil.transportFactory().getTransport(transport);
success = true;
} finally {
if (!success && transport != null) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransport.java
new file mode 100644
index 0000000..bc2c785
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransport.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.rpc;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * The Thrift SASL transports call Sasl.createSaslServer and Sasl.createSaslClient inside open(). So, we need to assume the correct UGI when the transport is
+ * opened so that the SASL mechanisms have access to the right principal. This transport wraps the Sasl transports to set up the right UGI context for open().
+ *
+ * This is used on the client side, where the API explicitly opens a transport to the server.
+ *
+ * Lifted from Apache Hive 0.14
+ */
+public class UGIAssumingTransport extends FilterTransport {
+  // The user that open() runs as
+  protected UserGroupInformation ugi;
+
+  /**
+   * @param wrapped
+   *          The transport to open as the given user
+   * @param ugi
+   *          The user to assume while opening the wrapped transport
+   */
+  public UGIAssumingTransport(TTransport wrapped, UserGroupInformation ugi) {
+    super(wrapped);
+    this.ugi = ugi;
+  }
+
+  /**
+   * Opens the wrapped transport inside a {@code doAs} for the configured user.
+   *
+   * @throws TTransportException
+   *           If the wrapped transport fails to open
+   * @throws RuntimeException
+   *           If the {@code doAs} call itself fails or is interrupted; on interruption the thread's interrupt status is restored first
+   */
+  @Override
+  public void open() throws TTransportException {
+    // The action cannot throw TTransportException through doAs, so it is carried out in this holder
+    final AtomicReference<TTransportException> holder = new AtomicReference<>(null);
+    try {
+      ugi.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() {
+          try {
+            getWrapped().open();
+          } catch (TTransportException tte) {
+            holder.set(tte);
+          }
+          return null;
+        }
+      });
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so code further up the stack can still observe the interruption
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Make sure the transport exception gets (re)thrown if it happened
+    TTransportException tte = holder.get();
+    if (null != tte) {
+      throw tte;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransportFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransportFactory.java b/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransportFactory.java
new file mode 100644
index 0000000..77a3ce3
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransportFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.security.PrivilegedAction;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A TransportFactory that wraps another one, but assumes a specified UGI before calling through.
+ *
+ * This is used on the server side to assume the server's Principal when accepting clients.
+ *
+ * Borrowed from Apache Hive 0.14
+ */
+public class UGIAssumingTransportFactory extends TTransportFactory {
+  // The user assumed while the delegate factory builds a transport
+  private final UserGroupInformation serverUser;
+  // The factory that actually builds the transport
+  private final TTransportFactory delegateFactory;
+
+  /**
+   * @param wrapped
+   *          The factory to call through to; must not be null
+   * @param ugi
+   *          The user to assume when calling the wrapped factory; must not be null
+   */
+  public UGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
+    this.delegateFactory = Preconditions.checkNotNull(wrapped);
+    this.serverUser = Preconditions.checkNotNull(ugi);
+  }
+
+  /**
+   * Asks the wrapped factory for a transport while running as the configured user.
+   */
+  @Override
+  public TTransport getTransport(final TTransport trans) {
+    final PrivilegedAction<TTransport> createAsUser = new PrivilegedAction<TTransport>() {
+      @Override
+      public TTransport run() {
+        return delegateFactory.getTransport(trans);
+      }
+    };
+    return serverUser.doAs(createAsUser);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java b/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
index ff49bc0..435ae85 100644
--- a/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
+++ b/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
@@ -21,14 +21,22 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import javax.security.auth.DestroyFailedException;
+
import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.security.Authorizations;
@@ -48,14 +56,14 @@ public class TestClientOpts {
public TestName testName = new TestName();
@Test
- public void test() {
+ public void test() throws Exception {
BatchWriterConfig cfg = new BatchWriterConfig();
// document the defaults
ClientOpts args = new ClientOpts();
BatchWriterOpts bwOpts = new BatchWriterOpts();
BatchScannerOpts bsOpts = new BatchScannerOpts();
- assertEquals(System.getProperty("user.name"), args.principal);
+ assertNull(args.principal);
assertNull(args.securePassword);
assertNull(args.getToken());
assertEquals(Long.valueOf(cfg.getMaxLatency(TimeUnit.MILLISECONDS)), bwOpts.batchLatency);
@@ -146,4 +154,106 @@ public class TestClientOpts {
args.getInstance();
}
+ @Test
+ public void testSsl() {
+ ClientOpts args = new ClientOpts();
+
+ JCommander jc = new JCommander();
+ jc.addObject(args); // let JCommander populate the ClientOpts fields from the parsed arguments
+ jc.parse("--ssl"); // only the bare flag is passed
+ assertEquals(true, args.sslEnabled); // the flag alone is expected to turn SSL on
+ }
+
+ @Test
+ public void testSaslWithClientConfig() throws IOException {
+ ClientOpts args = new ClientOpts();
+
+ File clientConfFile = tmpDir.newFile();
+ FileWriter writer = new FileWriter(clientConfFile);
+
+ try {
+ writer.write(String.format("%s=%s\n", ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), "true")); // the file's only entry enables SASL
+ } finally {
+ writer.close(); // close even if the write fails
+ }
+
+ JCommander jc = new JCommander();
+ jc.addObject(args);
+ jc.parse("--config-file", clientConfFile.getCanonicalPath()); // no --sasl flag and no -tc token class on the command line
+ args.updateKerberosCredentials(); // NOTE(review): presumably selects the Kerberos token when SASL is enabled - confirm in ClientOpts
+
+ assertEquals(KerberosToken.CLASS_NAME, args.tokenClassName); // token class is expected to come from the config file alone
+ }
+
+ @Test
+ public void testSasl() {
+ ClientOpts args = new ClientOpts();
+ JCommander jc = new JCommander();
+ jc.addObject(args);
+ jc.parse("--sasl"); // only the bare flag is passed
+ assertEquals(true, args.saslEnabled); // the flag alone is expected to turn SASL on
+ }
+
+ @Test
+ public void testEmptyTokenProperties() {
+ ClientOpts args = new ClientOpts();
+
+ JCommander jc = new JCommander();
+ jc.addObject(args);
+ jc.parse("-tc", EmptyToken.class.getName()); // token class given by name, with no token properties supplied
+ assertEquals(new EmptyToken(), args.getToken()); // relies on EmptyToken.equals, which matches any EmptyToken instance
+ }
+
+ @Test
+ public void testPrincipalWithSasl() throws IOException {
+ ClientOpts args = new ClientOpts();
+
+ File clientConfFile = tmpDir.newFile(); // never written to in this test, so SASL can only come from the flag
+
+ JCommander jc = new JCommander();
+ jc.addObject(args);
+ jc.parse("--config-file", clientConfFile.getCanonicalPath(), "--sasl", "-i", "instance_name");
+
+ ClientConfiguration clientConf = args.getClientConfiguration();
+ assertEquals("true", clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED)); // NOTE(review): despite the test name, no principal is asserted here
+ }
+
+ /**
+ * An authentication token which requires no options
+ */
+ private static class EmptyToken implements AuthenticationToken {
+ public EmptyToken() {}
+
+ @Override
+ public void write(DataOutput out) throws IOException {} // nothing to serialize
+
+ @Override
+ public void readFields(DataInput in) throws IOException {} // nothing to deserialize
+
+ @Override
+ public void destroy() throws DestroyFailedException {} // no state to wipe
+
+ @Override
+ public boolean isDestroyed() {
+ return false;
+ }
+
+ @Override
+ public void init(Properties properties) {} // supplied properties are ignored
+
+ @Override
+ public Set<TokenProperty> getProperties() {
+ return null; // NOTE(review): null rather than an empty set - confirm callers tolerate it
+ }
+
+ @Override
+ public AuthenticationToken clone() {
+ return null; // NOTE(review): not a real copy - confirm nothing clones this token
+ }
+
+ @Override
+ public boolean equals(Object o) { // NOTE(review): equals is overridden without a matching hashCode
+ return o instanceof EmptyToken; // every EmptyToken instance is equal to every other
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/test/java/org/apache/accumulo/core/client/ClientConfigurationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/ClientConfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/client/ClientConfigurationTest.java
new file mode 100644
index 0000000..424cea1
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/client/ClientConfigurationTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.junit.Test;
+
+public class ClientConfigurationTest {
+
+ @Test
+ public void testOverrides() throws Exception {
+ ClientConfiguration clientConfig = createConfig();
+ assertExpectedConfig(clientConfig);
+ }
+
+ @Test
+ public void testSerialization() throws Exception {
+ ClientConfiguration clientConfig = createConfig();
+ // sanity check that we're starting with what we're expecting
+ assertExpectedConfig(clientConfig);
+
+ String serialized = clientConfig.serialize();
+ ClientConfiguration deserializedClientConfig = ClientConfiguration.deserialize(serialized);
+ assertExpectedConfig(deserializedClientConfig); // the round trip must yield the same effective values
+ }
+
+ private void assertExpectedConfig(ClientConfiguration clientConfig) {
+ assertEquals("firstZkHosts", clientConfig.get(ClientProperty.INSTANCE_ZK_HOST)); // set in all three configs; the first one is expected to win
+ assertEquals("secondInstanceName", clientConfig.get(ClientProperty.INSTANCE_NAME)); // set in second and third; the earlier one is expected to win
+ assertEquals("123s", clientConfig.get(ClientProperty.INSTANCE_ZK_TIMEOUT)); // set only in the third config
+ assertEquals(ClientProperty.RPC_SSL_TRUSTSTORE_TYPE.getDefaultValue(), clientConfig.get(ClientProperty.RPC_SSL_TRUSTSTORE_TYPE)); // set nowhere, so the property default is expected
+ }
+
+ private ClientConfiguration createConfig() { // three layered configs with overlapping keys
+ Configuration first = new PropertiesConfiguration();
+ first.addProperty(ClientProperty.INSTANCE_ZK_HOST.getKey(), "firstZkHosts");
+ Configuration second = new PropertiesConfiguration();
+ second.addProperty(ClientProperty.INSTANCE_ZK_HOST.getKey(), "secondZkHosts");
+ second.addProperty(ClientProperty.INSTANCE_NAME.getKey(), "secondInstanceName");
+ Configuration third = new PropertiesConfiguration();
+ third.addProperty(ClientProperty.INSTANCE_ZK_HOST.getKey(), "thirdZkHosts");
+ third.addProperty(ClientProperty.INSTANCE_NAME.getKey(), "thirdInstanceName");
+ third.addProperty(ClientProperty.INSTANCE_ZK_TIMEOUT.getKey(), "123s");
+ return new ClientConfiguration(Arrays.asList(first, second, third)); // list order is what the assertions above depend on
+ }
+
+ @Test
+ public void testSasl() {
+ ClientConfiguration conf = new ClientConfiguration(Collections.<Configuration> emptyList());
+ assertEquals("false", conf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED)); // with nothing configured, SASL is expected to be off
+ conf.withSasl(false);
+ assertEquals("false", conf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+ conf.withSasl(true);
+ assertEquals("true", conf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+ final String primary = "accumulo";
+ conf.withSasl(true, primary); // two-argument form also supplies the Kerberos server primary
+ assertEquals(primary, conf.get(ClientProperty.KERBEROS_SERVER_PRIMARY));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftTransportKeyTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftTransportKeyTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftTransportKeyTest.java
new file mode 100644
index 0000000..2723273
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftTransportKeyTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
+import org.apache.accumulo.core.rpc.SslConnectionParams;
+import org.junit.Test;
+
+import com.google.common.net.HostAndPort;
+
+public class ThriftTransportKeyTest {
+
+ @Test(expected = RuntimeException.class) // a context offering both SSL and SASL params is expected to be rejected
+ public void testSslAndSaslErrors() {
+ ClientContext clientCtx = createMock(ClientContext.class);
+ SslConnectionParams sslParams = createMock(SslConnectionParams.class);
+ SaslConnectionParams saslParams = createMock(SaslConnectionParams.class);
+
+ expect(clientCtx.getClientSslParams()).andReturn(sslParams).anyTimes();
+ expect(clientCtx.getClientSaslParams()).andReturn(saslParams).anyTimes();
+
+ // We don't care to verify the sslparam or saslparam mocks
+ replay(clientCtx);
+
+ try {
+ new ThriftTransportKey(HostAndPort.fromParts("localhost", 9999), 120 * 1000, clientCtx);
+ } finally {
+ verify(clientCtx); // runs even though the constructor is expected to throw
+ }
+ }
+
+ @Test
+ public void testSaslPrincipalIsSignificant() {
+ SaslConnectionParams saslParams1 = createMock(SaslConnectionParams.class), saslParams2 = createMock(SaslConnectionParams.class);
+ expect(saslParams1.getPrincipal()).andReturn("user1");
+ expect(saslParams2.getPrincipal()).andReturn("user2");
+
+ replay(saslParams1, saslParams2);
+
+ ThriftTransportKey ttk1 = new ThriftTransportKey(HostAndPort.fromParts("localhost", 9997), 1l, null, saslParams1), ttk2 = new ThriftTransportKey( // same host, port and timeout; only the SASL params differ
+ HostAndPort.fromParts("localhost", 9997), 1l, null, saslParams2);
+
+ assertNotEquals(ttk1, ttk2);
+ assertNotEquals(ttk1.hashCode(), ttk2.hashCode()); // NOTE(review): stricter than the hashCode contract, which allows unequal objects to collide
+
+ verify(saslParams1, saslParams2);
+ }
+
+ @Test
+ public void testSimpleEquivalence() {
+ ClientContext clientCtx = createMock(ClientContext.class);
+
+ expect(clientCtx.getClientSslParams()).andReturn(null).anyTimes(); // neither SSL nor SASL is configured on this context
+ expect(clientCtx.getClientSaslParams()).andReturn(null).anyTimes();
+
+ replay(clientCtx);
+
+ ThriftTransportKey ttk = new ThriftTransportKey(HostAndPort.fromParts("localhost", 9999), 120 * 1000, clientCtx);
+
+ assertTrue("Normal ThriftTransportKey doesn't equal itself", ttk.equals(ttk)); // reflexivity check with null SSL/SASL params
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/test/java/org/apache/accumulo/core/conf/ClientConfigurationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/ClientConfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/conf/ClientConfigurationTest.java
deleted file mode 100644
index 40be70f..0000000
--- a/core/src/test/java/org/apache/accumulo/core/conf/ClientConfigurationTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.conf;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.junit.Test;
-
-public class ClientConfigurationTest {
- @Test
- public void testOverrides() throws Exception {
- ClientConfiguration clientConfig = createConfig();
- assertExpectedConfig(clientConfig);
- }
-
- @Test
- public void testSerialization() throws Exception {
- ClientConfiguration clientConfig = createConfig();
- // sanity check that we're starting with what we're expecting
- assertExpectedConfig(clientConfig);
-
- String serialized = clientConfig.serialize();
- ClientConfiguration deserializedClientConfig = ClientConfiguration.deserialize(serialized);
- assertExpectedConfig(deserializedClientConfig);
- }
-
- private void assertExpectedConfig(ClientConfiguration clientConfig) {
- assertEquals("firstZkHosts", clientConfig.get(ClientProperty.INSTANCE_ZK_HOST));
- assertEquals("secondInstanceName", clientConfig.get(ClientProperty.INSTANCE_NAME));
- assertEquals("123s", clientConfig.get(ClientProperty.INSTANCE_ZK_TIMEOUT));
- assertEquals(ClientProperty.RPC_SSL_TRUSTSTORE_TYPE.getDefaultValue(), clientConfig.get(ClientProperty.RPC_SSL_TRUSTSTORE_TYPE));
- }
-
- private ClientConfiguration createConfig() {
- Configuration first = new PropertiesConfiguration();
- first.addProperty(ClientProperty.INSTANCE_ZK_HOST.getKey(), "firstZkHosts");
- Configuration second = new PropertiesConfiguration();
- second.addProperty(ClientProperty.INSTANCE_ZK_HOST.getKey(), "secondZkHosts");
- second.addProperty(ClientProperty.INSTANCE_NAME.getKey(), "secondInstanceName");
- Configuration third = new PropertiesConfiguration();
- third.addProperty(ClientProperty.INSTANCE_ZK_HOST.getKey(), "thirdZkHosts");
- third.addProperty(ClientProperty.INSTANCE_NAME.getKey(), "thirdInstanceName");
- third.addProperty(ClientProperty.INSTANCE_ZK_TIMEOUT.getKey(), "123s");
- return new ClientConfiguration(Arrays.asList(first, second, third));
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java b/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java
new file mode 100644
index 0000000..8c65776
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.Map;
+
+import javax.security.sasl.Sasl;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SaslConnectionParams.QualityOfProtection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SaslConnectionParamsTest {
+
+ private String user; // user name of the current UGI user, captured in setup()
+
+ @Before
+ public void setup() throws Exception {
+ Configuration conf = new Configuration(false);
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); // set Hadoop's authentication setting to kerberos for this test
+ UserGroupInformation.setConfiguration(conf);
+ user = UserGroupInformation.getCurrentUser().getUserName();
+ }
+
+ @Test
+ public void testNullParams() {
+ ClientConfiguration clientConf = ClientConfiguration.loadDefault();
+ AccumuloConfiguration rpcConf = ClientContext.convertClientConfig(clientConf);
+ assertEquals("false", clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+ assertNull(SaslConnectionParams.forConfig(rpcConf)); // with SASL off, no params object is expected
+ }
+
+ @Test
+ public void testDefaultParamsAsClient() throws Exception {
+ final ClientConfiguration clientConf = ClientConfiguration.loadDefault();
+
+ // The primary is the first component of the principal
+ final String primary = "accumulo";
+ clientConf.withSasl(true, primary);
+
+ assertEquals("true", clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+
+ final SaslConnectionParams saslParams = SaslConnectionParams.forConfig(clientConf); // built straight from the ClientConfiguration
+ assertEquals(primary, saslParams.getKerberosServerPrimary());
+
+ final QualityOfProtection defaultQop = QualityOfProtection.get(Property.RPC_SASL_QOP.getDefaultValue());
+ assertEquals(defaultQop, saslParams.getQualityOfProtection());
+
+ Map<String,String> properties = saslParams.getSaslProperties();
+ assertEquals(1, properties.size()); // the QOP entry is expected to be the only SASL property
+ assertEquals(defaultQop.getQuality(), properties.get(Sasl.QOP));
+ assertEquals(user, saslParams.getPrincipal()); // principal is expected to match the user captured in setup()
+ }
+
+ @Test
+ public void testDefaultParamsAsServer() throws Exception {
+ final ClientConfiguration clientConf = ClientConfiguration.loadDefault();
+
+ // The primary is the first component of the principal
+ final String primary = "accumulo";
+ clientConf.withSasl(true, primary);
+
+ final AccumuloConfiguration rpcConf = ClientContext.convertClientConfig(clientConf); // unlike the client test, params are built from the converted AccumuloConfiguration
+ assertEquals("true", clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+
+ final SaslConnectionParams saslParams = SaslConnectionParams.forConfig(rpcConf);
+ assertEquals(primary, saslParams.getKerberosServerPrimary());
+
+ final QualityOfProtection defaultQop = QualityOfProtection.get(Property.RPC_SASL_QOP.getDefaultValue());
+ assertEquals(defaultQop, saslParams.getQualityOfProtection());
+
+ Map<String,String> properties = saslParams.getSaslProperties();
+ assertEquals(1, properties.size()); // the QOP entry is expected to be the only SASL property
+ assertEquals(defaultQop.getQuality(), properties.get(Sasl.QOP));
+ assertEquals(user, saslParams.getPrincipal()); // principal is expected to match the user captured in setup()
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/main/asciidoc/accumulo_user_manual.asciidoc b/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
index 5884da2..b9a85e2 100644
--- a/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
+++ b/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
@@ -55,6 +55,8 @@ include::chapters/implementation.txt[]
include::chapters/ssl.txt[]
+include::chapters/kerberos.txt[]
+
include::chapters/administration.txt[]
include::chapters/multivolume.txt[]
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/docs/src/main/asciidoc/chapters/clients.txt
----------------------------------------------------------------------
diff --git a/docs/src/main/asciidoc/chapters/clients.txt b/docs/src/main/asciidoc/chapters/clients.txt
index 64f0e55..3f85074 100644
--- a/docs/src/main/asciidoc/chapters/clients.txt
+++ b/docs/src/main/asciidoc/chapters/clients.txt
@@ -67,6 +67,17 @@ KeyStore to alleviate passwords stored in cleartext. When stored in HDFS, a sing
KeyStore can be used across an entire instance. Be aware that KeyStores stored on
the local filesystem must be made available to all nodes in the Accumulo cluster.
+[source,java]
+----
+KerberosToken token = new KerberosToken();
+Connector conn = inst.getConnector(token.getPrincipal(), token);
+----
+
+A KerberosToken can be supplied to use the authentication provided by Kerberos.
+Using Kerberos requires external setup and additional configuration, but provides
+a single point of authentication across HDFS, YARN and ZooKeeper and allows
+for password-less authentication with Accumulo.
+
=== Writing Data
Data are written to Accumulo by creating Mutation objects that represent all the