You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/01/15 18:51:37 UTC
[1/4] accumulo git commit: ACCUMULO-2815 Support for Kerberos client
authentication.
Repository: accumulo
Updated Branches:
refs/heads/master 8dc68b97a -> 4f19aa1f8
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/test/java/org/apache/accumulo/harness/MiniClusterHarness.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/harness/MiniClusterHarness.java b/test/src/test/java/org/apache/accumulo/harness/MiniClusterHarness.java
index abdb627..06b4303 100644
--- a/test/src/test/java/org/apache/accumulo/harness/MiniClusterHarness.java
+++ b/test/src/test/java/org/apache/accumulo/harness/MiniClusterHarness.java
@@ -21,16 +21,24 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.security.handler.KerberosAuthenticator;
+import org.apache.accumulo.server.security.handler.KerberosAuthorizor;
+import org.apache.accumulo.server.security.handler.KerberosPermissionHandler;
import org.apache.accumulo.test.functional.NativeMapIT;
import org.apache.accumulo.test.util.CertUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
@@ -39,11 +47,16 @@ import com.google.common.base.Preconditions;
* Harness that sets up a MiniAccumuloCluster in a manner expected for Accumulo integration tests.
*/
public class MiniClusterHarness {
+ private static final Logger log = LoggerFactory.getLogger(MiniClusterHarness.class);
private static final AtomicLong COUNTER = new AtomicLong(0);
public static final String USE_SSL_FOR_IT_OPTION = "org.apache.accumulo.test.functional.useSslForIT",
- USE_CRED_PROVIDER_FOR_IT_OPTION = "org.apache.accumulo.test.functional.useCredProviderForIT", TRUE = Boolean.toString(true);
+ USE_CRED_PROVIDER_FOR_IT_OPTION = "org.apache.accumulo.test.functional.useCredProviderForIT",
+ USE_KERBEROS_FOR_IT_OPTION = "org.apache.accumulo.test.functional.useKrbForIT", TRUE = Boolean.toString(true);
+
+ // TODO These constants are also defined by MiniKdc (hadoop-minikdc >= 2.6.0); use those instead once that version can be relied upon
+ public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf", SUN_SECURITY_KRB5_DEBUG = "sun.security.krb5.debug";
/**
* Create a MiniAccumuloCluster using the given Token as the credentials for the root user.
@@ -56,35 +69,54 @@ public class MiniClusterHarness {
return create(testBase.getClass().getName(), testBase.testName.getMethodName(), token);
}
- public MiniAccumuloClusterImpl create(AccumuloClusterIT testBase, AuthenticationToken token) throws Exception {
- return create(testBase.getClass().getName(), testBase.testName.getMethodName(), token, testBase);
+ public MiniAccumuloClusterImpl create(AccumuloIT testBase, AuthenticationToken token, TestingKdc kdc) throws Exception {
+ return create(testBase.getClass().getName(), testBase.testName.getMethodName(), token, kdc);
+ }
+
+ public MiniAccumuloClusterImpl create(AccumuloClusterIT testBase, AuthenticationToken token, TestingKdc kdc) throws Exception {
+ return create(testBase.getClass().getName(), testBase.testName.getMethodName(), token, testBase, kdc);
}
public MiniAccumuloClusterImpl create(AccumuloClusterIT testBase, AuthenticationToken token, MiniClusterConfigurationCallback callback) throws Exception {
- return create(testBase.getClass().getName(), testBase.testName.getMethodName(), token, testBase);
+ return create(testBase.getClass().getName(), testBase.testName.getMethodName(), token, callback);
}
public MiniAccumuloClusterImpl create(String testClassName, String testMethodName, AuthenticationToken token) throws Exception {
return create(testClassName, testMethodName, token, MiniClusterConfigurationCallback.NO_CALLBACK);
}
+ public MiniAccumuloClusterImpl create(String testClassName, String testMethodName, AuthenticationToken token, TestingKdc kdc) throws Exception {
+ return create(testClassName, testMethodName, token, MiniClusterConfigurationCallback.NO_CALLBACK, kdc);
+ }
+
public MiniAccumuloClusterImpl create(String testClassName, String testMethodName, AuthenticationToken token, MiniClusterConfigurationCallback configCallback)
throws Exception {
+ return create(testClassName, testMethodName, token, configCallback, null);
+ }
+
+ public MiniAccumuloClusterImpl create(String testClassName, String testMethodName, AuthenticationToken token,
+ MiniClusterConfigurationCallback configCallback, TestingKdc kdc) throws Exception {
Preconditions.checkNotNull(token);
- Preconditions.checkArgument(PasswordToken.class.isAssignableFrom(token.getClass()));
+ Preconditions.checkArgument(token instanceof PasswordToken || token instanceof KerberosToken, "A PasswordToken or KerberosToken is required");
- String passwd = new String(((PasswordToken) token).getPassword(), Charsets.UTF_8);
- MiniAccumuloConfigImpl cfg = new MiniAccumuloConfigImpl(AccumuloClusterIT.createTestDir(testClassName + "_" + testMethodName), passwd);
+ String rootPasswd;
+ if (token instanceof PasswordToken) {
+ rootPasswd = new String(((PasswordToken) token).getPassword(), Charsets.UTF_8);
+ } else {
+ rootPasswd = UUID.randomUUID().toString();
+ }
+
+ MiniAccumuloConfigImpl cfg = new MiniAccumuloConfigImpl(AccumuloClusterIT.createTestDir(testClassName + "_" + testMethodName), rootPasswd);
// Enable native maps by default
cfg.setNativeLibPaths(NativeMapIT.nativeMapLocation().getAbsolutePath());
cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, Boolean.TRUE.toString());
- // Setup SSL and credential providers if the properties request such
- configureForEnvironment(cfg, getClass(), AccumuloClusterIT.createSharedTestDir(this.getClass().getName() + "-ssl"));
-
Configuration coreSite = new Configuration(false);
+ // Set up SSL, credential providers and Kerberos if the system properties request them
+ configureForEnvironment(cfg, getClass(), AccumuloClusterIT.createSharedTestDir(this.getClass().getName() + "-ssl"), coreSite, kdc);
+
// Invoke the callback for tests to configure MAC before it starts
configCallback.configureMiniCluster(cfg, coreSite);
@@ -104,13 +136,25 @@ public class MiniClusterHarness {
return miniCluster;
}
- protected void configureForEnvironment(MiniAccumuloConfigImpl cfg, Class<?> testClass, File folder) {
+ protected void configureForEnvironment(MiniAccumuloConfigImpl cfg, Class<?> testClass, File folder, Configuration coreSite, TestingKdc kdc) {
if (TRUE.equals(System.getProperty(USE_SSL_FOR_IT_OPTION))) {
configureForSsl(cfg, folder);
}
if (TRUE.equals(System.getProperty(USE_CRED_PROVIDER_FOR_IT_OPTION))) {
cfg.setUseCredentialProvider(true);
}
+
+ if (TRUE.equals(System.getProperty(USE_KERBEROS_FOR_IT_OPTION))) {
+ if (TRUE.equals(System.getProperty(USE_SSL_FOR_IT_OPTION))) {
+ throw new RuntimeException("Cannot use both SSL and Kerberos");
+ }
+
+ try {
+ configureForKerberos(cfg, folder, coreSite, kdc);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to initialize KDC", e);
+ }
+ }
}
protected void configureForSsl(MiniAccumuloConfigImpl cfg, File folder) {
@@ -141,4 +185,44 @@ public class MiniClusterHarness {
siteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword);
cfg.setSiteConfig(siteConfig);
}
+
+ /**
+ * Configures the minicluster to use SASL/Kerberos with the server and client principals provided by the given KDC.
+ * <p>
+ * The checks run in this order: a RuntimeException is thrown if SSL is enabled in the site configuration; the method returns without making changes if SASL
+ * is already enabled in the site configuration; an IllegalStateException is thrown if {@code kdc} is null.
+ *
+ * @param cfg
+ * The minicluster configuration to modify (properties, system properties and root user name)
+ * @param folder
+ * Not referenced by this method
+ * @param coreSite
+ * Hadoop configuration in which {@link CommonConfigurationKeysPublic#HADOOP_SECURITY_AUTHENTICATION} is set to "kerberos"
+ * @param kdc
+ * Source of the Accumulo keytab, the Accumulo principal and the client principal
+ */
+ protected void configureForKerberos(MiniAccumuloConfigImpl cfg, File folder, Configuration coreSite, TestingKdc kdc) throws Exception {
+ Map<String,String> siteConfig = cfg.getSiteConfig();
+ if (TRUE.equals(siteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) {
+ throw new RuntimeException("Cannot use both SSL and SASL/Kerberos");
+ }
+
+ if (TRUE.equals(siteConfig.get(Property.INSTANCE_RPC_SASL_ENABLED.getKey()))) {
+ // already enabled
+ return;
+ }
+
+ if (null == kdc) {
+ throw new IllegalStateException("MiniClusterKdc was null");
+ }
+
+ log.info("Enabling Kerberos/SASL for minicluster");
+
+ // Turn on SASL and set the keytab/principal information
+ cfg.setProperty(Property.INSTANCE_RPC_SASL_ENABLED, "true");
+ cfg.setProperty(Property.GENERAL_KERBEROS_KEYTAB, kdc.getAccumuloKeytab().getAbsolutePath());
+ cfg.setProperty(Property.GENERAL_KERBEROS_PRINCIPAL, kdc.getAccumuloPrincipal());
+ // Swap in the Kerberos implementations of the authenticator, authorizor and permission handler
+ cfg.setProperty(Property.INSTANCE_SECURITY_AUTHENTICATOR, KerberosAuthenticator.class.getName());
+ cfg.setProperty(Property.INSTANCE_SECURITY_AUTHORIZOR, KerberosAuthorizor.class.getName());
+ cfg.setProperty(Property.INSTANCE_SECURITY_PERMISSION_HANDLER, KerberosPermissionHandler.class.getName());
+ // Piggy-back on the "system user" credential, but use it as a normal KerberosToken, not the SystemToken.
+ cfg.setProperty(Property.TRACE_USER, kdc.getAccumuloPrincipal());
+ cfg.setProperty(Property.TRACE_TOKEN_TYPE, KerberosToken.CLASS_NAME);
+
+ // Pass down some KRB5 debug properties
+ // The values are copied from this JVM's system properties, defaulting to "" and "false" when unset
+ Map<String,String> systemProperties = cfg.getSystemProperties();
+ systemProperties.put(JAVA_SECURITY_KRB5_CONF, System.getProperty(JAVA_SECURITY_KRB5_CONF, ""));
+ systemProperties.put(SUN_SECURITY_KRB5_DEBUG, System.getProperty(SUN_SECURITY_KRB5_DEBUG, "false"));
+ cfg.setSystemProperties(systemProperties);
+
+ // Make sure UserGroupInformation will do the correct login
+ coreSite.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+
+ // The KDC's client principal takes the place of the root user name for this cluster
+ cfg.setRootUserName(kdc.getClientPrincipal());
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/test/java/org/apache/accumulo/harness/SharedMiniClusterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/harness/SharedMiniClusterIT.java b/test/src/test/java/org/apache/accumulo/harness/SharedMiniClusterIT.java
index 2380f66..c844388 100644
--- a/test/src/test/java/org/apache/accumulo/harness/SharedMiniClusterIT.java
+++ b/test/src/test/java/org/apache/accumulo/harness/SharedMiniClusterIT.java
@@ -21,10 +21,16 @@ import java.util.Random;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Convenience class which starts a single MAC instance for a test to leverage.
@@ -34,10 +40,14 @@ import org.junit.BeforeClass;
* can't expose any information to tell the base class that it is to perform the one-MAC-per-class semantics.
*/
public abstract class SharedMiniClusterIT extends AccumuloIT {
+ private static final Logger log = LoggerFactory.getLogger(SharedMiniClusterIT.class);
+ private static final String TRUE = Boolean.toString(true);
+ private static String principal = "root";
private static String rootPassword;
private static AuthenticationToken token;
private static MiniAccumuloClusterImpl cluster;
+ private static TestingKdc krb;
@BeforeClass
public static void startMiniCluster() throws Exception {
@@ -47,17 +57,42 @@ public abstract class SharedMiniClusterIT extends AccumuloIT {
// Make a shared MAC instance instead of spinning up one per test method
MiniClusterHarness harness = new MiniClusterHarness();
- rootPassword = "rootPasswordShared1";
- token = new PasswordToken(rootPassword);
+ if (TRUE.equals(System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION))) {
+ krb = new TestingKdc();
+ krb.start();
+ // Enable krb auth
+ Configuration conf = new Configuration(false);
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ // Login as the client
+ UserGroupInformation.loginUserFromKeytab(krb.getClientPrincipal(), krb.getClientKeytab().getAbsolutePath());
+ // Get the krb token
+ principal = krb.getClientPrincipal();
+ token = new KerberosToken(principal);
+ } else {
+ rootPassword = "rootPasswordShared1";
+ token = new PasswordToken(rootPassword);
+ }
- cluster = harness.create(SharedMiniClusterIT.class.getName(), System.currentTimeMillis() + "_" + new Random().nextInt(Short.MAX_VALUE), token);
+ cluster = harness.create(SharedMiniClusterIT.class.getName(), System.currentTimeMillis() + "_" + new Random().nextInt(Short.MAX_VALUE), token, krb);
cluster.start();
}
@AfterClass
public static void stopMiniCluster() throws Exception {
if (null != cluster) {
- cluster.stop();
+ try {
+ cluster.stop();
+ } catch (Exception e) {
+ log.error("Failed to stop minicluster", e);
+ }
+ }
+ if (null != krb) {
+ try {
+ krb.stop();
+ } catch (Exception e) {
+ log.error("Failed to stop KDC", e);
+ }
}
}
@@ -79,7 +114,7 @@ public abstract class SharedMiniClusterIT extends AccumuloIT {
public static Connector getConnector() {
try {
- return getCluster().getConnector("root", getToken());
+ return getCluster().getConnector(principal, getToken());
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/test/java/org/apache/accumulo/harness/TestingKdc.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/harness/TestingKdc.java b/test/src/test/java/org/apache/accumulo/harness/TestingKdc.java
new file mode 100644
index 0000000..2abdc62
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/harness/TestingKdc.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.harness;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.util.Properties;
+
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Creates a {@link MiniKdc} for tests to use to exercise secure Accumulo.
+ * <p>
+ * The constructor only prepares the working directories and the MiniKdc instance; {@link #start()} must be called before the keytab and principal accessors
+ * can be used. Starting the KDC creates two principals, each with its own keytab: one for the Accumulo server processes and one for a client.
+ */
+public class TestingKdc {
+ private static final Logger log = LoggerFactory.getLogger(TestingKdc.class);
+
+ // The wrapped KDC; the keytabs and principals below stay null until start() assigns them
+ protected MiniKdc kdc = null;
+ protected File accumuloKeytab = null, clientKeytab = null;
+ protected String accumuloPrincipal = null, clientPrincipal = null;
+
+ // Realm components: qualifyUser() appends "@" + ORG_NAME + "." + ORG_DOMAIN to a principal.
+ // NOTE(review): these are instance fields rather than static constants despite the constant-style names.
+ public final String ORG_NAME = "EXAMPLE", ORG_DOMAIN = "COM";
+
+ private String hostname;
+ private File keytabDir;
+ private boolean started = false;
+
+ /**
+ * Locates the "target" directory beneath the "user.dir" system property, creates the kerberos/keytabs and kerberos/minikdc directories inside it, resolves
+ * the canonical name of the local host and constructs (but does not start) the {@link MiniKdc}.
+ * <p>
+ * A JUnit assertion fails if the target directory does not exist or is not a directory.
+ */
+ public TestingKdc() throws Exception {
+ File targetDir = new File(System.getProperty("user.dir"), "target");
+ Assert.assertTrue("Could not find Maven target directory: " + targetDir, targetDir.exists() && targetDir.isDirectory());
+
+ // Create the directories: target/kerberos/{keytabs,minikdc}
+ File krbDir = new File(targetDir, "kerberos"), kdcDir = new File(krbDir, "minikdc");
+ keytabDir = new File(krbDir, "keytabs");
+
+ // NOTE(review): the return values of mkdirs() are not checked, so a failure to create either directory is not reported here
+ keytabDir.mkdirs();
+ kdcDir.mkdirs();
+
+ // The hostname becomes the instance part of the Accumulo server principal created in start()
+ hostname = InetAddress.getLocalHost().getCanonicalHostName();
+
+ Properties kdcConf = MiniKdc.createConf();
+ kdcConf.setProperty(MiniKdc.ORG_NAME, ORG_NAME);
+ kdcConf.setProperty(MiniKdc.ORG_DOMAIN, ORG_DOMAIN);
+ kdc = new MiniKdc(kdcConf, kdcDir);
+ }
+
+ /**
+ * Starts the KDC and creates the principals and their keytabs.
+ * <p>
+ * The server principal is "accumulo/" followed by the local hostname and is written to accumulo.keytab; the client principal is "client" and is written to
+ * client.keytab. Both keytabs live in the keytab directory, and both principals are stored qualified with the realm once they have been created.
+ *
+ * @throws IllegalArgumentException
+ * if the KDC was already started
+ */
+ public synchronized void start() throws Exception {
+ // NOTE(review): this is a state check, but checkArgument reports it as an IllegalArgumentException
+ Preconditions.checkArgument(!started, "KDC was already started");
+ kdc.start();
+
+ accumuloKeytab = new File(keytabDir, "accumulo.keytab");
+ clientKeytab = new File(keytabDir, "client.keytab");
+
+ accumuloPrincipal = String.format("accumulo/%s", hostname);
+ clientPrincipal = "client";
+
+ log.info("Creating Kerberos principal {} with keytab {}", accumuloPrincipal, accumuloKeytab);
+ kdc.createPrincipal(accumuloKeytab, accumuloPrincipal);
+ log.info("Creating Kerberos principal {} with keytab {}", clientPrincipal, clientKeytab);
+ kdc.createPrincipal(clientKeytab, clientPrincipal);
+
+ // The principals are passed to the KDC unqualified; only afterwards is the realm appended for callers
+ accumuloPrincipal = qualifyUser(accumuloPrincipal);
+ clientPrincipal = qualifyUser(clientPrincipal);
+
+ started = true;
+ }
+
+ /**
+ * Stops the KDC and marks this instance as not started.
+ *
+ * @throws IllegalArgumentException
+ * if the KDC is not started
+ */
+ public synchronized void stop() throws Exception {
+ Preconditions.checkArgument(started, "KDC is not started");
+ kdc.stop();
+ started = false;
+ }
+
+ /**
+ * A directory where the automatically-created keytab files are written. Unlike the other accessors, this one does not require the KDC to be started.
+ */
+ public File getKeytabDir() {
+ return keytabDir;
+ }
+
+ /**
+ * A Kerberos keytab for the Accumulo server processes
+ *
+ * @throws IllegalArgumentException
+ * if the KDC is not started
+ */
+ public File getAccumuloKeytab() {
+ Preconditions.checkArgument(started, "Accumulo keytab is not initialized, is the KDC started?");
+ return accumuloKeytab;
+ }
+
+ /**
+ * The corresponding principal for the Accumulo service keytab, qualified with the realm
+ *
+ * @throws IllegalArgumentException
+ * if the KDC is not started
+ */
+ public String getAccumuloPrincipal() {
+ Preconditions.checkArgument(started, "Accumulo principal is not initialized, is the KDC started?");
+ return accumuloPrincipal;
+ }
+
+ /**
+ * A Kerberos keytab for client use
+ *
+ * @throws IllegalArgumentException
+ * if the KDC is not started
+ */
+ public File getClientKeytab() {
+ Preconditions.checkArgument(started, "Client keytab is not initialized, is the KDC started?");
+ return clientKeytab;
+ }
+
+ /**
+ * The corresponding principal for the client keytab, qualified with the realm
+ *
+ * @throws IllegalArgumentException
+ * if the KDC is not started
+ */
+ public String getClientPrincipal() {
+ Preconditions.checkArgument(started, "Client principal is not initialized, is the KDC started?");
+ return clientPrincipal;
+ }
+
+ /**
+ * Creates additional principals in the running KDC and writes them to the given keytab file.
+ *
+ * @throws IllegalArgumentException
+ * if the KDC is not started
+ * @see MiniKdc#createPrincipal(File, String...)
+ */
+ public void createPrincipal(File keytabFile, String... principals) throws Exception {
+ Preconditions.checkArgument(started, "KDC is not started");
+ kdc.createPrincipal(keytabFile, principals);
+ }
+
+ /**
+ * @return the name for the realm
+ */
+ public String getOrgName() {
+ return ORG_NAME;
+ }
+
+ /**
+ * @return the domain for the realm
+ */
+ public String getOrgDomain() {
+ return ORG_DOMAIN;
+ }
+
+ /**
+ * Qualify a username (only the primary from the kerberos principal) with the proper realm
+ *
+ * @param primary
+ * The primary or primary and instance
+ * @return the given value followed by "@", the realm name, "." and the realm domain
+ */
+ public String qualifyUser(String primary) {
+ return String.format("%s@%s.%s", primary, getOrgName(), getOrgDomain());
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/test/java/org/apache/accumulo/harness/conf/AccumuloMiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/harness/conf/AccumuloMiniClusterConfiguration.java b/test/src/test/java/org/apache/accumulo/harness/conf/AccumuloMiniClusterConfiguration.java
index 11b7530..0efba9e 100644
--- a/test/src/test/java/org/apache/accumulo/harness/conf/AccumuloMiniClusterConfiguration.java
+++ b/test/src/test/java/org/apache/accumulo/harness/conf/AccumuloMiniClusterConfiguration.java
@@ -16,23 +16,37 @@
*/
package org.apache.accumulo.harness.conf;
+import java.io.File;
+import java.io.IOException;
import java.util.Map;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.harness.AccumuloClusterIT;
import org.apache.accumulo.harness.AccumuloClusterIT.ClusterType;
+import org.apache.accumulo.harness.MiniClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Extract configuration properties for a MiniAccumuloCluster from Java properties
*/
public class AccumuloMiniClusterConfiguration extends AccumuloClusterPropertyConfiguration {
+ private static final Logger log = LoggerFactory.getLogger(AccumuloMiniClusterConfiguration.class);
+ private static final String TRUE = Boolean.toString(true);
public static final String ACCUMULO_MINI_PRINCIPAL_KEY = ACCUMULO_MINI_PREFIX + "principal";
public static final String ACCUMULO_MINI_PRINCIPAL_DEFAULT = "root";
public static final String ACCUMULO_MINI_PASSWORD_KEY = ACCUMULO_MINI_PREFIX + "password";
public static final String ACCUMULO_MINI_PASSWORD_DEFAULT = "rootPassword1";
- private Map<String,String> conf;
+ private final Map<String,String> conf;
+ private final boolean saslEnabled;
public AccumuloMiniClusterConfiguration() {
ClusterType type = getClusterType();
@@ -41,26 +55,52 @@ public class AccumuloMiniClusterConfiguration extends AccumuloClusterPropertyCon
}
this.conf = getConfiguration(type);
+ this.saslEnabled = TRUE.equals(System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION));
+ log.debug("SASL is {}enabled", (saslEnabled ? "" : "not "));
}
@Override
public String getPrincipal() {
- String principal = conf.get(ACCUMULO_MINI_PRINCIPAL_KEY);
- if (null == principal) {
- principal = ACCUMULO_MINI_PRINCIPAL_DEFAULT;
- }
+ if (saslEnabled) {
+ try {
+ return new KerberosName(AccumuloClusterIT.getClientPrincipal()).getShortName();
+ } catch (IOException e) {
+ throw new RuntimeException("Could not parse client principal", e);
+ }
+ } else {
+ String principal = conf.get(ACCUMULO_MINI_PRINCIPAL_KEY);
+ if (null == principal) {
+ principal = ACCUMULO_MINI_PRINCIPAL_DEFAULT;
+ }
- return principal;
+ return principal;
+ }
}
@Override
public AuthenticationToken getToken() {
- String password = conf.get(ACCUMULO_MINI_PASSWORD_KEY);
- if (null == password) {
- password = ACCUMULO_MINI_PASSWORD_DEFAULT;
- }
+ if (saslEnabled) {
+ // Turn on Kerberos authentication so UGI acts properly
+ final Configuration conf = new Configuration(false);
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(conf);
- return new PasswordToken(password);
+ File clientKeytab = AccumuloClusterIT.getClientKeytab();
+ String clientPrincipal = AccumuloClusterIT.getClientPrincipal();
+ try {
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ return new KerberosToken(clientPrincipal);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ String password = conf.get(ACCUMULO_MINI_PASSWORD_KEY);
+ if (null == password) {
+ password = ACCUMULO_MINI_PASSWORD_DEFAULT;
+ }
+
+ return new PasswordToken(password);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/test/java/org/apache/accumulo/server/security/SystemCredentialsIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/server/security/SystemCredentialsIT.java b/test/src/test/java/org/apache/accumulo/server/security/SystemCredentialsIT.java
index abbe5e6..3889110 100644
--- a/test/src/test/java/org/apache/accumulo/server/security/SystemCredentialsIT.java
+++ b/test/src/test/java/org/apache/accumulo/server/security/SystemCredentialsIT.java
@@ -30,6 +30,7 @@ import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.security.SecurityErrorCode;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.RootTable;
@@ -41,7 +42,7 @@ import org.junit.Test;
public class SystemCredentialsIT extends ConfigurableMacIT {
- private static final int FAIL_CODE = 7;
+ private static final int FAIL_CODE = 7, BAD_PASSWD_FAIL_CODE = 8;
@Override
protected int defaultTimeoutSeconds() {
@@ -52,6 +53,7 @@ public class SystemCredentialsIT extends ConfigurableMacIT {
public void testSystemCredentials() throws Exception {
assertEquals(0, exec(SystemCredentialsIT.class, "good", getCluster().getZooKeepers()).waitFor());
assertEquals(FAIL_CODE, exec(SystemCredentialsIT.class, "bad", getCluster().getZooKeepers()).waitFor());
+ assertEquals(BAD_PASSWD_FAIL_CODE, exec(SystemCredentialsIT.class, "bad_password", getCluster().getZooKeepers()).waitFor());
}
public static void main(final String[] args) throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
@@ -59,7 +61,7 @@ public class SystemCredentialsIT extends ConfigurableMacIT {
if (args.length < 2)
throw new RuntimeException("Incorrect usage; expected to be run by test only");
if (args[0].equals("bad")) {
- creds = new SystemCredentials(new Instance() {
+ Instance inst = new Instance() {
@Override
public int getZooKeepersSessionTimeOut() {
@@ -114,12 +116,78 @@ public class SystemCredentialsIT extends ConfigurableMacIT {
throw new UnsupportedOperationException();
}
- });
+ };
+ creds = SystemCredentials.get(inst);
} else if (args[0].equals("good")) {
creds = SystemCredentials.get(HdfsZooInstance.getInstance());
+ } else if (args[0].equals("bad_password")) {
+ Instance inst = new Instance() {
+
+ @Override
+ public int getZooKeepersSessionTimeOut() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getZooKeepers() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getRootTabletLocation() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> getMasterLocations() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getInstanceName() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getInstanceID() {
+ return SystemCredentials.class.getName();
+ }
+
+ @Override
+ public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated
+ @Override
+ public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated
+ @Override
+ public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated
+ @Override
+ public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException {
+ throw new UnsupportedOperationException();
+ }
+
+ };
+ creds = new SystemCredentials(inst, "!SYSTEM", new PasswordToken("fake"));
}
Instance instance = HdfsZooInstance.getInstance();
- Connector conn = instance.getConnector(creds.getPrincipal(), creds.getToken());
+ Connector conn;
+ try {
+ conn = instance.getConnector(creds.getPrincipal(), creds.getToken());
+ } catch (AccumuloSecurityException e) {
+ e.printStackTrace(System.err);
+ System.exit(BAD_PASSWD_FAIL_CODE);
+ return;
+ }
try {
Scanner scan = conn.createScanner(RootTable.NAME, Authorizations.EMPTY);
for (Entry<Key,Value> e : scan) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/test/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java b/test/src/test/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java
index aa5c164..4481934 100644
--- a/test/src/test/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java
@@ -21,12 +21,15 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.harness.SharedMiniClusterIT;
import org.apache.log4j.Logger;
import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
import org.junit.Test;
public class ArbitraryTablePropertiesIT extends SharedMiniClusterIT {
@@ -37,6 +40,11 @@ public class ArbitraryTablePropertiesIT extends SharedMiniClusterIT {
return 30;
};
+ /**
+ * Runs before each test and skips it, via a JUnit assumption, when the token returned by getToken() is a KerberosToken.
+ */
+ @Before
+ public void checkNoKerberos() {
+ Assume.assumeFalse(getToken() instanceof KerberosToken);
+ }
+
// Test set, get, and remove arbitrary table properties on the root account
@Test
public void setGetRemoveTablePropertyRoot() throws Exception {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
index 1fcd5a4..bdfbd13 100644
--- a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
@@ -70,10 +70,12 @@ public class CleanWalIT extends AccumuloClusterIT {
@After
public void onlineTraceTable() throws Exception {
- Connector conn = getConnector();
- String traceTable = conn.instanceOperations().getSystemConfiguration().get(Property.TRACE_TABLE.getKey());
- if (conn.tableOperations().exists(traceTable)) {
- conn.tableOperations().online(traceTable, true);
+ if (null != cluster) {
+ Connector conn = getConnector();
+ String traceTable = conn.instanceOperations().getSystemConfiguration().get(Property.TRACE_TABLE.getKey());
+ if (conn.tableOperations().exists(traceTable)) {
+ conn.tableOperations().online(traceTable, true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/test/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
index 221889b..30d6958 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
@@ -20,7 +20,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Random;
@@ -48,7 +47,7 @@ public class BatchScanSplitIT extends AccumuloClusterIT {
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setSiteConfig(Collections.singletonMap(Property.TSERV_MAJC_DELAY.getKey(), "0"));
+ cfg.setProperty(Property.TSERV_MAJC_DELAY, "0");
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/test/java/org/apache/accumulo/test/functional/KerberosIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/KerberosIT.java b/test/src/test/java/org/apache/accumulo/test/functional/KerberosIT.java
new file mode 100644
index 0000000..e3da6eb
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/KerberosIT.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.harness.AccumuloIT;
+import org.apache.accumulo.harness.MiniClusterHarness;
+import org.apache.accumulo.harness.TestingKdc;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+/**
+ * MAC test which uses {@link MiniKdc} to simulate a secure environment. Can be used as a sanity check for Kerberos/SASL testing.
+ */
+public class KerberosIT extends AccumuloIT {
+ private static final Logger log = LoggerFactory.getLogger(KerberosIT.class);
+
+ private static TestingKdc kdc;
+ private static String krbEnabledForITs = null;
+
+ @BeforeClass
+ public static void startKdc() throws Exception {
+ kdc = new TestingKdc();
+ kdc.start();
+ krbEnabledForITs = System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION);
+ if (null == krbEnabledForITs || !Boolean.parseBoolean(krbEnabledForITs)) {
+ System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, "true");
+ }
+ }
+
+ @AfterClass
+ public static void stopKdc() throws Exception {
+ if (null != kdc) {
+ kdc.stop();
+ }
+ if (null != krbEnabledForITs) {
+ System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, krbEnabledForITs);
+ }
+ }
+
+ private MiniAccumuloClusterImpl mac;
+
+ @Before
+ public void startMac() throws Exception {
+ MiniClusterHarness harness = new MiniClusterHarness();
+ mac = harness.create(this, new PasswordToken("unused"), kdc);
+ mac.getConfig().setNumTservers(1);
+ mac.start();
+ // Enable kerberos auth
+ Configuration conf = new Configuration(false);
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ }
+
+ @After
+ public void stopMac() throws Exception {
+ if (null != mac) {
+ mac.stop();
+ }
+ }
+
+ @Test
+ public void testAdminUser() throws Exception {
+ // Login as the client (provided to `accumulo init` as the "root" user)
+ UserGroupInformation.loginUserFromKeytab(kdc.getClientPrincipal(), kdc.getClientKeytab().getAbsolutePath());
+
+ final Connector conn = mac.getConnector(kdc.getClientPrincipal(), new KerberosToken());
+
+ // The "root" user should have all system permissions
+ for (SystemPermission perm : SystemPermission.values()) {
+ assertTrue("Expected user to have permission: " + perm, conn.securityOperations().hasSystemPermission(conn.whoami(), perm));
+ }
+
+ // and the ability to modify the root and metadata tables
+ for (String table : Arrays.asList(RootTable.NAME, MetadataTable.NAME)){
+ assertTrue(conn.securityOperations().hasTablePermission(conn.whoami(), table, TablePermission.ALTER_TABLE));
+ }
+ }
+
+ @Test
+ public void testNewUser() throws Exception {
+ String newUser = testName.getMethodName();
+ final File newUserKeytab = new File(kdc.getKeytabDir(), newUser + ".keytab");
+ if (newUserKeytab.exists()) {
+ newUserKeytab.delete();
+ }
+
+ // Create a new user
+ kdc.createPrincipal(newUserKeytab, newUser);
+
+ newUser = kdc.qualifyUser(newUser);
+
+ // Login as the "root" user
+ UserGroupInformation.loginUserFromKeytab(kdc.getClientPrincipal(), kdc.getClientKeytab().getAbsolutePath());
+ log.info("Logged in as {}", kdc.getClientPrincipal());
+
+ Connector conn = mac.getConnector(kdc.getClientPrincipal(), new KerberosToken());
+ log.info("Created connector as {}", kdc.getClientPrincipal());
+ assertEquals(kdc.getClientPrincipal(), conn.whoami());
+
+ // Make sure the system user doesn't exist -- this will force some RPC to happen server-side
+ createTableWithDataAndCompact(conn);
+
+ HashSet<String> users = Sets.newHashSet(kdc.getClientPrincipal());
+ assertEquals(users, conn.securityOperations().listLocalUsers());
+
+ // Switch to a new user
+ UserGroupInformation.loginUserFromKeytab(newUser, newUserKeytab.getAbsolutePath());
+ log.info("Logged in as {}", newUser);
+
+ conn = mac.getConnector(newUser, new KerberosToken());
+ log.info("Created connector as {}", newUser);
+ assertEquals(newUser, conn.whoami());
+
+ // The new user should have no system permissions
+ for (SystemPermission perm : SystemPermission.values()) {
+ assertFalse(conn.securityOperations().hasSystemPermission(newUser, perm));
+ }
+
+ users.add(newUser);
+
+ // Same users as before, plus the new user we just created
+ assertEquals(users, conn.securityOperations().listLocalUsers());
+ }
+
+ @Test
+ public void testUserPrivilegesThroughGrant() throws Exception {
+ String user1 = testName.getMethodName();
+ final File user1Keytab = new File(kdc.getKeytabDir(), user1 + ".keytab");
+ if (user1Keytab.exists()) {
+ user1Keytab.delete();
+ }
+
+ // Create some new users
+ kdc.createPrincipal(user1Keytab, user1);
+
+ user1 = kdc.qualifyUser(user1);
+
+ // Log in as user1
+ UserGroupInformation.loginUserFromKeytab(user1, user1Keytab.getAbsolutePath());
+ log.info("Logged in as {}", user1);
+
+ // Indirectly creates this user when we use it
+ Connector conn = mac.getConnector(user1, new KerberosToken());
+ log.info("Created connector as {}", user1);
+
+ // The new user should have no system permissions
+ for (SystemPermission perm : SystemPermission.values()) {
+ assertFalse(conn.securityOperations().hasSystemPermission(user1, perm));
+ }
+
+ UserGroupInformation.loginUserFromKeytab(kdc.getClientPrincipal(), kdc.getClientKeytab().getAbsolutePath());
+ conn = mac.getConnector(kdc.getClientPrincipal(), new KerberosToken());
+
+ conn.securityOperations().grantSystemPermission(user1, SystemPermission.CREATE_TABLE);
+
+ // Switch back to the original user
+ UserGroupInformation.loginUserFromKeytab(user1, user1Keytab.getAbsolutePath());
+ conn = mac.getConnector(user1, new KerberosToken());
+
+ // Shouldn't throw an exception since we granted the create table permission
+ final String table = testName.getMethodName() + "_user_table";
+ conn.tableOperations().create(table);
+
+ // Make sure we can actually use the table we made
+ BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+ Mutation m = new Mutation("a");
+ m.put("b", "c", "d");
+ bw.addMutation(m);
+ bw.close();
+
+ conn.tableOperations().compact(table, new CompactionConfig().setWait(true).setFlush(true));
+ }
+
+ @Test
+ public void testUserPrivilegesForTable() throws Exception {
+ String user1 = testName.getMethodName();
+ final File user1Keytab = new File(kdc.getKeytabDir(), user1 + ".keytab");
+ if (user1Keytab.exists()) {
+ user1Keytab.delete();
+ }
+
+ // Create some new users -- cannot contain realm
+ kdc.createPrincipal(user1Keytab, user1);
+
+ user1 = kdc.qualifyUser(user1);
+
+ // Log in as user1
+ UserGroupInformation.loginUserFromKeytab(user1, user1Keytab.getAbsolutePath());
+ log.info("Logged in as {}", user1);
+
+ // Indirectly creates this user when we use it
+ Connector conn = mac.getConnector(user1, new KerberosToken());
+ log.info("Created connector as {}", user1);
+
+ // The new user should have no system permissions
+ for (SystemPermission perm : SystemPermission.values()) {
+ assertFalse(conn.securityOperations().hasSystemPermission(user1, perm));
+ }
+
+ UserGroupInformation.loginUserFromKeytab(kdc.getClientPrincipal(), kdc.getClientKeytab().getAbsolutePath());
+ conn = mac.getConnector(kdc.getClientPrincipal(), new KerberosToken());
+
+ final String table = testName.getMethodName() + "_user_table";
+ conn.tableOperations().create(table);
+
+ final String viz = "viz";
+
+ // Give our unprivileged user permission on the table we made for them
+ conn.securityOperations().grantTablePermission(user1, table, TablePermission.READ);
+ conn.securityOperations().grantTablePermission(user1, table, TablePermission.WRITE);
+ conn.securityOperations().grantTablePermission(user1, table, TablePermission.ALTER_TABLE);
+ conn.securityOperations().grantTablePermission(user1, table, TablePermission.DROP_TABLE);
+ conn.securityOperations().changeUserAuthorizations(user1, new Authorizations(viz));
+
+ // Switch back to the original user
+ UserGroupInformation.loginUserFromKeytab(user1, user1Keytab.getAbsolutePath());
+ conn = mac.getConnector(user1, new KerberosToken());
+
+ // Make sure we can actually use the table we made
+
+ // Write data
+ final long ts = 1000l;
+ BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+ Mutation m = new Mutation("a");
+ m.put("b", "c", new ColumnVisibility(viz.getBytes()), ts, "d");
+ bw.addMutation(m);
+ bw.close();
+
+ // Compact
+ conn.tableOperations().compact(table, new CompactionConfig().setWait(true).setFlush(true));
+
+ // Alter
+ conn.tableOperations().setProperty(table, Property.TABLE_BLOOM_ENABLED.getKey(), "true");
+
+ // Read (and proper authorizations)
+ Scanner s = conn.createScanner(table, new Authorizations(viz));
+ Iterator<Entry<Key,Value>> iter = s.iterator();
+ assertTrue("No results from iterator", iter.hasNext());
+ Entry<Key,Value> entry = iter.next();
+ assertEquals(new Key("a", "b", "c", viz, ts), entry.getKey());
+ assertEquals(new Value("d".getBytes()), entry.getValue());
+ assertFalse("Had more results from iterator", iter.hasNext());
+ }
+
+ /**
+ * Creates a table, adds a record to it, and then compacts the table. A simple way to make sure that the system user exists (since the master does an RPC to
+ * the tserver which will create the system user if it doesn't already exist).
+ */
+ private void createTableWithDataAndCompact(Connector conn) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, TableExistsException {
+ final String table = testName.getMethodName() + "_table";
+ conn.tableOperations().create(table);
+ BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+ Mutation m = new Mutation("a");
+ m.put("b", "c", "d");
+ bw.addMutation(m);
+ bw.close();
+ conn.tableOperations().compact(table, new CompactionConfig().setFlush(true).setWait(true));
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/test/java/org/apache/accumulo/test/functional/MetadataIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/MetadataIT.java b/test/src/test/java/org/apache/accumulo/test/functional/MetadataIT.java
index 4c9207a..ae33651 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/MetadataIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/MetadataIT.java
@@ -38,6 +38,8 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Test;
@@ -45,6 +47,11 @@ import org.junit.Test;
public class MetadataIT extends AccumuloClusterIT {
@Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(1);
+ }
+
+ @Override
public int defaultTimeoutSeconds() {
return 2 * 60;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/test/java/org/apache/accumulo/test/security/KerberosTokenTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/security/KerberosTokenTest.java b/test/src/test/java/org/apache/accumulo/test/security/KerberosTokenTest.java
new file mode 100644
index 0000000..5568e9c
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/security/KerberosTokenTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.security;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.harness.TestingKdc;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class KerberosTokenTest {
+
+ @Rule
+ public TestName testName = new TestName();
+
+ private static TestingKdc kdc;
+
+ @BeforeClass
+ public static void startKdc() throws Exception {
+ kdc = new TestingKdc();
+ kdc.start();
+ }
+
+ @AfterClass
+ public static void stopKdc() throws Exception {
+ if (null != kdc) {
+ kdc.stop();
+ }
+ }
+
+ @Before
+ public void resetUgiForKrb() {
+ Configuration conf = new Configuration(false);
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ }
+
+ @Test
+ public void test() throws Exception {
+ String user = testName.getMethodName();
+ File userKeytab = new File(kdc.getKeytabDir(), user + ".keytab");
+ if (userKeytab.exists()) {
+ userKeytab.delete();
+ }
+
+ kdc.createPrincipal(userKeytab, user);
+
+ user = kdc.qualifyUser(user);
+
+ UserGroupInformation.loginUserFromKeytab(user, userKeytab.getAbsolutePath());
+ KerberosToken token = new KerberosToken();
+
+ assertEquals(user, token.getPrincipal());
+
+ // Use the long-hand constructor, should be equivalent to short-hand
+ KerberosToken tokenWithPrinc = new KerberosToken(user);
+ assertEquals(token, tokenWithPrinc);
+ assertEquals(token.hashCode(), tokenWithPrinc.hashCode());
+ }
+
+ @Test
+ public void testDestroy() throws Exception {
+ String user = testName.getMethodName();
+ File userKeytab = new File(kdc.getKeytabDir(), user + ".keytab");
+ if (userKeytab.exists()) {
+ userKeytab.delete();
+ }
+
+ kdc.createPrincipal(userKeytab, user);
+
+ user = kdc.qualifyUser(user);
+
+ UserGroupInformation.loginUserFromKeytab(user, userKeytab.getAbsolutePath());
+ KerberosToken token = new KerberosToken();
+
+ assertEquals(user, token.getPrincipal());
+ token.destroy();
+ assertTrue(token.isDestroyed());
+ assertNull(token.getPrincipal());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/test/src/test/resources/log4j.properties b/test/src/test/resources/log4j.properties
index cb35840..1b89dfe 100644
--- a/test/src/test/resources/log4j.properties
+++ b/test/src/test/resources/log4j.properties
@@ -43,3 +43,12 @@ log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=WARN
log4j.logger.org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace=WARN
log4j.logger.BlockStateChange=WARN
log4j.logger.org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator=INFO
+log4j.logger.org.apache.hadoop.security=DEBUG
+log4j.logger.org.apache.hadoop.minikdc=DEBUG
+log4j.logger.org.apache.directory=INFO
+log4j.logger.org.apache.directory.api.ldap=WARN
+# This is really spammy at debug
+log4j.logger.org.apache.thrift.transport.TSaslTransport=INFO
+# From apache-ds/minikdc
+log4j.logger.org.apache.mina=INFO
+log4j.logger.org.apache.accumulo.server.thrift.UGIAssumingProcessor=TRACE
\ No newline at end of file
[3/4] accumulo git commit: ACCUMULO-2815 Support for Kerberos client
authentication.
Posted by el...@apache.org.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/docs/src/main/asciidoc/chapters/kerberos.txt
----------------------------------------------------------------------
diff --git a/docs/src/main/asciidoc/chapters/kerberos.txt b/docs/src/main/asciidoc/chapters/kerberos.txt
new file mode 100644
index 0000000..3dcac6d
--- /dev/null
+++ b/docs/src/main/asciidoc/chapters/kerberos.txt
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+== Kerberos
+
+=== Overview
+
+Kerberos is a network authentication protocol that provides a secure way for
+peers to prove their identity over an unsecure network in a client-server model.
+A centralized key-distribution center (KDC) is the service that coordinates
+authentication between a client and a server. Clients and servers use "tickets",
+obtained from the KDC via a password or a special file called a "keytab", to
+communicate with the KDC and prove their identity. A KDC administrator must
+create the principal (name for the client/server identity) and the password
+or keytab, securely passing the necessary information to the actual user/service.
+Properly securing the KDC and generated ticket material is central to the security
+model and is mentioned only as a warning to administrators running their own KDC.
+
+To interact with Kerberos programmatically, GSSAPI and SASL are two standards
+which allow cross-language integration with Kerberos for authentication. GSSAPI,
+the generic security service application program interface, is a standard which
+Kerberos implements. In the Java programming language, the language itself also implements
+GSSAPI which is leveraged by other applications, like Apache Hadoop and Apache Thrift.
+SASL, simple authentication and security layer, is a framework for authentication and
+security over the network. SASL provides a number of mechanisms for authentication,
+one of which is GSSAPI. Thus, SASL provides the transport which authenticates
+using GSSAPI that Kerberos implements.
+
+Kerberos is a very complicated software application and is deserving of much
+more description than can be provided here. An http://www.roguelynn.com/words/explain-like-im-5-kerberos/[explain like
+I'm 5] blog post is very good at distilling the basics, while http://web.mit.edu/kerberos/[MIT Kerberos's project page]
+contains lots of documentation for users or administrators. Various Hadoop "vendors"
+also provide free documentation that includes step-by-step instructions for
+configuring Hadoop and ZooKeeper (which will be henceforth considered as prerequisites).
+
+=== Within Hadoop
+
+Out of the box, HDFS and YARN have no ability to enforce that a user is who
+they claim they are. Thus, any basic Hadoop installation should be treated as
+unsecure: any user with access to the cluster has the ability to access any data.
+Using Kerberos to provide authentication, users can be strongly identified, delegating
+to Kerberos to determine who a user is and enforce that a user is who they claim to be.
+As such, Kerberos is widely used across the entire Hadoop ecosystem for strong
+authentication. Since server processes accessing HDFS or YARN are required
+to use Kerberos to authenticate with HDFS, it makes sense that they also require
+Kerberos authentication from their clients, in addition to other features provided
+by SASL.
+
+A typical deployment involves the creation of Kerberos principals for all server
+processes (Hadoop datanodes and namenode(s), ZooKeepers), the creation of a keytab
+file for each principal and then proper configuration for the Hadoop site xml files.
+Users also need Kerberos principals created for them; however, a user typically
+uses a password to identify themselves instead of a keytab. Users can obtain a
+ticket granting ticket (TGT) from the KDC using their password which allows them
+to authenticate for the lifetime of the TGT (typically one day by default) and alleviates
+the need for further password authentication.
+
+For client server applications, like web servers, a keytab can be created which
+allows for fully-automated Kerberos identification, removing the need to enter any
+password, at the cost of needing to protect the keytab file. These principals
+will apply directly to authentication for clients accessing Accumulo and the
+Accumulo processes accessing HDFS.
+
+=== Configuring Accumulo
+
+To configure Accumulo for use with Kerberos, both client-facing and server-facing
+changes must be made for a functional system on secured Hadoop. As previously mentioned,
+numerous guidelines already exist on the subject of configuring Hadoop and ZooKeeper for
+use with Kerberos and won't be covered here. It is assumed that you have functional
+Hadoop and ZooKeeper already installed.
+
+==== Servers
+
+The first step is to obtain a Kerberos identity for the Accumulo server processes.
+When running Accumulo with Kerberos enabled, a valid Kerberos identity will be required
+to initiate any RPC between Accumulo processes (e.g. Master and TabletServer) in addition
+to any HDFS action (e.g. client to HDFS or TabletServer to HDFS).
+
+===== Generate Principal and Keytab
+
+In the +kadmin.local+ shell or using the +-q+ option on +kadmin.local+, create a
+principal for Accumulo for all hosts that are running Accumulo processes. A Kerberos
+principal is of the form "primary/instance@REALM". "accumulo" is commonly the "primary"
+(although not required) and the "instance" is the fully-qualified domain name for
+the host that will be running the Accumulo process -- this is required.
+
+----
+kadmin.local -q "addprinc -randkey accumulo/host.domain.com"
+----
+
+Perform the above for each node running Accumulo processes in the instance, modifying
+"host.domain.com" for your network. The +randkey+ option generates a random password
+because we will use a keytab for authentication, not a password, since the Accumulo
+server processes don't have an interactive console to enter a password into.
+
+----
+kadmin.local -q "xst -k accumulo.hostname.keytab accumulo/host.domain.com"
+----
+
+To simplify deployments, at the cost of security, all Accumulo principals could
+be globbed into a single keytab:
+
+----
+kadmin.local -q "xst -k accumulo.service.keytab -glob accumulo*"
+----
+
+To ensure that the SASL handshake can occur from clients to servers and servers to servers,
+all Accumulo servers must share the same primary and realm principal components as the
+"client" must know these to setup the connection with the "server".
+
+===== Server Configuration
+
+A number of properties need to be changed to properly configure servers
+in +accumulo-site.xml+.
+
+* *general.kerberos.keytab*=_/etc/security/keytabs/accumulo.service.keytab_
+** The path to the keytab for Accumulo on local filesystem.
+** Change the value to the actual path on your system.
+* *general.kerberos.principal*=_accumulo/_HOST@REALM_
+** The Kerberos principal for Accumulo, needs to match the keytab.
+** "_HOST" can be used instead of the actual hostname in the principal and will be
+automatically expanded to the current FQDN which reduces the configuration file burden.
+* *instance.rpc.sasl.enabled*=_true_
+** Enables SASL for the Thrift Servers (supports GSSAPI)
+* *instance.security.authenticator*=_org.apache.accumulo.server.security.handler.KerberosAuthenticator_
+** Configures Accumulo to use the Kerberos principal as the Accumulo username/principal
+* *instance.security.authorizor*=_org.apache.accumulo.server.security.handler.KerberosAuthorizor_
+** Configures Accumulo to use the Kerberos principal for authorization purposes
+* *instance.security.permissionHandler*=_org.apache.accumulo.server.security.handler.KerberosPermissionHandler_
+** Configures Accumulo to use the Kerberos principal for permission purposes
+* *trace.token.type*=_org.apache.accumulo.core.client.security.tokens.KerberosToken_
+** Configures the Accumulo Tracer to use the KerberosToken for authentication when
+serializing traces to the trace table.
+* *trace.user*=_accumulo/_HOST@REALM_
+** The tracer process needs valid credentials to serialize traces to Accumulo.
+** While the other server processes are creating a SystemToken from the provided keytab and principal, we can
+still use a normal KerberosToken and the same keytab/principal to serialize traces. Like
+non-Kerberized instances, the table must be created and permissions granted to the trace.user.
+** The same +_HOST+ replacement is performed on this value, substituting the FQDN for +_HOST+.
+
+Although it should be a prerequisite, it is very important that you have DNS properly
+configured for your nodes and that Accumulo is configured to use the FQDN. It
+is extremely important to use the FQDN in each of the "hosts" files for each
+Accumulo process: +masters+, +monitors+, +slaves+, +tracers+, and +gc+.
+
+===== KerberosAuthenticator
+
+The +KerberosAuthenticator+ is an implementation of the pluggable security interfaces
+that Accumulo provides. It builds on top of the default ZooKeeper-based implementation,
+but removes the need to create user accounts with passwords in Accumulo for clients. As
+long as a client has a valid Kerberos identity, they can connect to and interact with
+Accumulo, but without any permissions (e.g. cannot create tables or write data). Leveraging
+ZooKeeper removes the need to change the permission handler and authorizor, so other Accumulo
+functions regarding permissions and cell-level authorizations do not change.
+
+It is extremely important to note that, while user operations like +SecurityOperations.listLocalUsers()+,
++SecurityOperations.dropLocalUser()+, and +SecurityOperations.createLocalUser()+ will not return
+errors, these methods are not equivalent to normal installations, as they will only operate on
+users which have, at one point in time, authenticated with Accumulo using their Kerberos identity.
+The KDC is still the authoritative entity for user management. The previously mentioned methods
+are provided as they simplify management of users within Accumulo, especially with respect
+to granting Authorizations and Permissions to new users.
+
+===== Verifying secure access
+
+To verify that servers have correctly started with Kerberos enabled, ensure that the processes
+are actually running (they should exit immediately if login fails) and verify that you see
+something similar to the following in the application log.
+
+----
+2015-01-07 11:57:56,826 [security.SecurityUtil] INFO : Attempting to login with keytab as accumulo/hostname@EXAMPLE.COM
+2015-01-07 11:57:56,830 [security.UserGroupInformation] INFO : Login successful for user accumulo/hostname@EXAMPLE.COM using keytab file /etc/security/keytabs/accumulo.service.keytab
+----
+
+==== Clients
+
+===== Create client principal
+
+Like the Accumulo servers, clients must also have a Kerberos principal created for them. The
+primary difference from a server principal is that principals for users are created
+with a password and also not qualified to a specific instance (host).
+
+----
+kadmin.local -q "addprinc $user"
+----
+
+The above will prompt for a password for that user which will be used to identify that $user.
+The user can verify that they can authenticate with the KDC using the command `kinit $user`.
+Upon entering the correct password, a local credentials cache will be made which can be used
+to authenticate with Accumulo, access HDFS, etc.
+
+The user can verify the state of their local credentials cache by using the command `klist`.
+
+----
+$ klist
+Ticket cache: FILE:/tmp/krb5cc_123
+Default principal: user@EXAMPLE.COM
+
+Valid starting Expires Service principal
+01/07/2015 11:56:35 01/08/2015 11:56:35 krbtgt/EXAMPLE.COM@EXAMPLE.COM
+ renew until 01/14/2015 11:56:35
+----
+
+===== Configuration
+
+The second thing clients need to do is to set up their client configuration file. By
+default, this file is stored in +~/.accumulo/conf+, +$ACCUMULO_CONF_DIR/client.conf+ or
++$ACCUMULO_HOME/conf/client.conf+. Accumulo utilities also allow you to provide your own
+copy of this file in any location using the +--config-file+ command line option.
+
+Three items need to be set to enable access to Accumulo:
+
+* +instance.rpc.sasl.enabled+=_true_
+* +kerberos.server.primary+=_accumulo_
+* +kerberos.server.realm+=_EXAMPLE.COM_
+
+The second and third properties *must* match the configuration of the accumulo servers; this is
+required to set up the SASL transport.
+
+==== Debugging
+
+*Q*: I have valid Kerberos credentials and a correct client configuration file but
+I still get errors like:
+
+----
+java.io.IOException: Failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
+----
+
+*A*: When you have a valid client configuration and Kerberos TGT, it is possible that the search
+path for your local credentials cache is incorrect. Check the value of the KRB5CCNAME environment
+variable, and ensure it matches the value reported by `klist`.
+
+----
+$ echo $KRB5CCNAME
+
+$ klist
+Ticket cache: FILE:/tmp/krb5cc_123
+Default principal: user@EXAMPLE.COM
+
+Valid starting Expires Service principal
+01/07/2015 11:56:35 01/08/2015 11:56:35 krbtgt/EXAMPLE.COM@EXAMPLE.COM
+ renew until 01/14/2015 11:56:35
+$ export KRB5CCNAME=/tmp/krb5cc_123
+$ echo $KRB5CCNAME
+/tmp/krb5cc_123
+----
+
+*Q*: I thought I had everything configured correctly, but my client/server still fails to log in.
+I don't know what is actually failing.
+
+*A*: Add the following system property to the JVM invocation:
+
+----
+-Dsun.security.krb5.debug=true
+----
+
+This will enable lots of extra debugging at the JVM level which is often sufficient to
+diagnose some high-level configuration problem. Client applications can add this system property by
+hand to the command line, while Accumulo server processes or applications started using the `accumulo`
+script can add the property to +ACCUMULO_GENERAL_OPTS+ in +$ACCUMULO_CONF_DIR/accumulo-env.sh+.
+
+Additionally, you can increase the log4j levels on +org.apache.hadoop.security+, which includes the
+Hadoop +UserGroupInformation+ class, which will include some high-level debug statements. This
+can be controlled in your client application, or using +$ACCUMULO_CONF_DIR/generic_logger.xml+.
+
+*Q*: All of my Accumulo processes successfully start and log in with their
+keytab, but they are unable to communicate with each other, showing the
+following errors:
+
+----
+2015-01-12 14:47:27,055 [transport.TSaslTransport] ERROR: SASL negotiation failure
+javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]
+ at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
+ at org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
+ at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
+ at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
+ at org.apache.accumulo.core.rpc.UGIAssumingTransport$1.run(UGIAssumingTransport.java:53)
+ at org.apache.accumulo.core.rpc.UGIAssumingTransport$1.run(UGIAssumingTransport.java:49)
+ at java.security.AccessController.doPrivileged(Native Method)
+ at javax.security.auth.Subject.doAs(Subject.java:415)
+ at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
+ at org.apache.accumulo.core.rpc.UGIAssumingTransport.open(UGIAssumingTransport.java:49)
+ at org.apache.accumulo.core.rpc.ThriftUtil.createClientTransport(ThriftUtil.java:357)
+ at org.apache.accumulo.core.rpc.ThriftUtil.createTransport(ThriftUtil.java:255)
+ at org.apache.accumulo.server.master.LiveTServerSet$TServerConnection.getTableMap(LiveTServerSet.java:106)
+ at org.apache.accumulo.master.Master.gatherTableInformation(Master.java:996)
+ at org.apache.accumulo.master.Master.access$600(Master.java:160)
+ at org.apache.accumulo.master.Master$StatusThread.updateStatus(Master.java:911)
+ at org.apache.accumulo.master.Master$StatusThread.run(Master.java:901)
+Caused by: GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)
+ at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:710)
+ at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:248)
+ at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
+ at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193)
+ ... 16 more
+Caused by: KrbException: Server not found in Kerberos database (7) - LOOKING_UP_SERVER
+ at sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:73)
+ at sun.security.krb5.KrbTgsReq.getReply(KrbTgsReq.java:192)
+ at sun.security.krb5.KrbTgsReq.sendAndGetCreds(KrbTgsReq.java:203)
+ at sun.security.krb5.internal.CredentialsUtil.serviceCreds(CredentialsUtil.java:309)
+ at sun.security.krb5.internal.CredentialsUtil.acquireServiceCreds(CredentialsUtil.java:115)
+ at sun.security.krb5.Credentials.acquireServiceCreds(Credentials.java:454)
+ at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:641)
+ ... 19 more
+Caused by: KrbException: Identifier doesn't match expected value (906)
+ at sun.security.krb5.internal.KDCRep.init(KDCRep.java:143)
+ at sun.security.krb5.internal.TGSRep.init(TGSRep.java:66)
+ at sun.security.krb5.internal.TGSRep.<init>(TGSRep.java:61)
+ at sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:55)
+ ... 25 more
+----
+
+or
+
+----
+2015-01-12 14:47:29,440 [server.TThreadPoolServer] ERROR: Error occurred during processing of message.
+java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: Peer indicated failure: GSS initiate failed
+ at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219)
+ at org.apache.accumulo.core.rpc.UGIAssumingTransportFactory$1.run(UGIAssumingTransportFactory.java:51)
+ at org.apache.accumulo.core.rpc.UGIAssumingTransportFactory$1.run(UGIAssumingTransportFactory.java:48)
+ at java.security.AccessController.doPrivileged(Native Method)
+ at javax.security.auth.Subject.doAs(Subject.java:356)
+ at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1608)
+ at org.apache.accumulo.core.rpc.UGIAssumingTransportFactory.getTransport(UGIAssumingTransportFactory.java:48)
+ at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:208)
+ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
+ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
+ at java.lang.Thread.run(Thread.java:745)
+Caused by: org.apache.thrift.transport.TTransportException: Peer indicated failure: GSS initiate failed
+ at org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:190)
+ at org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125)
+ at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
+ at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41)
+ at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216)
+ ... 10 more
+----
+
+*A*: As previously mentioned, the hostname, and subsequently the address each Accumulo process is bound/listening
+on, is extremely important when negotiating an SASL connection. This problem commonly arises when the Accumulo
+servers are not configured to listen on the address denoted by their FQDN.
+
+The values in the Accumulo "hosts" files (In +$ACCUMULO_CONF_DIR+: +masters+, +monitors+, +slaves+, +tracers+,
+and +gc+) should match the instance component of the Kerberos server principal (e.g. +host+ in +accumulo/host\@EXAMPLE.COM+).
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
index d2a999d..76f332b 100644
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
@@ -38,6 +38,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -557,7 +558,21 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
s.close();
}
}
- Process initProcess = exec(Initialize.class, "--instance-name", config.getInstanceName(), "--password", config.getRootPassword());
+
+ LinkedList<String> args = new LinkedList<>();
+ args.add("--instance-name");
+ args.add(config.getInstanceName());
+ args.add("--user");
+ args.add(config.getRootUserName());
+
+ // If we aren't using SASL, add in the root password
+ final String saslEnabled = config.getSiteConfig().get(Property.INSTANCE_RPC_SASL_ENABLED.getKey());
+ if (null == saslEnabled || !Boolean.parseBoolean(saslEnabled)) {
+ args.add("--password");
+ args.add(config.getRootPassword());
+ }
+
+ Process initProcess = exec(Initialize.class, args.toArray(new String[0]));
int ret = initProcess.waitFor();
if (ret != 0) {
throw new RuntimeException("Initialize process returned " + ret + ". Check the logs in " + config.getLogDir() + " for errors.");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
index 26c23ed..6d674f3 100644
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
@@ -52,6 +52,7 @@ public class MiniAccumuloConfigImpl {
private Map<String,String> systemProperties = new HashMap<String,String>();
private String instanceName = "miniInstance";
+ private String rootUserName = "root";
private File libDir;
private File libExtDir;
@@ -667,4 +668,23 @@ public class MiniAccumuloConfigImpl {
public Configuration getHadoopConfiguration() {
return hadoopConf;
}
+
+ /**
+ * @return the default Accumulo "superuser"
+ * @since 1.7.0
+ */
+ public String getRootUserName() {
+ return rootUserName;
+ }
+
+ /**
+ * Sets the default Accumulo "superuser".
+ *
+ * @param rootUserName
+ * The name of the user to create with administrative permissions during initialization
+ * @since 1.7.0
+ */
+ public void setRootUserName(String rootUserName) {
+ this.rootUserName = rootUserName;
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6bffbe1..2c21ff6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,7 +122,7 @@
<!-- surefire/failsafe plugin option -->
<forkCount>1</forkCount>
<!-- overwritten in profiles hadoop-2 -->
- <hadoop.version>2.2.0</hadoop.version>
+ <hadoop.version>2.3.0</hadoop.version>
<htrace.version>3.0.4</htrace.version>
<httpclient.version>3.1</httpclient.version>
<java.ver>1.7</java.ver>
@@ -359,6 +359,11 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minikdc</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-tools</artifactId>
<version>${hadoop.version}</version>
</dependency>
@@ -877,6 +882,12 @@
</rules>
</configuration>
</plugin>
+ <plugin>
+ <!-- Allows us to get the apache-ds bundle artifacts -->
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>2.4.0</version>
+ </plugin>
</plugins>
</pluginManagement>
<plugins>
@@ -1070,6 +1081,13 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <!-- Allows us to get the apache-ds bundle artifacts -->
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ <inherited>true</inherited>
+ </plugin>
</plugins>
<extensions>
<extension>
@@ -1303,7 +1321,7 @@
<!-- Denotes intention and allows the enforcer plugin to pass when
the user is relying on default behavior; won't work to activate profile -->
<hadoop.profile>2</hadoop.profile>
- <hadoop.version>2.2.0</hadoop.version>
+ <hadoop.version>2.3.0</hadoop.version>
<httpclient.version>3.1</httpclient.version>
<slf4j.version>1.7.5</slf4j.version>
</properties>
@@ -1320,7 +1338,7 @@
</property>
</activation>
<properties>
- <hadoop.version>2.2.0</hadoop.version>
+ <hadoop.version>2.3.0</hadoop.version>
<httpclient.version>3.1</httpclient.version>
<slf4j.version>1.7.5</slf4j.version>
</properties>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
----------------------------------------------------------------------
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
index 7eb4fbf..81509ee 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.proxy.thrift.AccumuloProxy;
import org.apache.accumulo.server.rpc.RpcWrapper;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
import org.apache.log4j.Logger;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
@@ -135,7 +136,7 @@ public class Proxy {
@SuppressWarnings("unchecked")
Constructor<? extends TProcessor> proxyProcConstructor = (Constructor<? extends TProcessor>) proxyProcClass.getConstructor(proxyIfaceClass);
- final TProcessor processor = proxyProcConstructor.newInstance(RpcWrapper.service(impl));
+ final TProcessor processor = proxyProcConstructor.newInstance(TCredentialsUpdatingWrapper.service(RpcWrapper.service(impl), impl.getClass()));
THsHaServer.Args args = new THsHaServer.Args(socket);
args.processor(processor);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
index 09ae4f4..84c3853 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
@@ -16,14 +16,24 @@
*/
package org.apache.accumulo.server;
+import java.io.IOException;
+
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.ClientContext;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.base.Preconditions;
/**
* Provides a server context for Accumulo server components that operate with the system credentials and have access to the system files and configuration.
@@ -38,6 +48,31 @@ public class AccumuloServerContext extends ClientContext {
public AccumuloServerContext(ServerConfigurationFactory confFactory) {
super(confFactory.getInstance(), getCredentials(confFactory.getInstance()), confFactory.getConfiguration());
this.confFactory = confFactory;
+ if (null != getServerSaslParams()) {
+ // Server-side "client" check to make sure we're logged in as a user we expect to be
+ enforceKerberosLogin();
+ }
+ }
+
+ /**
+ * A "client-side" assertion for servers to validate that they are logged in as the expected user, per the configuration, before performing any RPC
+ */
+ // Should be private, but package-protected so EasyMock will work
+ void enforceKerberosLogin() {
+ final AccumuloConfiguration conf = confFactory.getSiteConfiguration();
+ // Unwrap _HOST into the FQDN to make the kerberos principal we'll compare against
+ final String kerberosPrincipal = SecurityUtil.getServerPrincipal(conf.get(Property.GENERAL_KERBEROS_PRINCIPAL));
+ UserGroupInformation loginUser;
+ try {
+ // The system user should be logged in via keytab when the process is started, not the currentUser() like KerberosToken
+ loginUser = UserGroupInformation.getLoginUser();
+ } catch (IOException e) {
+ throw new RuntimeException("Could not get login user", e);
+ }
+
+ Preconditions.checkArgument(loginUser.hasKerberosCredentials(), "Server does not have Kerberos credentials");
+ Preconditions.checkArgument(kerberosPrincipal.equals(loginUser.getUserName()),
+ "Expected login user to be " + kerberosPrincipal + " but was " + loginUser.getUserName());
}
/**
@@ -64,4 +99,35 @@ public class AccumuloServerContext extends ClientContext {
return SslConnectionParams.forServer(getConfiguration());
}
+ public SaslConnectionParams getServerSaslParams() {
+ // Not functionally different than the client SASL params, just uses the site configuration
+ return SaslConnectionParams.forConfig(getServerConfigurationFactory().getSiteConfiguration());
+ }
+
+ /**
+ * Determine the type of Thrift server to instantiate given the server's configuration.
+ *
+ * @return A {@link ThriftServerType} value to denote the type of Thrift server to construct
+ */
+ public ThriftServerType getThriftServerType() {
+ AccumuloConfiguration conf = getConfiguration();
+ if (conf.getBoolean(Property.INSTANCE_RPC_SSL_ENABLED)) {
+ if (conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+ throw new IllegalStateException("Cannot create a Thrift server capable of both SASL and SSL");
+ }
+
+ return ThriftServerType.SSL;
+ } else if (conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+ if (conf.getBoolean(Property.INSTANCE_RPC_SSL_ENABLED)) {
+ throw new IllegalStateException("Cannot create a Thrift server capable of both SASL and SSL");
+ }
+
+ return ThriftServerType.SASL;
+ } else {
+ // Lets us control the type of Thrift server created, primarily for benchmarking purposes
+ String serverTypeName = conf.get(Property.GENERAL_RPC_SERVER_TYPE);
+ return ThriftServerType.get(serverTypeName);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 2da6ba0..4a9f1e7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -95,11 +95,13 @@ import org.apache.accumulo.server.tablets.TabletTime;
import org.apache.accumulo.server.util.ReplicationTableUtil;
import org.apache.accumulo.server.util.TablePropUtil;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
@@ -280,11 +282,27 @@ public class Initialize {
log.fatal("Failed to talk to zookeeper", e);
return false;
}
- opts.rootpass = getRootPassword(opts);
- return initialize(opts, instanceNamePath, fs);
+
+ String rootUser;
+ try {
+ rootUser = getRootUserName(opts);
+ } catch (Exception e) {
+ log.fatal("Failed to obtain user for administrative privileges");
+ return false;
+ }
+
+ // Don't prompt for a password when we're running SASL(Kerberos)
+ final AccumuloConfiguration siteConf = SiteConfiguration.getInstance();
+ if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+ opts.rootpass = UUID.randomUUID().toString().getBytes(UTF_8);
+ } else {
+ opts.rootpass = getRootPassword(opts, rootUser);
+ }
+
+ return initialize(opts, instanceNamePath, fs, rootUser);
}
- private static boolean initialize(Opts opts, String instanceNamePath, VolumeManager fs) {
+ private static boolean initialize(Opts opts, String instanceNamePath, VolumeManager fs, String rootUser) {
UUID uuid = UUID.randomUUID();
// the actual disk locations of the root table and tablets
@@ -320,9 +338,38 @@ public class Initialize {
return false;
}
+ final ServerConfigurationFactory confFactory = new ServerConfigurationFactory(HdfsZooInstance.getInstance());
+
+ // When we're using Kerberos authentication, we need valid credentials to perform initialization. If the user provided some, use them.
+ // If they did not, fall back to the credentials present in accumulo-site.xml that the servers will use themselves.
try {
- AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance()));
- initSecurity(context, opts, uuid.toString());
+ final SiteConfiguration siteConf = confFactory.getSiteConfiguration();
+ if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+ final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ // We don't have any valid creds to talk to HDFS
+ if (!ugi.hasKerberosCredentials()) {
+ final String accumuloKeytab = siteConf.get(Property.GENERAL_KERBEROS_KEYTAB), accumuloPrincipal = siteConf.get(Property.GENERAL_KERBEROS_PRINCIPAL);
+
+ // Fail if the site configuration doesn't contain appropriate credentials to login as servers
+ if (StringUtils.isBlank(accumuloKeytab) || StringUtils.isBlank(accumuloPrincipal)) {
+ log.fatal("No Kerberos credentials provided, and Accumulo is not properly configured for server login");
+ return false;
+ }
+
+ log.info("Logging in as " + accumuloPrincipal + " with " + accumuloKeytab);
+
+ // Login using the keytab as the 'accumulo' user
+ UserGroupInformation.loginUserFromKeytab(accumuloPrincipal, accumuloKeytab);
+ }
+ }
+ } catch (IOException e) {
+ log.fatal("Failed to get the Kerberos user", e);
+ return false;
+ }
+
+ try {
+ AccumuloServerContext context = new AccumuloServerContext(confFactory);
+ initSecurity(context, opts, uuid.toString(), rootUser);
} catch (Exception e) {
log.fatal("Failed to initialize security", e);
return false;
@@ -525,18 +572,43 @@ public class Initialize {
return instanceNamePath;
}
- private static byte[] getRootPassword(Opts opts) throws IOException {
+ private static String getRootUserName(Opts opts) throws IOException {
+ AccumuloConfiguration conf = SiteConfiguration.getInstance();
+ final String keytab = conf.get(Property.GENERAL_KERBEROS_KEYTAB);
+ if (keytab.equals(Property.GENERAL_KERBEROS_KEYTAB.getDefaultValue()) || !conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+ return DEFAULT_ROOT_USER;
+ }
+
+ ConsoleReader c = getConsoleReader();
+ c.println("Running against secured HDFS");
+
+ if (null != opts.rootUser) {
+ return opts.rootUser;
+ }
+
+ do {
+ String user = c.readLine("Principal (user) to grant administrative privileges to : ");
+ if (user == null) {
+ // should not happen
+ System.exit(1);
+ }
+ if (!user.isEmpty()) {
+ return user;
+ }
+ } while (true);
+ }
+
+ private static byte[] getRootPassword(Opts opts, String rootUser) throws IOException {
if (opts.cliPassword != null) {
return opts.cliPassword.getBytes(UTF_8);
}
String rootpass;
String confirmpass;
do {
- rootpass = getConsoleReader()
- .readLine("Enter initial password for " + DEFAULT_ROOT_USER + " (this may not be applicable for your security setup): ", '*');
+ rootpass = getConsoleReader().readLine("Enter initial password for " + rootUser + " (this may not be applicable for your security setup): ", '*');
if (rootpass == null)
System.exit(0);
- confirmpass = getConsoleReader().readLine("Confirm initial password for " + DEFAULT_ROOT_USER + ": ", '*');
+ confirmpass = getConsoleReader().readLine("Confirm initial password for " + rootUser + ": ", '*');
if (confirmpass == null)
System.exit(0);
if (!rootpass.equals(confirmpass))
@@ -545,8 +617,9 @@ public class Initialize {
return rootpass.getBytes(UTF_8);
}
- private static void initSecurity(AccumuloServerContext context, Opts opts, String iid) throws AccumuloSecurityException, ThriftSecurityException {
- AuditedSecurityOperation.getInstance(context, true).initializeSecurity(context.rpcCreds(), DEFAULT_ROOT_USER, opts.rootpass);
+ private static void initSecurity(AccumuloServerContext context, Opts opts, String iid, String rootUser) throws AccumuloSecurityException,
+ ThriftSecurityException, IOException {
+ AuditedSecurityOperation.getInstance(context, true).initializeSecurity(context.rpcCreds(), rootUser, opts.rootpass);
}
public static void initSystemTablesConfig() throws IOException {
@@ -635,6 +708,8 @@ public class Initialize {
String cliInstanceName;
@Parameter(names = "--password", description = "set the password on the command line")
String cliPassword;
+ @Parameter(names = {"-u", "--user"}, description = "the name of the user to grant system permissions to")
+ String rootUser = null;
byte[] rootpass = null;
}
@@ -653,8 +728,9 @@ public class Initialize {
if (opts.resetSecurity) {
AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance()));
if (isInitialized(fs)) {
- opts.rootpass = getRootPassword(opts);
- initSecurity(context, opts, HdfsZooInstance.getInstance().getInstanceID());
+ final String rootUser = getRootUserName(opts);
+ opts.rootpass = getRootPassword(opts, rootUser);
+ initSecurity(context, opts, HdfsZooInstance.getInstance().getInstanceID(), rootUser);
} else {
log.fatal("Attempted to reset security on accumulo before it was initialized");
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java
new file mode 100644
index 0000000..f8400e2
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.server.security.SystemCredentials.SystemToken;
+import org.apache.accumulo.server.thrift.UGIAssumingProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extracts the TCredentials object from the RPC argument list and asserts that the Accumulo principal is equal to the Kerberos 'primary' component of the
+ * Kerberos principal (e.g. Accumulo principal of "frank" equals "frank" from "frank/hostname@DOMAIN").
+ */
+public class TCredentialsUpdatingInvocationHandler<I> implements InvocationHandler {
+ private static final Logger log = LoggerFactory.getLogger(TCredentialsUpdatingInvocationHandler.class);
+
+ private static final ConcurrentHashMap<String,Class<? extends AuthenticationToken>> TOKEN_CLASS_CACHE = new ConcurrentHashMap<>();
+ private final I instance;
+
+ protected TCredentialsUpdatingInvocationHandler(final I serverInstance) {
+ instance = serverInstance;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ updateArgs(args);
+
+ return invokeMethod(method, args);
+ }
+
+ /**
+ * Try to find a TCredentials object in the argument list, and, when the AuthenticationToken is a KerberosToken, set the principal from the SASL server as the
+ * TCredentials principal. This ensures that users can't spoof a different principal into the Credentials than what they used to authenticate.
+ */
+ protected void updateArgs(Object[] args) throws ThriftSecurityException {
+ // If we don't have at least two args
+ if (args == null || args.length < 2) {
+ return;
+ }
+
+ TCredentials tcreds = null;
+ if (args[0] != null && args[0] instanceof TCredentials) {
+ tcreds = (TCredentials) args[0];
+ } else if (args[1] != null && args[1] instanceof TCredentials) {
+ tcreds = (TCredentials) args[1];
+ }
+
+ // If we don't find a tcredentials in the first two positions
+ if (null == tcreds) {
+ // Not all calls require authentication (e.g. closeMultiScan). We need to let these pass through.
+ log.trace("Did not find a TCredentials object in the first two positions of the argument list, not updating principal");
+ return;
+ }
+
+ Class<? extends AuthenticationToken> tokenClass = getTokenClassFromName(tcreds.tokenClassName);
+ // If the authentication token isn't a KerberosToken
+ if (!KerberosToken.class.isAssignableFrom(tokenClass) && !SystemToken.class.isAssignableFrom(tokenClass)) {
+ // Don't include messages about SystemToken since it's internal
+ log.debug("Will not update principal on authentication tokens other than KerberosToken. Received " + tokenClass);
+ throw new ThriftSecurityException("Did not receive a valid token", SecurityErrorCode.BAD_CREDENTIALS);
+ }
+
+ // The Accumulo principal extracted from the SASL transport
+ final String principal = UGIAssumingProcessor.currentPrincipal();
+
+ if (null == principal) {
+ log.debug("Found KerberosToken in TCredentials, but did not receive principal from SASL processor");
+ throw new ThriftSecurityException("Did not extract principal from Thrift SASL processor", SecurityErrorCode.BAD_CREDENTIALS);
+ }
+
+ // The principal from the SASL transport should match what the user requested as their Accumulo principal
+ if (!principal.equals(tcreds.principal)) {
+ final String msg = "Principal in credentials object should match kerberos principal. Expected '" + principal + "' but was '" + tcreds.principal + "'";
+ log.warn(msg);
+ throw new ThriftSecurityException(msg, SecurityErrorCode.BAD_CREDENTIALS);
+ }
+ }
+
+ protected Class<? extends AuthenticationToken> getTokenClassFromName(String tokenClassName) {
+ Class<? extends AuthenticationToken> typedClz = TOKEN_CLASS_CACHE.get(tokenClassName);
+ if (null == typedClz) {
+ Class<?> clz;
+ try {
+ clz = Class.forName(tokenClassName);
+ } catch (ClassNotFoundException e) {
+ log.debug("Could not create class from token name: " + tokenClassName, e);
+ return null;
+ }
+ typedClz = clz.asSubclass(AuthenticationToken.class);
+ }
+ TOKEN_CLASS_CACHE.putIfAbsent(tokenClassName, typedClz);
+ return typedClz;
+ }
+
+ private Object invokeMethod(Method method, Object[] args) throws Throwable {
+ try {
+ return method.invoke(instance, args);
+ } catch (InvocationTargetException ex) {
+ throw ex.getCause();
+ }
+ }
+
+ /**
+ * Visible for testing
+ */
+ protected ConcurrentHashMap<String,Class<? extends AuthenticationToken>> getTokenCache() {
+ return TOKEN_CLASS_CACHE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java
new file mode 100644
index 0000000..4621d36
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+
+/**
+ * Utility class which ensures that the TCredentials instance handed to a Thrift service implementation carries the principal established by SASL at the
+ * Thrift transport layer when SASL/GSSAPI (Kerberos) is enabled. This makes sure the strong authentication provided to us is what gets used, and that any
+ * other principal name a client (malicious or otherwise) might pass in is disallowed.
+ */
+public class TCredentialsUpdatingWrapper {
+ /** Returns a dynamic proxy over the interfaces directly declared by originalClass, routing every call through a TCredentialsUpdatingInvocationHandler. */
+ public static <T> T service(final T instance, final Class<? extends T> originalClass) {
+ final InvocationHandler credentialsHandler = new TCredentialsUpdatingInvocationHandler<T>(instance);
+ final ClassLoader loader = originalClass.getClassLoader();
+ final Class<?>[] proxiedInterfaces = originalClass.getInterfaces();
+ @SuppressWarnings("unchecked")
+ final T proxy = (T) Proxy.newProxyInstance(loader, proxiedInterfaces, credentialsHandler);
+ return proxy;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 641c0bf..985df9c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -16,6 +16,7 @@
*/
package org.apache.accumulo.server.rpc;
+import java.io.IOException;
import java.lang.reflect.Field;
import java.net.BindException;
import java.net.InetAddress;
@@ -33,26 +34,34 @@ import javax.net.ssl.SSLServerSocket;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.LoggingRunnable;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.thrift.UGIAssumingProcessor;
import org.apache.accumulo.server.util.Halt;
import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TSaslServerTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
/**
@@ -115,6 +124,11 @@ public class TServerUtils {
portSearch = config.getBoolean(portSearchProperty);
final int simpleTimerThreadpoolSize = config.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE);
+ final ThriftServerType serverType = service.getThriftServerType();
+
+ if (ThriftServerType.SASL == serverType) {
+ processor = updateSaslProcessor(serverType, processor);
+ }
// create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once
TimedProcessor timedProcessor = new TimedProcessor(config, processor, serverName, threadName);
@@ -135,8 +149,9 @@ public class TServerUtils {
port = 1024 + port % (65535 - 1024);
try {
HostAndPort addr = HostAndPort.fromParts(hostname, port);
- return TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads, simpleTimerThreadpoolSize, timeBetweenThreadChecks,
- maxMessageSize, service.getServerSslParams(), service.getClientTimeoutInMillis());
+ return TServerUtils.startTServer(addr, serverType, timedProcessor, serverName, threadName, minThreads,
+ simpleTimerThreadpoolSize, timeBetweenThreadChecks, maxMessageSize,
+ service.getServerSslParams(), service.getServerSaslParams(), service.getClientTimeoutInMillis());
} catch (TTransportException ex) {
log.error("Unable to start TServer", ex);
if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) {
@@ -209,7 +224,31 @@ public class TServerUtils {
}
/**
- * Create a TThreadPoolServer with the given transport and processor
+ * Creates a TThreadPoolServer for normal unsecure operation. Useful for comparing performance against SSL or SASL transports.
+ *
+ * @param address
+ * Address to bind to
+ * @param processor
+ * TProcessor for the server
+ * @param maxMessageSize
+ * Maximum size of a Thrift message allowed
+ * @return A configured TThreadPoolServer and its bound address information
+ */
+ public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, long maxMessageSize) throws TTransportException {
+ // The socket is created before the server so that its local port can be read back below
+ final TServerSocket serverSocket = new TServerSocket(address.getPort());
+ final TTransportFactory transportFactory = ThriftUtil.transportFactory(maxMessageSize);
+ final TThreadPoolServer threadPoolServer = createThreadPoolServer(serverSocket, processor, transportFactory);
+
+ if (0 == address.getPort()) {
+ // Port 0 was requested: report the port the socket actually holds instead
+ address = HostAndPort.fromParts(address.getHostText(), serverSocket.getServerSocket().getLocalPort());
+ }
+ return new ServerAddress(threadPoolServer, address);
+ }
+
+ /**
+ * Create a TThreadPoolServer with the given transport and processor with the default transport factory.
*
* @param transport
* TServerTransport for the server
@@ -218,9 +257,23 @@ public class TServerUtils {
* @return A configured TThreadPoolServer
*/
public static TThreadPoolServer createThreadPoolServer(TServerTransport transport, TProcessor processor) {
- final TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+ return createThreadPoolServer(transport, processor, ThriftUtil.transportFactory());
+ }
+
+ /**
+ * Create a TServer with the provided server transport, processor and transport factory.
+ *
+ * @param transport
+ * TServerTransport for the server
+ * @param processor
+ * TProcessor for the server
+ * @param transportFactory
+ * TTransportFactory for the server
+ */
+ public static TThreadPoolServer createThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory) {
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
options.protocolFactory(ThriftUtil.protocolFactory());
- options.transportFactory(ThriftUtil.transportFactory());
+ options.transportFactory(transportFactory);
options.processorFactory(new ClientInfoProcessorFactory(clientAddress, processor));
return new TThreadPoolServer(options);
}
@@ -284,7 +337,7 @@ public class TServerUtils {
*/
public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams)
throws TTransportException {
- org.apache.thrift.transport.TServerSocket transport;
+ TServerSocket transport;
try {
transport = getSslServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()), sslParams);
} catch (UnknownHostException e) {
@@ -296,14 +349,63 @@ public class TServerUtils {
return new ServerAddress(createThreadPoolServer(transport, processor), address);
}
- /**
- * Create a Thrift server given the provided and Accumulo configuration.
- */
- public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, TProcessor processor, String serverName, String threadName,
- int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout)
+ public static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SaslConnectionParams params,
+ final String serverName, String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize)
throws TTransportException {
- return startTServer(address, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads,
- timeBetweenThreadChecks, maxMessageSize, sslParams, sslSocketTimeout);
+ // Builds a TThreadPoolServer whose transport performs the SASL/GSSAPI handshake. We'd really prefer THsHaServer (or similar) to avoid the
+ // 1 RPC == 1 Thread model of TThreadPoolServer, but sadly that isn't possible: TSaslTransport issues a handshake when it open()'s, which
+ // fails when the server does an accept() to (presumably) wake up the eventing system.
+ log.info("Creating SASL thread pool thrift server on port=" + address.getPort());
+ final String hostname;
+ try {
+ hostname = InetAddress.getByName(address.getHostText()).getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ throw new TTransportException(e);
+ }
+
+ final UserGroupInformation serverUser;
+ try {
+ serverUser = UserGroupInformation.getLoginUser();
+ } catch (IOException e) {
+ throw new TTransportException(e);
+ }
+
+ log.trace("Logged in as {}, creating TSsaslServerTransport factory as {}/{}", serverUser, params.getKerberosServerPrimary(), hostname);
+
+ // Make the SASL transport factory with the instance and primary from the kerberos server principal, SASL properties
+ // and the SASL callback handler from Hadoop to ensure authorization ID is the authentication ID. Despite the 'protocol' argument seeming to be useless, it
+ // *must* be the primary of the server.
+ TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory();
+ saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
+ new SaslRpcServer.SaslGssCallbackHandler());
+
+ // Updates the clientAddress threadlocal so we know the client's address
+ final ClientInfoProcessorFactory clientInfoFactory = new ClientInfoProcessorFactory(clientAddress, processor);
+
+ // Make sure the TTransportFactory is performing a UGI.doAs
+ TTransportFactory ugiTransportFactory = new UGIAssumingTransportFactory(saslTransportFactory, serverUser);
+
+ // Open the listening socket last so that a failure in the setup above cannot leave a bound socket behind
+ TServerSocket transport = new TServerSocket(address.getPort());
+ if (address.getPort() == 0) {
+ address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
+ }
+
+ return new ServerAddress(new TThreadPoolServer(new TThreadPoolServer.Args(transport).transportFactory(ugiTransportFactory)
+ .processorFactory(clientInfoFactory)
+ .protocolFactory(ThriftUtil.protocolFactory())), address);
+ }
+
+ public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, ThriftServerType serverType, TProcessor processor,
+ String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
+ SaslConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
+ // Overload that builds the TimedProcessor itself from conf; a SASL server first has its processor wrapped by updateSaslProcessor
+ if (ThriftServerType.SASL == serverType) {
+ processor = updateSaslProcessor(serverType, processor);
+ }
+ // The TimedProcessor goes around the (possibly SASL-wrapped) processor before delegating to the TimedProcessor-based overload
+ return startTServer(address, serverType, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads,
+ timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, serverSocketTimeout);
}
/**
@@ -311,14 +413,33 @@ public class TServerUtils {
*
* @return A ServerAddress encapsulating the Thrift server created and the host/port which it is bound to.
*/
- public static ServerAddress startTServer(HostAndPort address, TimedProcessor processor, String serverName, String threadName, int numThreads,
- int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
+ public static ServerAddress startTServer(HostAndPort address,ThriftServerType serverType, TimedProcessor processor, String serverName, String threadName, int numThreads,
+ int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
+ SaslConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
+
+ // This is presently not supported. It's hypothetically possible, I believe, to work, but it would require changes in how the transports
+ // work at the Thrift layer to ensure that both the SSL and SASL handshakes function. SASL's quality of protection addresses privacy issues.
+ Preconditions.checkArgument(!(sslParams != null && saslParams != null), "Cannot start a Thrift server using both SSL and SASL");
ServerAddress serverAddress;
- if (sslParams != null) {
- serverAddress = createSslThreadPoolServer(address, processor, sslSocketTimeout, sslParams);
- } else {
- serverAddress = createNonBlockingServer(address, processor, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
+ switch (serverType) {
+ case SSL:
+ log.debug("Instantiating SSL Thrift server");
+ serverAddress = createSslThreadPoolServer(address, processor, serverSocketTimeout, sslParams);
+ break;
+ case SASL:
+ log.debug("Instantiating SASL Thrift server");
+ serverAddress = createSaslThreadPoolServer(address, processor, serverSocketTimeout, saslParams, serverName, threadName, numThreads, numSTThreads,
+ timeBetweenThreadChecks, maxMessageSize);
+ break;
+ case THREADPOOL:
+ log.debug("Instantiating unsecure TThreadPool Thrift server");
+ serverAddress = createBlockingServer(address, processor, maxMessageSize);
+ break;
+ case CUSTOM_HS_HA: // Intentional passthrough -- Our custom wrapper around HsHa is the default
+ default:
+ log.debug("Instantiating default, unsecure custom half-async Thrift server");
+ serverAddress = createNonBlockingServer(address, processor, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
}
final TServer finalServer = serverAddress.server;
@@ -368,4 +489,21 @@ public class TServerUtils {
log.error("Unable to call shutdownNow", e);
}
}
+
+ /**
+ * Wrap the provided processor in the {@link UGIAssumingProcessor} so Kerberos authentication works.
+ *
+ * @param serverType must be {@link ThriftServerType#SASL}
+ * @param processor the processor to wrap
+ * @return A {@link UGIAssumingProcessor} which wraps the provided processor
+ * @throws IllegalArgumentException if <code>serverType</code> is not {@link ThriftServerType#SASL}
+ */
+ private static TProcessor updateSaslProcessor(ThriftServerType serverType, TProcessor processor) {
+ Preconditions.checkArgument(ThriftServerType.SASL == serverType);
+ // Wrap the provided processor in our special processor which proxies the provided UGI on the logged-in UGI
+ // Important that we have Timed -> UGIAssuming -> [provided] to make sure that the metrics are still reported
+ // as the logged-in user.
+ log.info("Wrapping {} in UGIAssumingProcessor", processor.getClass());
+ return new UGIAssumingProcessor(processor);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java
new file mode 100644
index 0000000..60d5402
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * The type of configured Thrift server to start. This is meant more as a developer knob, so that the appropriate Thrift server can be constructed when
+ * measuring the overhead of SSL or SASL.
+ *
+ * Neither SSL nor SASL presently works with TFramedTransport, which means that the Thrift servers with asynchronous support will fail with these transports.
+ * As such, we want to ensure that any benchmarks against "unsecure" Accumulo use the same type of Thrift server.
+ */
+public enum ThriftServerType {
+ CUSTOM_HS_HA("custom_hs_ha"), THREADPOOL("threadpool"), SSL("ssl"), SASL("sasl");
+
+ private final String label;
+
+ ThriftServerType(String label) {
+ this.label = label;
+ }
+
+ /** Resolves a configured name, ignoring case and surrounding whitespace; a name matching no constant is rejected by valueOf. */
+ public static ThriftServerType get(String name) {
+ // A blank or missing name selects our custom HsHa server, the default
+ final boolean unspecified = StringUtils.isBlank(name);
+ return unspecified ? CUSTOM_HS_HA : valueOf(name.trim().toUpperCase());
+ }
+
+ /** Returns the label this constant was constructed with. */
+ @Override
+ public String toString() {
+ return label;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
index 5fe57b7..7adb46e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
@@ -51,6 +51,7 @@ import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.security.handler.Authenticator;
import org.apache.accumulo.server.security.handler.Authorizor;
+import org.apache.accumulo.server.security.handler.KerberosAuthenticator;
import org.apache.accumulo.server.security.handler.PermissionHandler;
import org.apache.accumulo.server.security.handler.ZKAuthenticator;
import org.apache.accumulo.server.security.handler.ZKAuthorizor;
@@ -68,6 +69,7 @@ public class SecurityOperation {
protected Authorizor authorizor;
protected Authenticator authenticator;
protected PermissionHandler permHandle;
+ protected boolean isKerberos;
private static String rootUserName = null;
private final ZooCache zooCache;
private final String ZKUserPath;
@@ -126,11 +128,11 @@ public class SecurityOperation {
|| !permHandle.validSecurityHandlers(authent, author))
throw new RuntimeException(authorizor + ", " + authenticator + ", and " + pm
+ " do not play nice with eachother. Please choose authentication and authorization mechanisms that are compatible with one another.");
+
+ isKerberos = KerberosAuthenticator.class.isAssignableFrom(authenticator.getClass());
}
public void initializeSecurity(TCredentials credentials, String rootPrincipal, byte[] token) throws AccumuloSecurityException, ThriftSecurityException {
- authenticate(credentials);
-
if (!isSystemUser(credentials))
throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
@@ -160,11 +162,31 @@ public class SecurityOperation {
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.INVALID_INSTANCEID);
Credentials creds = Credentials.fromThrift(credentials);
+
if (isSystemUser(credentials)) {
if (!(context.getCredentials().equals(creds))) {
+ log.debug("Provided credentials did not match server's expected credentials. Expected " + context.getCredentials() + " but got " + creds);
throw new ThriftSecurityException(creds.getPrincipal(), SecurityErrorCode.BAD_CREDENTIALS);
}
} else {
+ // Not the system user
+
+ if (isKerberos) {
+ // If we have kerberos credentials for a user from the network but no account
+ // in the system, we need to make one before proceeding
+ try {
+ if (!authenticator.userExists(creds.getPrincipal())) {
+ // If we call the normal createUser method, it will loop back into this method
+ // when it tries to check if the user has permission to create users
+ _createUser(credentials, creds, Authorizations.EMPTY);
+ }
+ } catch (AccumuloSecurityException e) {
+ log.debug("Failed to determine if user exists", e);
+ throw e.asThriftException();
+ }
+ }
+
+ // Check that the user is authenticated (a no-op at this point for kerberos)
try {
if (!authenticator.authenticateUser(creds.getPrincipal(), creds.getToken())) {
throw new ThriftSecurityException(creds.getPrincipal(), SecurityErrorCode.BAD_CREDENTIALS);
@@ -190,6 +212,15 @@ public class SecurityOperation {
return true;
try {
Credentials toCreds = Credentials.fromThrift(toAuth);
+
+ if (isKerberos) {
+ // If we have kerberos credentials for a user from the network but no account
+ // in the system, we need to make one before proceeding
+ if (!authenticator.userExists(toCreds.getPrincipal())) {
+ createUser(credentials, toCreds, Authorizations.EMPTY);
+ }
+ }
+
return authenticator.authenticateUser(toCreds.getPrincipal(), toCreds.getToken());
} catch (AccumuloSecurityException e) {
throw e.asThriftException();
@@ -579,14 +610,23 @@ public class SecurityOperation {
public void createUser(TCredentials credentials, Credentials newUser, Authorizations authorizations) throws ThriftSecurityException {
if (!canCreateUser(credentials, newUser.getPrincipal()))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+ _createUser(credentials, newUser, authorizations);
+ if (canChangeAuthorizations(credentials, newUser.getPrincipal())) {
+ try {
+ authorizor.changeAuthorizations(newUser.getPrincipal(), authorizations);
+ } catch (AccumuloSecurityException ase) {
+ throw ase.asThriftException();
+ }
+ }
+ }
+
+ protected void _createUser(TCredentials credentials, Credentials newUser, Authorizations authorizations) throws ThriftSecurityException {
try {
AuthenticationToken token = newUser.getToken();
authenticator.createUser(newUser.getPrincipal(), token);
authorizor.initUser(newUser.getPrincipal());
permHandle.initUser(newUser.getPrincipal());
log.info("Created user " + newUser.getPrincipal() + " at the request of user " + credentials.getPrincipal());
- if (canChangeAuthorizations(credentials, newUser.getPrincipal()))
- authorizor.changeAuthorizations(newUser.getPrincipal(), authorizations);
} catch (AccumuloSecurityException ase) {
throw ase.asThriftException();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java
index 42d1313..6014139 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java
@@ -69,10 +69,11 @@ public class SecurityUtil {
*/
public static boolean login(String principalConfig, String keyTabPath) {
try {
- String principalName = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principalConfig, InetAddress.getLocalHost().getCanonicalHostName());
+ String principalName = getServerPrincipal(principalConfig);
if (keyTabPath != null && principalName != null && keyTabPath.length() != 0 && principalName.length() != 0) {
+ log.info("Attempting to login with keytab as " + principalName);
UserGroupInformation.loginUserFromKeytab(principalName, keyTabPath);
- log.info("Succesfully logged in as user " + principalConfig);
+ log.info("Succesfully logged in as user " + principalName);
return true;
}
} catch (IOException io) {
@@ -80,4 +81,15 @@ public class SecurityUtil {
}
return false;
}
+
+ /** Delegates to {@link org.apache.hadoop.security.SecurityUtil#getServerPrincipal(String, String)}, supplying the canonical hostname of the local
+ * host; an IOException raised while doing so is rethrown wrapped in a RuntimeException. */
+ public static String getServerPrincipal(String configuredPrincipal) {
+ try {
+ final String canonicalHost = InetAddress.getLocalHost().getCanonicalHostName();
+ return org.apache.hadoop.security.SecurityUtil.getServerPrincipal(configuredPrincipal, canonicalHost);
+ } catch (IOException failure) {
+ throw new RuntimeException("Could not convert configured server principal: " + configuredPrincipal, failure);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
index 79201b1..51d50a1 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
@@ -30,8 +30,10 @@ import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.util.Base64;
@@ -51,8 +53,8 @@ public final class SystemCredentials extends Credentials {
private final TCredentials AS_THRIFT;
- SystemCredentials(Instance instance) {
- super(SYSTEM_PRINCIPAL, SystemToken.get(instance));
+ SystemCredentials(Instance instance, String principal, AuthenticationToken token) {
+ super(principal, token);
AS_THRIFT = super.toThrift(instance);
}
@@ -65,7 +67,16 @@ public final class SystemCredentials extends Credentials {
public static SystemCredentials get(Instance instance) {
check_permission();
- return new SystemCredentials(instance);
+ String principal = SYSTEM_PRINCIPAL;
+ AccumuloConfiguration conf = SiteConfiguration.getInstance();
+ SaslConnectionParams saslParams = SaslConnectionParams.forConfig(conf);
+ if (null != saslParams) {
+ // Use the server's kerberos principal as the Accumulo principal. We could also unwrap the principal server-side, but the principal for SystemCredentials
+ // isn't actually used anywhere, so it really doesn't matter. We can't include the kerberos principal in the SystemToken as it would break equality when
+ // different Accumulo servers are using different kerberos principals as their accumulo principal
+ principal = SecurityUtil.getServerPrincipal(conf.get(Property.GENERAL_KERBEROS_PRINCIPAL));
+ }
+ return new SystemCredentials(instance, principal, SystemToken.get(instance));
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java
new file mode 100644
index 0000000..61b8db0
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.util.Base64;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.security.SystemCredentials.SystemToken;
+import org.apache.accumulo.server.thrift.UGIAssumingProcessor;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+public class KerberosAuthenticator implements Authenticator {
+ private static final Logger log = LoggerFactory.getLogger(KerberosAuthenticator.class);
+
+ private static final Set<Class<? extends AuthenticationToken>> SUPPORTED_TOKENS = Sets.newHashSet(Arrays.<Class<? extends AuthenticationToken>> asList(
+ KerberosToken.class, SystemToken.class));
+ private static final Set<String> SUPPORTED_TOKEN_NAMES = Sets.newHashSet(KerberosToken.class.getName(), SystemToken.class.getName());
+
+ private final ZKAuthenticator zkAuthenticator = new ZKAuthenticator();
+ private String zkUserPath;
+ private final ZooCache zooCache;
+
+ public KerberosAuthenticator() {
+ zooCache = new ZooCache();
+ }
+
+ @Override
+ public void initialize(String instanceId, boolean initialize) {
+ zkAuthenticator.initialize(instanceId, initialize);
+ zkUserPath = Constants.ZROOT + "/" + instanceId + "/users";
+ }
+
+ @Override
+ public boolean validSecurityHandlers(Authorizor auth, PermissionHandler pm) {
+ return true;
+ }
+
+ private void createUserNodeInZk(String principal) throws KeeperException, InterruptedException {
+ synchronized (zooCache) {
+ zooCache.clear();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ zoo.putPrivatePersistentData(zkUserPath + "/" + principal, new byte[0], NodeExistsPolicy.FAIL);
+ }
+ }
+
+ @Override
+ public void initializeSecurity(TCredentials credentials, String principal, byte[] token) throws AccumuloSecurityException, ThriftSecurityException {
+ try {
+ // remove old settings from zookeeper first, if any
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ synchronized (zooCache) {
+ zooCache.clear();
+ if (zoo.exists(zkUserPath)) {
+ zoo.recursiveDelete(zkUserPath, NodeMissingPolicy.SKIP);
+ log.info("Removed " + zkUserPath + "/" + " from zookeeper");
+ }
+
+ principal = Base64.encodeBase64String(principal.getBytes(UTF_8));
+
+ // prep parent node of users with root username
+ zoo.putPersistentData(zkUserPath, principal.getBytes(UTF_8), NodeExistsPolicy.FAIL);
+
+ createUserNodeInZk(principal);
+ }
+ } catch (KeeperException | InterruptedException e) {
+ log.error("Failed to initialize security", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean authenticateUser(String principal, AuthenticationToken token) throws AccumuloSecurityException {
+ final String rpcPrincipal = UGIAssumingProcessor.currentPrincipal();
+
+ if (!rpcPrincipal.equals(principal)) {
+ // KerberosAuthenticator can't authenticate a principal other than the RPC caller because KerberosToken is just a shim and doesn't contain the actual credentials
+ throw new AccumuloSecurityException(principal, SecurityErrorCode.AUTHENTICATOR_FAILED);
+ }
+
+ // User is authenticated at the transport layer -- nothing extra is necessary
+ if (token instanceof KerberosToken) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Set<String> listUsers() throws AccumuloSecurityException {
+ Set<String> base64Users = zkAuthenticator.listUsers();
+ Set<String> readableUsers = new HashSet<>();
+ for (String base64User : base64Users) {
+ readableUsers.add(new String(Base64.decodeBase64(base64User), UTF_8));
+ }
+ return readableUsers;
+ }
+
+ @Override
+ public synchronized void createUser(String principal, AuthenticationToken token) throws AccumuloSecurityException {
+ if (!(token instanceof KerberosToken)) {
+ throw new UnsupportedOperationException("Expected a KerberosToken but got a " + token.getClass().getSimpleName());
+ }
+
+ principal = Base64.encodeBase64String(principal.getBytes(UTF_8));
+
+ try {
+ createUserNodeInZk(principal);
+ } catch (KeeperException e) {
+ if (e.code().equals(KeeperException.Code.NODEEXISTS)) {
+ log.error("User already exists in ZooKeeper", e);
+ throw new AccumuloSecurityException(principal, SecurityErrorCode.USER_EXISTS, e);
+ }
+ log.error("Failed to create user in ZooKeeper", e);
+ throw new AccumuloSecurityException(principal, SecurityErrorCode.CONNECTION_ERROR, e);
+ } catch (InterruptedException e) {
+ log.error("Interrupted trying to create node for user", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public synchronized void dropUser(String user) throws AccumuloSecurityException {
+ user = Base64.encodeBase64String(user.getBytes(UTF_8));
+ zkAuthenticator.dropUser(user);
+ }
+
+ @Override
+ public void changePassword(String principal, AuthenticationToken token) throws AccumuloSecurityException {
+ throw new UnsupportedOperationException("Cannot change password with Kerberos authenticaton");
+ }
+
+ @Override
+ public synchronized boolean userExists(String user) throws AccumuloSecurityException {
+ user = Base64.encodeBase64String(user.getBytes(UTF_8));
+ return zkAuthenticator.userExists(user);
+ }
+
+ @Override
+ public Set<Class<? extends AuthenticationToken>> getSupportedTokenTypes() {
+ return SUPPORTED_TOKENS;
+ }
+
+ @Override
+ public boolean validTokenClass(String tokenClass) {
+ return SUPPORTED_TOKEN_NAMES.contains(tokenClass);
+ }
+
+}
[4/4] accumulo git commit: ACCUMULO-2815 Support for Kerberos client
authentication.
Posted by el...@apache.org.
ACCUMULO-2815 Support for Kerberos client authentication.
Leverage SASL transport provided by Thrift which can speak GSSAPI,
which Kerberos implements. Introduced...
* An Accumulo KerberosToken which is an AuthenticationToken to
validate users.
* Custom thrift processor and invocation handler to ensure server
RPCs have a valid KRB identity and Accumulo authentication.
* Authenticator, Authorizor and PermissionHandler for kerberos
* New ClientConf variables to use SASL transport and pass KRB
server primary (from principal)
* Updated ClientOpts and Shell opts to transparently use a
KerberosToken when SASL is enabled (no extra client work).
* Ensure existing unit tests still function.
* Throw ThriftSecurityExceptions on bad authentication to ensure
proper client action is taken.
* Fall back to krb principal before local OS user
* Initialize accepts a "root" user and defaults to not prompting
for a password to that user acct w/ SASL enabled.
* Use properties specific to server primary and realm for
clients to connect to servers (required for SASL handshake).
* Basic KerberosIT testing basic functionality (MiniKdc)
* Introduction of useKrbForIT option to run AccumuloClusterITs
with Kerberos (not 100% coverage) (MiniKdc)
* Ensure system user doesn't get a "real" user acct.
* Ensure that start-all.sh and stop-all.sh don't require krb creds
* Add user manual documentation
* Use the full krb principal as the accumulo principal
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4f19aa1f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4f19aa1f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4f19aa1f
Branch: refs/heads/master
Commit: 4f19aa1f8a629ba76e9c7b517b3356ba21865ec9
Parents: 8dc68b9
Author: Josh Elser <el...@apache.org>
Authored: Tue Dec 9 00:03:05 2014 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jan 15 11:47:59 2015 -0500
----------------------------------------------------------------------
README | 42 +--
.../apache/accumulo/core/cli/ClientOpts.java | 64 +++-
.../core/client/ClientConfiguration.java | 46 ++-
.../core/client/impl/ClientContext.java | 37 +-
.../core/client/impl/ConnectorImpl.java | 6 +-
.../accumulo/core/client/impl/MasterClient.java | 3 +-
.../core/client/impl/ThriftTransportKey.java | 39 +-
.../core/client/impl/ThriftTransportPool.java | 4 +-
.../client/security/tokens/KerberosToken.java | 136 +++++++
.../org/apache/accumulo/core/conf/Property.java | 16 +-
.../accumulo/core/rpc/FilterTransport.java | 105 ++++++
.../accumulo/core/rpc/SaslConnectionParams.java | 244 +++++++++++++
.../apache/accumulo/core/rpc/ThriftUtil.java | 81 ++++-
.../accumulo/core/rpc/UGIAssumingTransport.java | 70 ++++
.../core/rpc/UGIAssumingTransportFactory.java | 55 +++
.../accumulo/core/cli/TestClientOpts.java | 114 +++++-
.../core/client/ClientConfigurationTest.java | 81 +++++
.../client/impl/ThriftTransportKeyTest.java | 84 +++++
.../core/conf/ClientConfigurationTest.java | 66 ----
.../core/rpc/SaslConnectionParamsTest.java | 103 ++++++
.../main/asciidoc/accumulo_user_manual.asciidoc | 2 +
docs/src/main/asciidoc/chapters/clients.txt | 11 +
docs/src/main/asciidoc/chapters/kerberos.txt | 355 +++++++++++++++++++
.../impl/MiniAccumuloClusterImpl.java | 17 +-
.../impl/MiniAccumuloConfigImpl.java | 20 ++
pom.xml | 24 +-
.../java/org/apache/accumulo/proxy/Proxy.java | 3 +-
.../accumulo/server/AccumuloServerContext.java | 66 ++++
.../apache/accumulo/server/init/Initialize.java | 102 +++++-
.../TCredentialsUpdatingInvocationHandler.java | 133 +++++++
.../server/rpc/TCredentialsUpdatingWrapper.java | 38 ++
.../accumulo/server/rpc/TServerUtils.java | 176 ++++++++-
.../accumulo/server/rpc/ThriftServerType.java | 49 +++
.../server/security/SecurityOperation.java | 48 ++-
.../accumulo/server/security/SecurityUtil.java | 16 +-
.../server/security/SystemCredentials.java | 17 +-
.../security/handler/KerberosAuthenticator.java | 181 ++++++++++
.../security/handler/KerberosAuthorizor.java | 90 +++++
.../handler/KerberosPermissionHandler.java | 154 ++++++++
.../server/thrift/UGIAssumingProcessor.java | 90 +++++
.../org/apache/accumulo/server/util/Admin.java | 9 +
.../org/apache/accumulo/server/util/ZooZap.java | 10 +
.../server/AccumuloServerContextTest.java | 102 ++++++
...redentialsUpdatingInvocationHandlerTest.java | 93 +++++
.../server/rpc/ThriftServerTypeTest.java | 36 ++
.../accumulo/gc/SimpleGarbageCollector.java | 15 +-
.../gc/GarbageCollectWriteAheadLogsTest.java | 32 +-
.../accumulo/gc/SimpleGarbageCollectorTest.java | 72 ++--
.../CloseWriteAheadLogReferencesTest.java | 30 +-
.../java/org/apache/accumulo/master/Master.java | 11 +-
.../accumulo/master/tableOps/CompactRange.java | 5 +
.../accumulo/monitor/servlets/trace/Basic.java | 5 +
.../org/apache/accumulo/tracer/TraceServer.java | 8 +-
.../apache/accumulo/tserver/TabletServer.java | 18 +-
.../tserver/replication/ReplicationWorker.java | 13 +-
.../java/org/apache/accumulo/shell/Shell.java | 33 +-
.../apache/accumulo/shell/ShellOptionsJC.java | 41 ++-
.../accumulo/shell/ShellOptionsJCTest.java | 51 +++
test/pom.xml | 17 +
.../accumulo/test/functional/ZombieTServer.java | 5 +-
.../test/performance/thrift/NullTserver.java | 5 +-
.../accumulo/harness/AccumuloClusterIT.java | 62 +++-
.../accumulo/harness/MiniClusterHarness.java | 106 +++++-
.../accumulo/harness/SharedMiniClusterIT.java | 45 ++-
.../org/apache/accumulo/harness/TestingKdc.java | 165 +++++++++
.../conf/AccumuloMiniClusterConfiguration.java | 62 +++-
.../server/security/SystemCredentialsIT.java | 76 +++-
.../test/ArbitraryTablePropertiesIT.java | 8 +
.../org/apache/accumulo/test/CleanWalIT.java | 10 +-
.../test/functional/BatchScanSplitIT.java | 3 +-
.../accumulo/test/functional/KerberosIT.java | 316 +++++++++++++++++
.../accumulo/test/functional/MetadataIT.java | 7 +
.../test/security/KerberosTokenTest.java | 108 ++++++
test/src/test/resources/log4j.properties | 9 +
74 files changed, 4290 insertions(+), 286 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/README
----------------------------------------------------------------------
diff --git a/README b/README
index 27b3c66..4ebb078 100644
--- a/README
+++ b/README
@@ -364,45 +364,9 @@ certain column.
row1 colf1:colq2 [] val2
-If you are running on top of hdfs with kerberos enabled, then you need to do
-some extra work. First, create an Accumulo principal
-
- kadmin.local -q "addprinc -randkey accumulo/<host.domain.name>"
-
-where <host.domain.name> is replaced by a fully qualified domain name. Export
-the principals to a keytab file. It is safer to create a unique keytab file for each
-server, but you can also glob them if you wish.
-
- kadmin.local -q "xst -k accumulo.keytab -glob accumulo*"
-
-Place this file in $ACCUMULO_CONF_DIR for every host. It should be owned by
-the accumulo user and chmodded to 400. Add the following to the accumulo-env.sh
-
- kinit -kt $ACCUMULO_HOME/conf/accumulo.keytab accumulo/`hostname -f`
-
-In the accumulo-site.xml file on each node, add settings for general.kerberos.keytab
-and general.kerberos.principal, where the keytab setting is the absolute path
-to the keytab file ($ACCUMULO_HOME is valid to use) and principal is set to
-accumulo/_HOST@<REALM>, where REALM is set to your kerberos realm. You may use
-_HOST in lieu of your individual host names.
-
- <property>
- <name>general.kerberos.keytab</name>
- <value>$ACCUMULO_CONF_DIR/accumulo.keytab</value>
- </property>
-
- <property>
- <name>general.kerberos.principal</name>
- <value>accumulo/_HOST@MYREALM</value>
- </property>
-
-You can then start up Accumulo as you would with the accumulo user, and it will
-automatically handle the kerberos keys needed to access hdfs.
-
-Please Note: You may have issues initializing Accumulo while running kerberos HDFS.
-You can resolve this by temporarily granting the accumulo user write access to the
-hdfs root directory, running init, and then revoking write permission in the root
-directory (be sure to maintain access to the /accumulo directory).
+For information on how to configure Accumulo to run on top of Secure HDFS with
+Kerberos, please consult the Accumulo user manual section specifically devoted
+to client and server configuration with Kerberos.
******************************************************************************
6. Monitoring Apache Accumulo
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
index eb020eb..f1a0393 100644
--- a/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
+++ b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
@@ -35,6 +35,7 @@ import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
@@ -105,7 +106,7 @@ public class ClientOpts extends Help {
}
@Parameter(names = {"-u", "--user"}, description = "Connection user")
- public String principal = System.getProperty("user.name");
+ public String principal = null;
@Parameter(names = "-p", converter = PasswordConverter.class, description = "Connection password")
public Password password = null;
@@ -114,17 +115,19 @@ public class ClientOpts extends Help {
public Password securePassword = null;
@Parameter(names = {"-tc", "--tokenClass"}, description = "Token class")
- public String tokenClassName = PasswordToken.class.getName();
+ public String tokenClassName = null;
@DynamicParameter(names = "-l",
description = "login properties in the format key=value. Reuse -l for each property (prompt for properties if this option is missing")
public Map<String,String> loginProps = new LinkedHashMap<String,String>();
public AuthenticationToken getToken() {
- if (!loginProps.isEmpty()) {
- Properties props = new Properties();
- for (Entry<String,String> loginOption : loginProps.entrySet())
- props.put(loginOption.getKey(), loginOption.getValue());
+ if (null != tokenClassName) {
+ final Properties props = new Properties();
+ if (!loginProps.isEmpty()) {
+ for (Entry<String,String> loginOption : loginProps.entrySet())
+ props.put(loginOption.getKey(), loginOption.getValue());
+ }
try {
AuthenticationToken token = Class.forName(tokenClassName).asSubclass(AuthenticationToken.class).newInstance();
@@ -166,6 +169,9 @@ public class ClientOpts extends Help {
@Parameter(names = "--ssl", description = "Connect to accumulo over SSL")
public boolean sslEnabled = false;
+ @Parameter(names = "--sasl", description = "Connecto to Accumulo using SASL (supports Kerberos)")
+ public boolean saslEnabled = false;
+
@Parameter(names = "--config-file", description = "Read the given client config file. "
+ "If omitted, the path searched can be specified with $ACCUMULO_CLIENT_CONF_PATH, "
+ "which defaults to ~/.accumulo/config:$ACCUMULO_CONF_DIR/client.conf:/etc/accumulo/client.conf")
@@ -189,11 +195,32 @@ public class ClientOpts extends Help {
Trace.off();
}
+ /**
+ * Automatically update the options to use a KerberosToken when SASL is enabled for RPCs. Don't overwrite the options if the user has provided something
+ * specifically.
+ */
+ protected void updateKerberosCredentials() {
+ ClientConfiguration clientConfig;
+ try {
+ if (clientConfigFile == null)
+ clientConfig = ClientConfiguration.loadDefault();
+ else
+ clientConfig = new ClientConfiguration(new PropertiesConfiguration(clientConfigFile));
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e);
+ }
+ final boolean clientConfSaslEnabled = Boolean.parseBoolean(clientConfig.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+ if ((saslEnabled || clientConfSaslEnabled) && null == tokenClassName) {
+ tokenClassName = KerberosToken.CLASS_NAME;
+ }
+ }
+
@Override
public void parseArgs(String programName, String[] args, Object... others) {
super.parseArgs(programName, args, others);
startDebugLogging();
startTracing(programName);
+ updateKerberosCredentials();
}
protected Instance cachedInstance = null;
@@ -207,10 +234,25 @@ public class ClientOpts extends Help {
return cachedInstance = new ZooKeeperInstance(this.getClientConfiguration());
}
+ public String getPrincipal() throws AccumuloSecurityException {
+ if (null == principal) {
+ AuthenticationToken token = getToken();
+ if (null == token) {
+ throw new AccumuloSecurityException("No principal or authentication token was provided", SecurityErrorCode.BAD_CREDENTIALS);
+ }
+
+ // Try to extract the principal automatically from Kerberos
+ if (token instanceof KerberosToken) {
+ principal = ((KerberosToken) token).getPrincipal();
+ } else {
+ principal = System.getProperty("user.name");
+ }
+ }
+ return principal;
+ }
+
public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
- if (this.principal == null || this.getToken() == null)
- throw new AccumuloSecurityException("You must provide a user (-u) and password (-p)", SecurityErrorCode.BAD_CREDENTIALS);
- return getInstance().getConnector(principal, getToken());
+ return getInstance().getConnector(getPrincipal(), getToken());
}
public ClientConfiguration getClientConfiguration() throws IllegalArgumentException {
@@ -228,6 +270,10 @@ public class ClientOpts extends Help {
}
if (sslEnabled)
clientConfig.setProperty(ClientProperty.INSTANCE_RPC_SSL_ENABLED, "true");
+
+ if (saslEnabled)
+ clientConfig.setProperty(ClientProperty.INSTANCE_RPC_SASL_ENABLED, "true");
+
if (siteFile != null) {
AccumuloConfiguration config = new AccumuloConfiguration() {
Configuration xml = new Configuration();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java b/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java
index df53645..d37d471 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java
@@ -47,6 +47,7 @@ public class ClientConfiguration extends CompositeConfiguration {
public static final String GLOBAL_CONF_FILENAME = "client.conf";
public enum ClientProperty {
+ // SSL
RPC_SSL_TRUSTSTORE_PATH(Property.RPC_SSL_TRUSTSTORE_PATH),
RPC_SSL_TRUSTSTORE_PASSWORD(Property.RPC_SSL_TRUSTSTORE_PASSWORD),
RPC_SSL_TRUSTSTORE_TYPE(Property.RPC_SSL_TRUSTSTORE_TYPE),
@@ -57,13 +58,34 @@ public class ClientConfiguration extends CompositeConfiguration {
GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS),
INSTANCE_RPC_SSL_CLIENT_AUTH(Property.INSTANCE_RPC_SSL_CLIENT_AUTH),
INSTANCE_RPC_SSL_ENABLED(Property.INSTANCE_RPC_SSL_ENABLED),
+
+ // ZooKeeper
INSTANCE_ZK_HOST(Property.INSTANCE_ZK_HOST),
INSTANCE_ZK_TIMEOUT(Property.INSTANCE_ZK_TIMEOUT),
+
+ // Instance information
INSTANCE_NAME("instance.name", null, PropertyType.STRING, "Name of Accumulo instance to connect to"),
INSTANCE_ID("instance.id", null, PropertyType.STRING, "UUID of Accumulo instance to connect to"),
+
+ // Tracing
TRACE_SPAN_RECEIVERS(Property.TRACE_SPAN_RECEIVERS),
TRACE_SPAN_RECEIVER_PREFIX(Property.TRACE_SPAN_RECEIVER_PREFIX),
- TRACE_ZK_PATH(Property.TRACE_ZK_PATH);
+ TRACE_ZK_PATH(Property.TRACE_ZK_PATH),
+
+ // SASL / GSSAPI(Kerberos)
+ /**
+ * @since 1.7.0
+ */
+ INSTANCE_RPC_SASL_ENABLED(Property.INSTANCE_RPC_SASL_ENABLED),
+ /**
+ * @since 1.7.0
+ */
+ RPC_SASL_QOP(Property.RPC_SASL_QOP),
+ /**
+ * @since 1.7.0
+ */
+ KERBEROS_SERVER_PRIMARY("kerberos.server.primary", "accumulo", PropertyType.STRING,
+ "The first component of the Kerberos principal, the 'primary', that Accumulo servers use to login");
private String key;
private String defaultValue;
@@ -356,4 +378,26 @@ public class ClientConfiguration extends CompositeConfiguration {
setProperty(ClientProperty.RPC_SSL_KEYSTORE_TYPE, type);
return this;
}
+
+ /**
+ * Same as {@link #with(ClientProperty, String)} for ClientProperty.INSTANCE_RPC_SASL_ENABLED.
+ *
+ * @since 1.7.0
+ */
+ public ClientConfiguration withSasl(boolean saslEnabled) {
+ return with(ClientProperty.INSTANCE_RPC_SASL_ENABLED, String.valueOf(saslEnabled));
+ }
+
+ /**
+ * Same as {@link #with(ClientProperty, String)} for ClientProperty.INSTANCE_RPC_SASL_ENABLED and ClientProperty.KERBEROS_SERVER_PRIMARY.
+ *
+ * @param saslEnabled
+ * Should SASL(kerberos) be enabled
+ * @param kerberosServerPrimary
+ * The 'primary' component of the Kerberos principal Accumulo servers use to login (e.g. 'accumulo' in 'accumulo/_HOST@REALM')
+ * @since 1.7.0
+ */
+ public ClientConfiguration withSasl(boolean saslEnabled, String kerberosServerPrimary) {
+ return withSasl(saslEnabled).with(ClientProperty.KERBEROS_SERVER_PRIMARY, kerberosServerPrimary);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
index e75bec6..8470da4 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
@@ -33,6 +34,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.CredentialProviderFactoryShim;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.security.thrift.TCredentials;
@@ -52,6 +54,7 @@ public class ClientContext {
private final Instance inst;
private Credentials creds;
+ private ClientConfiguration clientConf;
private final AccumuloConfiguration rpcConf;
private Connector conn;
@@ -60,6 +63,7 @@ public class ClientContext {
*/
public ClientContext(Instance instance, Credentials credentials, ClientConfiguration clientConf) {
this(instance, credentials, convertClientConfig(checkNotNull(clientConf, "clientConf is null")));
+ this.clientConf = clientConf;
}
/**
@@ -69,6 +73,7 @@ public class ClientContext {
inst = checkNotNull(instance, "instance is null");
creds = checkNotNull(credentials, "credentials is null");
rpcConf = checkNotNull(serverConf, "serverConf is null");
+ clientConf = null;
}
/**
@@ -115,6 +120,17 @@ public class ClientContext {
}
/**
+ * Retrieve SASL configuration to initiate an RPC connection to a server
+ */
+ public SaslConnectionParams getClientSaslParams() {
+ // Use the clientConf if we have it
+ if (null != clientConf) {
+ return SaslConnectionParams.forConfig(clientConf);
+ }
+ return SaslConnectionParams.forConfig(getConfiguration());
+ }
+
+ /**
* Retrieve a connector
*/
public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
@@ -171,10 +187,19 @@ public class ClientContext {
}
}
}
+
if (config.containsKey(key))
return config.getString(key);
- else
+ else {
+ // Reconstitute the server kerberos property from the client config
+ if (Property.GENERAL_KERBEROS_PRINCIPAL == property) {
+ if (config.containsKey(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey())) {
+ // The client config doesn't tell us the server's realm, so fall back to the default realm
+ return config.getString(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey()) + "/_HOST@" + SaslConnectionParams.getDefaultRealm();
+ }
+ }
return defaults.get(property);
+ }
}
@Override
@@ -188,6 +213,16 @@ public class ClientContext {
props.put(key, config.getString(key));
}
+ // Two client props that don't exist on the server config. Client doesn't need to know about the Kerberos instance from the principal, but servers do.
+ // Automatically reconstruct the server property when converting a client config.
+ if (props.containsKey(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey())) {
+ final String serverPrimary = props.remove(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey());
+ if (filter.accept(Property.GENERAL_KERBEROS_PRINCIPAL.getKey())) {
+ // Use the _HOST expansion. It should be unnecessary in "client land".
+ props.put(Property.GENERAL_KERBEROS_PRINCIPAL.getKey(), serverPrimary + "/_HOST@" + SaslConnectionParams.getDefaultRealm());
+ }
+ }
+
// Attempt to load sensitive properties from a CredentialProvider, if configured
org.apache.hadoop.conf.Configuration hadoopConf = getHadoopConfiguration();
if (null != hadoopConf) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
index f481cc3..443e548 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.trace.Tracer;
public class ConnectorImpl extends Connector {
+ private static final String SYSTEM_TOKEN_NAME = "org.apache.accumulo.server.security.SystemCredentials$SystemToken";
private final ClientContext context;
private SecurityOperations secops = null;
private TableOperationsImpl tableops = null;
@@ -60,8 +61,9 @@ public class ConnectorImpl extends Connector {
this.context = context;
- // Skip fail fast for system services; string literal for class name, to avoid
- if (!"org.apache.accumulo.server.security.SystemCredentials$SystemToken".equals(context.getCredentials().getToken().getClass().getName())) {
+ // Skip fail fast for system services; string literal for class name, to avoid dependency on server jar
+ final String tokenClassName = context.getCredentials().getToken().getClass().getName();
+ if (!SYSTEM_TOKEN_NAME.equals(tokenClassName)) {
ServerClient.execute(context, new ClientExec<ClientService.Client>() {
@Override
public void execute(ClientService.Client iface) throws Exception {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
index fcbf9f9..9dad794 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
@@ -68,7 +68,8 @@ public class MasterClient {
MasterClientService.Client client = ThriftUtil.getClientNoTimeout(new MasterClientService.Client.Factory(), master, context);
return client;
} catch (TTransportException tte) {
- if (tte.getCause().getClass().equals(UnknownHostException.class)) {
+ Throwable cause = tte.getCause();
+ if (null != cause && cause instanceof UnknownHostException) {
// do not expect to recover from this
throw new RuntimeException(tte);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
index 6dc846f..072724b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.core.client.impl;
import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import com.google.common.net.HostAndPort;
@@ -26,6 +27,7 @@ class ThriftTransportKey {
private final HostAndPort server;
private final long timeout;
private final SslConnectionParams sslParams;
+ private final SaslConnectionParams saslParams;
private int hash = -1;
@@ -34,6 +36,24 @@ class ThriftTransportKey {
this.server = server;
this.timeout = timeout;
this.sslParams = context.getClientSslParams();
+ this.saslParams = context.getClientSaslParams();
+ if (null != saslParams) {
+ // TSasl and TSSL transport factories don't play nicely together
+ if (null != sslParams) {
+ throw new RuntimeException("Cannot use both SSL and SASL thrift transports");
+ }
+ }
+ }
+
+ /**
+ * Visible only for testing
+ */
+ ThriftTransportKey(HostAndPort server, long timeout, SslConnectionParams sslParams, SaslConnectionParams saslParams) {
+ checkNotNull(server, "location is null");
+ this.server = server;
+ this.timeout = timeout;
+ this.sslParams = sslParams;
+ this.saslParams = saslParams;
}
HostAndPort getServer() {
@@ -48,12 +68,17 @@ class ThriftTransportKey {
return sslParams != null;
}
+ public boolean isSasl() {
+ return saslParams != null;
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof ThriftTransportKey))
return false;
ThriftTransportKey ttk = (ThriftTransportKey) o;
- return server.equals(ttk.server) && timeout == ttk.timeout && (!isSsl() || (ttk.isSsl() && sslParams.equals(ttk.sslParams)));
+ // Both keys must agree on whether SSL/SASL is in use. Checking only this side makes equals() asymmetric:
+ // a plain key would equal a secured key, while the secured key would not equal the plain one.
+ return server.equals(ttk.server) && timeout == ttk.timeout && isSsl() == ttk.isSsl() && (!isSsl() || sslParams.equals(ttk.sslParams))
+ && isSasl() == ttk.isSasl() && (!isSasl() || saslParams.equals(ttk.saslParams));
}
@Override
@@ -65,10 +90,20 @@ class ThriftTransportKey {
@Override
public String toString() {
- return (isSsl() ? "ssl:" : "") + server + " (" + Long.toString(timeout) + ")";
+ String prefix = "";
+ if (isSsl()) {
+ prefix = "ssl:";
+ } else if (isSasl()) {
+ prefix = "sasl:" + saslParams.getPrincipal() + "@";
+ }
+ return prefix + server + " (" + Long.toString(timeout) + ")";
}
public SslConnectionParams getSslParams() {
return sslParams;
}
+
+ public SaslConnectionParams getSaslParams() {
+ return saslParams;
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index 5da803b..bc1cdbb 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -465,7 +465,7 @@ public class ThriftTransportPool {
try {
return new Pair<String,TTransport>(ttk.getServer().toString(), createNewTransport(ttk));
} catch (TTransportException tte) {
- log.debug("Failed to connect to " + servers.get(index), tte);
+ log.debug("Failed to connect to {}", servers.get(index), tte);
servers.remove(index);
retryCount++;
}
@@ -475,7 +475,7 @@ public class ThriftTransportPool {
}
private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException {
- TTransport transport = ThriftUtil.createClientTransport(cacheKey.getServer(), (int) cacheKey.getTimeout(), cacheKey.getSslParams());
+ TTransport transport = ThriftUtil.createClientTransport(cacheKey.getServer(), (int) cacheKey.getTimeout(), cacheKey.getSslParams(), cacheKey.getSaslParams());
log.trace("Creating new connection to connection to {}", cacheKey.getServer());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/client/security/tokens/KerberosToken.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/security/tokens/KerberosToken.java b/core/src/main/java/org/apache/accumulo/core/client/security/tokens/KerberosToken.java
new file mode 100644
index 0000000..d7d2e15
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/security/tokens/KerberosToken.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.security.tokens;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+
+import javax.security.auth.DestroyFailedException;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Authentication token for Kerberos authenticated clients. The token only carries the principal name of the user that is currently logged in via
+ * {@link UserGroupInformation}; no credential material is stored in, or serialized by, this class.
+ *
+ * @since 1.7.0
+ */
+public class KerberosToken implements AuthenticationToken {
+
+  public static final String CLASS_NAME = KerberosToken.class.getName();
+
+  // Version marker written by write() and verified by readFields()
+  private static final int VERSION = 1;
+
+  // Set to null by destroy(); every reader of this field must tolerate null
+  private String principal;
+
+  /**
+   * Creates a token using the provided principal and the currently logged-in user via {@link UserGroupInformation}.
+   *
+   * @param principal
+   *          The user that is logged in
+   * @throws IOException
+   *           If the current user cannot be obtained from {@link UserGroupInformation}
+   * @throws NullPointerException
+   *           If the principal is null
+   * @throws IllegalArgumentException
+   *           If the current user has no Kerberos credentials, or the principal does not match the current user
+   */
+  public KerberosToken(String principal) throws IOException {
+    Preconditions.checkNotNull(principal);
+    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    Preconditions.checkArgument(ugi.hasKerberosCredentials(), "Subject is not logged in via Kerberos");
+    Preconditions.checkArgument(principal.equals(ugi.getUserName()), "Provided principal does not match currently logged-in user");
+    this.principal = ugi.getUserName();
+  }
+
+  /**
+   * Creates a token using the login user as returned by {@link UserGroupInformation#getCurrentUser()}
+   *
+   * @throws IOException
+   *           If the current logged in user cannot be computed.
+   */
+  public KerberosToken() throws IOException {
+    this(UserGroupInformation.getCurrentUser().getUserName());
+  }
+
+  /**
+   * Creates a new token for the same principal. The copy goes through the public constructor, so the principal is validated against the current user again;
+   * an {@link IOException} raised while doing so is rethrown wrapped in a {@link RuntimeException}.
+   */
+  @Override
+  public KerberosToken clone() {
+    try {
+      return new KerberosToken(principal);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Two tokens are equal when they carry the same principal. Destroyed tokens (null principal) only equal other destroyed tokens.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (!(obj instanceof KerberosToken))
+      return false;
+    KerberosToken other = (KerberosToken) obj;
+
+    // principal is null after destroy(); compare without dereferencing it
+    if (null == principal)
+      return null == other.principal;
+    return principal.equals(other.principal);
+  }
+
+  /**
+   * The identity of the user to which this token belongs according to Kerberos
+   *
+   * @return The principal, or null if this token has been destroyed
+   */
+  public String getPrincipal() {
+    return principal;
+  }
+
+  /**
+   * Serializes this token. Only the version marker is written; the principal is not part of the serialized form.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(VERSION);
+  }
+
+  /**
+   * Reads the serialized form produced by {@link #write(DataOutput)} and verifies the version marker.
+   *
+   * @throws IOException
+   *           If the version read does not match the version this class writes
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int actualVersion = in.readInt();
+    if (VERSION != actualVersion) {
+      throw new IOException("Did not find expected version in serialized KerberosToken");
+    }
+  }
+
+  /**
+   * Clears the principal held by this token.
+   */
+  @Override
+  public synchronized void destroy() throws DestroyFailedException {
+    principal = null;
+  }
+
+  @Override
+  public boolean isDestroyed() {
+    return null == principal;
+  }
+
+  /**
+   * No-op: this token exposes no configurable properties (see {@link #getProperties()}).
+   */
+  @Override
+  public void init(Properties properties) {
+
+  }
+
+  @Override
+  public Set<TokenProperty> getProperties() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public int hashCode() {
+    // Must stay usable after destroy(), which nulls the principal
+    return null == principal ? 0 : principal.hashCode();
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index ce5de85..ad96680 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -105,6 +105,11 @@ public enum Property {
// TLSv1.2 should be used as the default when JDK6 support is dropped
RPC_SSL_CLIENT_PROTOCOL("rpc.ssl.client.protocol", "TLSv1", PropertyType.STRING,
"The protocol used to connect to a secure server, must be in the list of enabled protocols on the server side (rpc.ssl.server.enabled.protocols)"),
+ /**
+ * @since 1.7.0
+ */
+ RPC_SASL_QOP("rpc.sasl.qop", "auth", PropertyType.STRING,
+ "The quality of protection to be used with SASL. Valid values are 'auth', 'auth-int', and 'auth-conf'"),
// instance properties (must be the same for every node in an instance)
INSTANCE_PREFIX("instance.", null, PropertyType.PREFIX,
@@ -145,8 +150,14 @@ public enum Property {
"The authorizor class that accumulo will use to determine what labels a user has privilege to see"),
INSTANCE_SECURITY_PERMISSION_HANDLER("instance.security.permissionHandler", "org.apache.accumulo.server.security.handler.ZKPermHandler",
PropertyType.CLASSNAME, "The permission handler class that accumulo will use to determine if a user has privilege to perform an action"),
- INSTANCE_RPC_SSL_ENABLED("instance.rpc.ssl.enabled", "false", PropertyType.BOOLEAN, "Use SSL for socket connections from clients and among accumulo services"),
+ INSTANCE_RPC_SSL_ENABLED("instance.rpc.ssl.enabled", "false", PropertyType.BOOLEAN,
+ "Use SSL for socket connections from clients and among accumulo services. Mutually exclusive with SASL RPC configuration."),
INSTANCE_RPC_SSL_CLIENT_AUTH("instance.rpc.ssl.clientAuth", "false", PropertyType.BOOLEAN, "Require clients to present certs signed by a trusted root"),
+ /**
+ * @since 1.7.0
+ */
+ INSTANCE_RPC_SASL_ENABLED("instance.rpc.sasl.enabled", "false", PropertyType.BOOLEAN,
+ "Configures Thrift RPCs to require SASL with GSSAPI which supports Kerberos authentication. Mutually exclusive with SSL RPC configuration."),
// general properties
GENERAL_PREFIX("general.", null, PropertyType.PREFIX,
@@ -158,6 +169,9 @@ public enum Property {
GENERAL_DYNAMIC_CLASSPATHS(AccumuloVFSClassLoader.DYNAMIC_CLASSPATH_PROPERTY_NAME, AccumuloVFSClassLoader.DEFAULT_DYNAMIC_CLASSPATH_VALUE,
PropertyType.STRING, "A list of all of the places where changes in jars or classes will force a reload of the classloader."),
GENERAL_RPC_TIMEOUT("general.rpc.timeout", "120s", PropertyType.TIMEDURATION, "Time to wait on I/O for simple, short RPC calls"),
+ @Experimental
+ GENERAL_RPC_SERVER_TYPE("general.rpc.server.type", "", PropertyType.STRING,
+ "Type of Thrift server to instantiate, see org.apache.accumulo.server.rpc.ThriftServerType for more information. Only useful for benchmarking thrift servers"),
GENERAL_KERBEROS_KEYTAB("general.kerberos.keytab", "", PropertyType.PATH, "Path to the kerberos keytab to use. Leave blank if not using kerberoized hdfs"),
GENERAL_KERBEROS_PRINCIPAL("general.kerberos.principal", "", PropertyType.STRING, "Name of the kerberos principal to use. _HOST will automatically be "
+ "replaced by the machines hostname in the hostname portion of the principal. Leave blank if not using kerberoized hdfs"),
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/rpc/FilterTransport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/FilterTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/FilterTransport.java
new file mode 100644
index 0000000..a50944b
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/FilterTransport.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.rpc;
+
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A {@link TTransport} decorator: every operation is forwarded unchanged to the transport supplied at construction time. Subclasses override individual
+ * methods to add behavior around the underlying transport, much like FilterInputStream does for input streams.
+ */
+public class FilterTransport extends TTransport {
+  private final TTransport delegate;
+
+  /**
+   * @param wrapped
+   *          The transport that receives every forwarded call; must not be null
+   */
+  public FilterTransport(TTransport wrapped) {
+    this.delegate = Preconditions.checkNotNull(wrapped);
+  }
+
+  /**
+   * @return The transport that this instance forwards to
+   */
+  protected TTransport getWrapped() {
+    return delegate;
+  }
+
+  // Connection lifecycle
+
+  @Override
+  public void open() throws TTransportException {
+    delegate.open();
+  }
+
+  @Override
+  public boolean isOpen() {
+    return delegate.isOpen();
+  }
+
+  @Override
+  public boolean peek() {
+    return delegate.peek();
+  }
+
+  @Override
+  public void close() {
+    delegate.close();
+  }
+
+  // Reading
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    return delegate.read(buf, off, len);
+  }
+
+  @Override
+  public int readAll(byte[] buf, int off, int len) throws TTransportException {
+    return delegate.readAll(buf, off, len);
+  }
+
+  // Writing
+
+  @Override
+  public void write(byte[] buf) throws TTransportException {
+    delegate.write(buf);
+  }
+
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    delegate.write(buf, off, len);
+  }
+
+  @Override
+  public void flush() throws TTransportException {
+    delegate.flush();
+  }
+
+  // Direct buffer access
+
+  @Override
+  public byte[] getBuffer() {
+    return delegate.getBuffer();
+  }
+
+  @Override
+  public int getBufferPosition() {
+    return delegate.getBufferPosition();
+  }
+
+  @Override
+  public int getBytesRemainingInBuffer() {
+    return delegate.getBytesRemainingInBuffer();
+  }
+
+  @Override
+  public void consumeBuffer(int len) {
+    delegate.consumeBuffer(len);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/rpc/SaslConnectionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/SaslConnectionParams.java b/core/src/main/java/org/apache/accumulo/core/rpc/SaslConnectionParams.java
new file mode 100644
index 0000000..e067e23
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/SaslConnectionParams.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.security.sasl.Sasl;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.commons.configuration.MapConfiguration;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Connection parameters for setting up a TSaslTransportFactory. Instances are created through the {@code forConfig} factory methods, which return null when
+ * SASL is not enabled in the supplied configuration.
+ */
+public class SaslConnectionParams {
+  private static final Logger log = LoggerFactory.getLogger(SaslConnectionParams.class);
+
+  /**
+   * Enumeration around {@link Sasl#QOP}
+   */
+  public enum QualityOfProtection {
+    AUTH("auth"),
+    AUTH_INT("auth-int"),
+    AUTH_CONF("auth-conf");
+
+    private final String quality;
+
+    private QualityOfProtection(String quality) {
+      this.quality = quality;
+    }
+
+    /**
+     * @return The SASL string form of this quality of protection, e.g. "auth-int"
+     */
+    public String getQuality() {
+      return quality;
+    }
+
+    /**
+     * Looks up the constant whose SASL string form equals the given name.
+     *
+     * @param name
+     *          The SASL string form, e.g. "auth", "auth-int" or "auth-conf"
+     * @return The matching constant
+     * @throws IllegalArgumentException
+     *           If no constant matches, including when name is null
+     */
+    public static QualityOfProtection get(String name) {
+      for (QualityOfProtection candidate : values()) {
+        if (candidate.quality.equals(name)) {
+          return candidate;
+        }
+      }
+
+      throw new IllegalArgumentException("No value for " + name);
+    }
+
+    @Override
+    public String toString() {
+      return quality;
+    }
+  }
+
+  private static String defaultRealm;
+
+  static {
+    try {
+      defaultRealm = KerberosUtil.getDefaultRealm();
+    } catch (Exception ke) {
+      // The message names the value actually assigned below
+      log.debug("Kerberos krb5 configuration not found, setting default realm to UNKNOWN");
+      defaultRealm = "UNKNOWN";
+    }
+  }
+
+  private String principal;
+  private QualityOfProtection qop;
+  private String kerberosServerPrimary;
+  private final Map<String,String> saslProperties;
+
+  private SaslConnectionParams() {
+    saslProperties = new HashMap<>();
+  }
+
+  /**
+   * Generate an {@link SaslConnectionParams} instance given the provided {@link AccumuloConfiguration}. The provided configuration is converted into a
+   * {@link ClientConfiguration}, ignoring any properties which are not {@link ClientProperty}s. If SASL is not being used, a null object will be returned.
+   * Callers should strive to use {@link #forConfig(ClientConfiguration)}; server processes are the only intended consumers of this method.
+   *
+   * @param conf
+   *          The configuration for clients to communicate with Accumulo
+   * @return An {@link SaslConnectionParams} instance or null if SASL is not enabled
+   */
+  public static SaslConnectionParams forConfig(AccumuloConfiguration conf) {
+    final Map<String,String> clientProperties = new HashMap<>();
+
+    // Servers will only have the full principal in their configuration -- parse the
+    // primary and realm from it.
+    final String serverPrincipal = conf.get(Property.GENERAL_KERBEROS_PRINCIPAL);
+
+    try {
+      final KerberosName krbName = new KerberosName(serverPrincipal);
+      clientProperties.put(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey(), krbName.getServiceName());
+    } catch (Exception e) {
+      // bad value or empty, assume we're not using kerberos
+    }
+
+    HashSet<String> clientKeys = new HashSet<>();
+    for (ClientProperty prop : ClientProperty.values()) {
+      clientKeys.add(prop.getKey());
+    }
+
+    // Copy over only the entries that are client properties; an explicitly configured value
+    // overwrites the server primary derived from the principal above
+    for (Entry<String,String> entry : conf) {
+      final String key = entry.getKey();
+      if (clientKeys.contains(key)) {
+        clientProperties.put(key, entry.getValue());
+      }
+    }
+
+    ClientConfiguration clientConf = new ClientConfiguration(new MapConfiguration(clientProperties));
+    return forConfig(clientConf);
+  }
+
+  /**
+   * Generate an {@link SaslConnectionParams} instance given the provided {@link ClientConfiguration}. If SASL is not being used, a null object will be
+   * returned.
+   *
+   * @param conf
+   *          The configuration for clients to communicate with Accumulo
+   * @return An {@link SaslConnectionParams} instance or null if SASL is not enabled
+   * @throws RuntimeException
+   *           If Hadoop security is not enabled, or the current user (or its name) cannot be determined
+   * @throws IllegalArgumentException
+   *           If the configured quality of protection is not a recognized value
+   */
+  public static SaslConnectionParams forConfig(ClientConfiguration conf) {
+    if (!Boolean.parseBoolean(conf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED))) {
+      return null;
+    }
+
+    SaslConnectionParams params = new SaslConnectionParams();
+
+    // Ensure we're using Kerberos auth for Hadoop UGI
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      throw new RuntimeException("Cannot use SASL if Hadoop security is not enabled");
+    }
+
+    // Get the current user
+    UserGroupInformation currentUser;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to get current user", e);
+    }
+
+    // The full name is our principal
+    params.principal = currentUser.getUserName();
+    if (null == params.principal) {
+      throw new RuntimeException("Got null username from " + currentUser);
+    }
+
+    // Get the quality of protection to use
+    final String qopValue = conf.get(ClientProperty.RPC_SASL_QOP);
+    params.qop = QualityOfProtection.get(qopValue);
+
+    // Add in the SASL properties to a map so we don't have to repeatedly construct this map
+    params.saslProperties.put(Sasl.QOP, params.qop.getQuality());
+
+    // The primary from the KRB principal on each server (e.g. primary/instance@realm)
+    params.kerberosServerPrimary = conf.get(ClientProperty.KERBEROS_SERVER_PRIMARY);
+
+    return params;
+  }
+
+  /**
+   * @return A read-only view of the SASL properties (currently only {@link Sasl#QOP})
+   */
+  public Map<String,String> getSaslProperties() {
+    return Collections.unmodifiableMap(saslProperties);
+  }
+
+  /**
+   * The quality of protection used with SASL. See {@link Sasl#QOP} for more information.
+   */
+  public QualityOfProtection getQualityOfProtection() {
+    return qop;
+  }
+
+  /**
+   * The 'primary' component from the Kerberos principals that servers are configured to use.
+   */
+  public String getKerberosServerPrimary() {
+    return kerberosServerPrimary;
+  }
+
+  /**
+   * The principal of the logged in user for SASL
+   */
+  public String getPrincipal() {
+    return principal;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder(23,29);
+    // append(Object) tolerates null and otherwise contributes the same value as appending qop.hashCode()
+    hcb.append(kerberosServerPrimary).append(saslProperties).append(qop).append(principal);
+    return hcb.toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof SaslConnectionParams)) {
+      return false;
+    }
+
+    SaslConnectionParams other = (SaslConnectionParams) o;
+    return sameString(kerberosServerPrimary, other.kerberosServerPrimary) && qop == other.qop && sameString(principal, other.principal)
+        && saslProperties.equals(other.saslProperties);
+  }
+
+  /**
+   * Null-tolerant string equality: two nulls are equal, a null never equals a non-null value.
+   */
+  private static boolean sameString(String a, String b) {
+    return null == a ? null == b : a.equals(b);
+  }
+
+  /**
+   * @return The default Kerberos realm resolved when this class was loaded, or "UNKNOWN" if it could not be determined
+   */
+  public static String getDefaultRealm() {
+    return defaultRealm;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index 09bd6c4..d880fb3 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.core.rpc;
import java.io.FileInputStream;
import java.io.IOException;
+import java.net.InetAddress;
import java.security.KeyStore;
import java.util.HashMap;
import java.util.Map;
@@ -37,17 +38,20 @@ import org.apache.accumulo.core.client.impl.ThriftTransportPool;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.log4j.Logger;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.net.HostAndPort;
@@ -55,12 +59,14 @@ import com.google.common.net.HostAndPort;
* Factory methods for creating Thrift client objects
*/
public class ThriftUtil {
- private static final Logger log = Logger.getLogger(ThriftUtil.class);
+ private static final Logger log = LoggerFactory.getLogger(ThriftUtil.class);
private static final TraceProtocolFactory protocolFactory = new TraceProtocolFactory();
private static final TFramedTransport.Factory transportFactory = new TFramedTransport.Factory(Integer.MAX_VALUE);
private static final Map<Integer,TTransportFactory> factoryCache = new HashMap<Integer,TTransportFactory>();
+ public static final String GSSAPI = "GSSAPI";
+
/**
* An instance of {@link TraceProtocolFactory}
*
@@ -246,7 +252,7 @@ public class ThriftUtil {
* RPC options
*/
public static TTransport createTransport(HostAndPort address, ClientContext context) throws TException {
- return createClientTransport(address, (int) context.getClientTimeoutInMillis(), context.getClientSslParams());
+ return createClientTransport(address, (int) context.getClientTimeoutInMillis(), context.getClientSslParams(), context.getClientSaslParams());
}
/**
@@ -283,13 +289,23 @@ public class ThriftUtil {
* Client socket timeout
* @param sslParams
* RPC options for SSL servers
+ * @param saslParams
+ * RPC options for SASL servers
* @return An open TTransport which must be closed when finished
*/
- public static TTransport createClientTransport(HostAndPort address, int timeout, SslConnectionParams sslParams) throws TTransportException {
+ public static TTransport createClientTransport(HostAndPort address, int timeout, SslConnectionParams sslParams, SaslConnectionParams saslParams)
+ throws TTransportException {
boolean success = false;
TTransport transport = null;
try {
if (sslParams != null) {
+ // The check in AccumuloServerContext ensures that servers are brought up with sane configurations, but we also want to validate clients
+ if (null != saslParams) {
+ throw new IllegalStateException("Cannot use both SSL and SASL");
+ }
+
+ log.trace("Creating SSL client transport");
+
// TSSLTransportFactory handles timeout 0 -> forever natively
if (sslParams.useJsse()) {
transport = TSSLTransportFactory.getClientSocket(address.getHostText(), address.getPort(), timeout);
@@ -309,20 +325,59 @@ public class ThriftUtil {
// Create the TSocket from that
transport = createClient(wrappingSslSockFactory, address.getHostText(), address.getPort(), timeout);
+ // TSSLTransportFactory leaves transports open, so no need to open here
}
- // TSSLTransportFactory leaves transports open, so no need to open here
- } else if (timeout == 0) {
+
+ transport = ThriftUtil.transportFactory().getTransport(transport);
+ } else if (null != saslParams) {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ throw new IllegalStateException("Expected Kerberos security to be enabled if SASL is in use");
+ }
+
+ log.trace("Creating SASL connection to {}:{}", address.getHostText(), address.getPort());
+
transport = new TSocket(address.getHostText(), address.getPort());
- transport.open();
- } else {
+
try {
- transport = TTimeoutTransport.create(address, timeout);
- } catch (IOException ex) {
- throw new TTransportException(ex);
+ // Log in via UGI, ensures we have logged in with our KRB credentials
+ final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+ // Is this pricey enough that we want to cache it?
+ final String hostname = InetAddress.getByName(address.getHostText()).getCanonicalHostName();
+
+ log.trace("Opening transport to server as {} to {}/{}", currentUser, saslParams.getKerberosServerPrimary(), hostname);
+
+ // Create the client SASL transport using the information for the server
+ // Despite the 'protocol' argument seeming to be useless, it *must* be the primary of the server being connected to
+ transport = new TSaslClientTransport(GSSAPI, null, saslParams.getKerberosServerPrimary(), hostname, saslParams.getSaslProperties(), null, transport);
+
+ // Wrap it all in a processor which will run with a doAs the current user
+ transport = new UGIAssumingTransport(transport, currentUser);
+
+ // Open the transport
+ transport.open();
+ } catch (IOException e) {
+ log.warn("Failed to open SASL transport", e);
+ throw new TTransportException(e);
+ }
+ } else {
+ log.trace("Opening normal transport");
+ if (timeout == 0) {
+ transport = new TSocket(address.getHostText(), address.getPort());
+ transport.open();
+ } else {
+ try {
+ transport = TTimeoutTransport.create(address, timeout);
+ } catch (IOException ex) {
+ log.warn("Failed to open transport to " + address);
+ throw new TTransportException(ex);
+ }
+
+ // Open the transport
+ transport.open();
}
- transport.open();
+ transport = ThriftUtil.transportFactory().getTransport(transport);
}
- transport = ThriftUtil.transportFactory().getTransport(transport);
success = true;
} finally {
if (!success && transport != null) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransport.java
new file mode 100644
index 0000000..bc2c785
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransport.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.rpc;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * The Thrift SASL transports call Sasl.createSaslServer and Sasl.createSaslClient inside open(). So, we need to assume the correct UGI when the transport is
+ * opened so that the SASL mechanisms have access to the right principal. This transport wraps the Sasl transports to set up the right UGI context for open().
+ *
+ * This is used on the client side, where the API explicitly opens a transport to the server.
+ *
+ * Lifted from Apache Hive 0.14
+ */
+ public class UGIAssumingTransport extends FilterTransport {
+   /** User whose security context is assumed for the duration of {@link #open()}. */
+   protected UserGroupInformation ugi;
+
+   /**
+    * @param wrapped
+    *          transport that is handed to the {@link FilterTransport} constructor and opened by {@link #open()}
+    * @param ugi
+    *          user to run the wrapped transport's open() as
+    */
+   public UGIAssumingTransport(TTransport wrapped, UserGroupInformation ugi) {
+     super(wrapped);
+     this.ugi = ugi;
+   }
+
+   /**
+    * Opens the wrapped transport from within {@code ugi.doAs(...)}.
+    *
+    * @throws TTransportException
+    *           if the wrapped transport's open() throws it
+    * @throws RuntimeException
+    *           if doAs itself fails with an IOException or InterruptedException; in the interrupted case the calling thread's interrupt status is
+    *           restored before the exception is thrown
+    */
+   @Override
+   public void open() throws TTransportException {
+     // The anonymous action below declares no checked exceptions on run(), so a TTransportException from the wrapped open() is carried out
+     // of the action through this holder and rethrown afterwards
+     final AtomicReference<TTransportException> holder = new AtomicReference<>(null);
+     try {
+       ugi.doAs(new PrivilegedExceptionAction<Void>() {
+         @Override
+         public Void run() {
+           try {
+             getWrapped().open();
+           } catch (TTransportException tte) {
+             holder.set(tte);
+           }
+           return null;
+         }
+       });
+     } catch (InterruptedException e) {
+       // Do not swallow the interruption: re-set the flag before converting to an unchecked exception
+       Thread.currentThread().interrupt();
+       throw new RuntimeException(e);
+     } catch (IOException e) {
+       throw new RuntimeException(e);
+     }
+
+     // Make sure the transport exception gets (re)thrown if it happened
+     TTransportException tte = holder.get();
+     if (null != tte) {
+       throw tte;
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransportFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransportFactory.java b/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransportFactory.java
new file mode 100644
index 0000000..77a3ce3
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/UGIAssumingTransportFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.security.PrivilegedAction;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A TransportFactory that wraps another one, but assumes a specified UGI before calling through.
+ *
+ * This is used on the server side to assume the server's Principal when accepting clients.
+ *
+ * Borrowed from Apache Hive 0.14
+ */
+ public class UGIAssumingTransportFactory extends TTransportFactory {
+   private final UserGroupInformation ugi;
+   private final TTransportFactory wrapped;
+
+   /**
+    * @param wrapped
+    *          factory that actually produces the transport; must not be null
+    * @param ugi
+    *          user to call the wrapped factory as; must not be null
+    */
+   public UGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
+     // checkNotNull hands back its argument, so validation and assignment happen in one step (wrapped is still checked before ugi)
+     this.wrapped = Preconditions.checkNotNull(wrapped);
+     this.ugi = Preconditions.checkNotNull(ugi);
+   }
+
+   /**
+    * Asks the wrapped factory for a transport from within {@code ugi.doAs(...)}.
+    *
+    * @param trans
+    *          transport passed through unchanged to the wrapped factory
+    * @return whatever the wrapped factory returns for {@code trans}
+    */
+   @Override
+   public TTransport getTransport(final TTransport trans) {
+     final PrivilegedAction<TTransport> delegateCall = new PrivilegedAction<TTransport>() {
+       @Override
+       public TTransport run() {
+         return wrapped.getTransport(trans);
+       }
+     };
+     return ugi.doAs(delegateCall);
+   }
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java b/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
index ff49bc0..435ae85 100644
--- a/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
+++ b/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
@@ -21,14 +21,22 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import javax.security.auth.DestroyFailedException;
+
import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.security.Authorizations;
@@ -48,14 +56,14 @@ public class TestClientOpts {
public TestName testName = new TestName();
@Test
- public void test() {
+ public void test() throws Exception {
BatchWriterConfig cfg = new BatchWriterConfig();
// document the defaults
ClientOpts args = new ClientOpts();
BatchWriterOpts bwOpts = new BatchWriterOpts();
BatchScannerOpts bsOpts = new BatchScannerOpts();
- assertEquals(System.getProperty("user.name"), args.principal);
+ assertNull(args.principal);
assertNull(args.securePassword);
assertNull(args.getToken());
assertEquals(Long.valueOf(cfg.getMaxLatency(TimeUnit.MILLISECONDS)), bwOpts.batchLatency);
@@ -146,4 +154,106 @@ public class TestClientOpts {
args.getInstance();
}
+ @Test
+ public void testSsl() {
+   // Feed a lone --ssl flag through JCommander into a fresh ClientOpts
+   final ClientOpts opts = new ClientOpts();
+   final JCommander parser = new JCommander();
+   parser.addObject(opts);
+
+   parser.parse("--ssl");
+
+   // The flag must have flipped the sslEnabled field
+   assertEquals(true, opts.sslEnabled);
+ }
+
+ @Test
+ public void testSaslWithClientConfig() throws IOException {
+   ClientOpts args = new ClientOpts();
+
+   File clientConfFile = tmpDir.newFile();
+
+   // Write a client config that only turns SASL on. try-with-resources closes the writer even when write() throws, and keeps the
+   // write failure as the primary exception instead of letting a failing close() replace it.
+   try (FileWriter writer = new FileWriter(clientConfFile)) {
+     writer.write(String.format("%s=%s\n", ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), "true"));
+   }
+
+   JCommander jc = new JCommander();
+   jc.addObject(args);
+   jc.parse("--config-file", clientConfFile.getCanonicalPath());
+   args.updateKerberosCredentials();
+
+   // With SASL enabled through the config file, the token class is expected to be the Kerberos one
+   assertEquals(KerberosToken.CLASS_NAME, args.tokenClassName);
+ }
+
+ @Test
+ public void testSasl() {
+   // Feed a lone --sasl flag through JCommander into a fresh ClientOpts
+   final ClientOpts opts = new ClientOpts();
+   final JCommander parser = new JCommander();
+   parser.addObject(opts);
+
+   parser.parse("--sasl");
+
+   // The flag must have flipped the saslEnabled field
+   assertEquals(true, opts.saslEnabled);
+ }
+
+ @Test
+ public void testEmptyTokenProperties() {
+   final ClientOpts opts = new ClientOpts();
+   final JCommander parser = new JCommander();
+   parser.addObject(opts);
+
+   // Name the option-less EmptyToken via -tc; getToken() is then expected to yield a token equal to a fresh EmptyToken
+   final String tokenClass = EmptyToken.class.getName();
+   parser.parse("-tc", tokenClass);
+
+   assertEquals(new EmptyToken(), opts.getToken());
+ }
+
+ @Test
+ public void testPrincipalWithSasl() throws IOException {
+   final ClientOpts opts = new ClientOpts();
+   final JCommander parser = new JCommander();
+   parser.addObject(opts);
+
+   // Nothing is written to the new file, so --sasl on the command line is the only SASL setting supplied by this test
+   final String confPath = tmpDir.newFile().getCanonicalPath();
+   parser.parse("--config-file", confPath, "--sasl", "-i", "instance_name");
+
+   // The command-line flag must show up in the resulting client configuration
+   final ClientConfiguration resolved = opts.getClientConfiguration();
+   assertEquals("true", resolved.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+ }
+
+ /**
+  * An authentication token which requires no options. Every method is a no-op or returns a fixed value; all instances compare equal to each
+  * other.
+  */
+ private static class EmptyToken implements AuthenticationToken {
+   public EmptyToken() {}
+
+   @Override
+   public void write(DataOutput out) throws IOException {}
+
+   @Override
+   public void readFields(DataInput in) throws IOException {}
+
+   @Override
+   public void destroy() throws DestroyFailedException {}
+
+   @Override
+   public boolean isDestroyed() {
+     return false;
+   }
+
+   @Override
+   public void init(Properties properties) {}
+
+   @Override
+   public Set<TokenProperty> getProperties() {
+     return null;
+   }
+
+   @Override
+   public AuthenticationToken clone() {
+     return null;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+     return o instanceof EmptyToken;
+   }
+
+   /**
+    * Constant hash, consistent with {@link #equals(Object)}: since any two EmptyToken instances are equal, they must share a hash code.
+    */
+   @Override
+   public int hashCode() {
+     return EmptyToken.class.hashCode();
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/test/java/org/apache/accumulo/core/client/ClientConfigurationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/ClientConfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/client/ClientConfigurationTest.java
new file mode 100644
index 0000000..424cea1
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/client/ClientConfigurationTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.junit.Test;
+
+ public class ClientConfigurationTest {
+
+   @Test
+   public void testOverrides() throws Exception {
+     assertExpectedConfig(createConfig());
+   }
+
+   @Test
+   public void testSerialization() throws Exception {
+     final ClientConfiguration original = createConfig();
+     // sanity check that we're starting with what we're expecting
+     assertExpectedConfig(original);
+
+     // Round-trip through the serialized string form and expect the same effective values
+     final ClientConfiguration roundTripped = ClientConfiguration.deserialize(original.serialize());
+     assertExpectedConfig(roundTripped);
+   }
+
+   /**
+    * Checks the values expected from the stack built by {@link #createConfig()}: for a key set in several layers the earliest layer's value is
+    * expected, and a key set in no layer is expected to report its default.
+    */
+   private void assertExpectedConfig(ClientConfiguration config) {
+     assertEquals("firstZkHosts", config.get(ClientProperty.INSTANCE_ZK_HOST));
+     assertEquals("secondInstanceName", config.get(ClientProperty.INSTANCE_NAME));
+     assertEquals("123s", config.get(ClientProperty.INSTANCE_ZK_TIMEOUT));
+     assertEquals(ClientProperty.RPC_SSL_TRUSTSTORE_TYPE.getDefaultValue(), config.get(ClientProperty.RPC_SSL_TRUSTSTORE_TYPE));
+   }
+
+   /**
+    * Builds a ClientConfiguration from three layers, each setting one more key than the previous one and each using its own distinct values.
+    */
+   private ClientConfiguration createConfig() {
+     final String zkHostKey = ClientProperty.INSTANCE_ZK_HOST.getKey();
+     final String instanceNameKey = ClientProperty.INSTANCE_NAME.getKey();
+     final String zkTimeoutKey = ClientProperty.INSTANCE_ZK_TIMEOUT.getKey();
+
+     final Configuration layerOne = new PropertiesConfiguration();
+     layerOne.addProperty(zkHostKey, "firstZkHosts");
+
+     final Configuration layerTwo = new PropertiesConfiguration();
+     layerTwo.addProperty(zkHostKey, "secondZkHosts");
+     layerTwo.addProperty(instanceNameKey, "secondInstanceName");
+
+     final Configuration layerThree = new PropertiesConfiguration();
+     layerThree.addProperty(zkHostKey, "thirdZkHosts");
+     layerThree.addProperty(instanceNameKey, "thirdInstanceName");
+     layerThree.addProperty(zkTimeoutKey, "123s");
+
+     return new ClientConfiguration(Arrays.asList(layerOne, layerTwo, layerThree));
+   }
+
+   @Test
+   public void testSasl() {
+     final ClientProperty saslEnabled = ClientProperty.INSTANCE_RPC_SASL_ENABLED;
+     final ClientConfiguration config = new ClientConfiguration(Collections.<Configuration> emptyList());
+
+     // Off when nothing is configured, still off when explicitly disabled
+     assertEquals("false", config.get(saslEnabled));
+     config.withSasl(false);
+     assertEquals("false", config.get(saslEnabled));
+
+     // On once enabled
+     config.withSasl(true);
+     assertEquals("true", config.get(saslEnabled));
+
+     // The two-argument form also records the server primary
+     final String serverPrimary = "accumulo";
+     config.withSasl(true, serverPrimary);
+     assertEquals(serverPrimary, config.get(ClientProperty.KERBEROS_SERVER_PRIMARY));
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftTransportKeyTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftTransportKeyTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftTransportKeyTest.java
new file mode 100644
index 0000000..2723273
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftTransportKeyTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
+import org.apache.accumulo.core.rpc.SslConnectionParams;
+import org.junit.Test;
+
+import com.google.common.net.HostAndPort;
+
+ public class ThriftTransportKeyTest {
+
+   /**
+    * A context that supplies both SSL and SASL parameters: constructing the key is expected to fail with a RuntimeException.
+    */
+   @Test(expected = RuntimeException.class)
+   public void testSslAndSaslErrors() {
+     final ClientContext context = createMock(ClientContext.class);
+     final SslConnectionParams ssl = createMock(SslConnectionParams.class);
+     final SaslConnectionParams sasl = createMock(SaslConnectionParams.class);
+
+     expect(context.getClientSslParams()).andReturn(ssl).anyTimes();
+     expect(context.getClientSaslParams()).andReturn(sasl).anyTimes();
+
+     // We don't care to verify the sslparam or saslparam mocks
+     replay(context);
+
+     final HostAndPort server = HostAndPort.fromParts("localhost", 9999);
+     try {
+       new ThriftTransportKey(server, 120 * 1000, context);
+     } finally {
+       // Runs even though the constructor is expected to throw
+       verify(context);
+     }
+   }
+
+   /**
+    * Two keys that differ only in the principal reported by their SASL parameters must differ in both equals() and hashCode().
+    */
+   @Test
+   public void testSaslPrincipalIsSignificant() {
+     final SaslConnectionParams firstUserParams = createMock(SaslConnectionParams.class);
+     final SaslConnectionParams secondUserParams = createMock(SaslConnectionParams.class);
+     expect(firstUserParams.getPrincipal()).andReturn("user1");
+     expect(secondUserParams.getPrincipal()).andReturn("user2");
+
+     replay(firstUserParams, secondUserParams);
+
+     final HostAndPort server = HostAndPort.fromParts("localhost", 9997);
+     final ThriftTransportKey firstKey = new ThriftTransportKey(server, 1L, null, firstUserParams);
+     final ThriftTransportKey secondKey = new ThriftTransportKey(server, 1L, null, secondUserParams);
+
+     assertNotEquals(firstKey, secondKey);
+     assertNotEquals(firstKey.hashCode(), secondKey.hashCode());
+
+     verify(firstUserParams, secondUserParams);
+   }
+
+   /**
+    * A key built from a context without SSL or SASL parameters must equal itself.
+    */
+   @Test
+   public void testSimpleEquivalence() {
+     final ClientContext context = createMock(ClientContext.class);
+
+     expect(context.getClientSslParams()).andReturn(null).anyTimes();
+     expect(context.getClientSaslParams()).andReturn(null).anyTimes();
+
+     replay(context);
+
+     final HostAndPort server = HostAndPort.fromParts("localhost", 9999);
+     final ThriftTransportKey key = new ThriftTransportKey(server, 120 * 1000, context);
+
+     assertTrue("Normal ThriftTransportKey doesn't equal itself", key.equals(key));
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/test/java/org/apache/accumulo/core/conf/ClientConfigurationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/ClientConfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/conf/ClientConfigurationTest.java
deleted file mode 100644
index 40be70f..0000000
--- a/core/src/test/java/org/apache/accumulo/core/conf/ClientConfigurationTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.conf;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.junit.Test;
-
-public class ClientConfigurationTest {
- @Test
- public void testOverrides() throws Exception {
- ClientConfiguration clientConfig = createConfig();
- assertExpectedConfig(clientConfig);
- }
-
- @Test
- public void testSerialization() throws Exception {
- ClientConfiguration clientConfig = createConfig();
- // sanity check that we're starting with what we're expecting
- assertExpectedConfig(clientConfig);
-
- String serialized = clientConfig.serialize();
- ClientConfiguration deserializedClientConfig = ClientConfiguration.deserialize(serialized);
- assertExpectedConfig(deserializedClientConfig);
- }
-
- private void assertExpectedConfig(ClientConfiguration clientConfig) {
- assertEquals("firstZkHosts", clientConfig.get(ClientProperty.INSTANCE_ZK_HOST));
- assertEquals("secondInstanceName", clientConfig.get(ClientProperty.INSTANCE_NAME));
- assertEquals("123s", clientConfig.get(ClientProperty.INSTANCE_ZK_TIMEOUT));
- assertEquals(ClientProperty.RPC_SSL_TRUSTSTORE_TYPE.getDefaultValue(), clientConfig.get(ClientProperty.RPC_SSL_TRUSTSTORE_TYPE));
- }
-
- private ClientConfiguration createConfig() {
- Configuration first = new PropertiesConfiguration();
- first.addProperty(ClientProperty.INSTANCE_ZK_HOST.getKey(), "firstZkHosts");
- Configuration second = new PropertiesConfiguration();
- second.addProperty(ClientProperty.INSTANCE_ZK_HOST.getKey(), "secondZkHosts");
- second.addProperty(ClientProperty.INSTANCE_NAME.getKey(), "secondInstanceName");
- Configuration third = new PropertiesConfiguration();
- third.addProperty(ClientProperty.INSTANCE_ZK_HOST.getKey(), "thirdZkHosts");
- third.addProperty(ClientProperty.INSTANCE_NAME.getKey(), "thirdInstanceName");
- third.addProperty(ClientProperty.INSTANCE_ZK_TIMEOUT.getKey(), "123s");
- return new ClientConfiguration(Arrays.asList(first, second, third));
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java b/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java
new file mode 100644
index 0000000..8c65776
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.Map;
+
+import javax.security.sasl.Sasl;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SaslConnectionParams.QualityOfProtection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+
+ public class SaslConnectionParamsTest {
+
+   /** User name of the current UGI user, captured in {@link #setup()}. */
+   private String user;
+
+   @Before
+   public void setup() throws Exception {
+     // Switch Hadoop security to kerberos before asking UGI for the current user
+     final Configuration hadoopConf = new Configuration(false);
+     hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+     UserGroupInformation.setConfiguration(hadoopConf);
+     user = UserGroupInformation.getCurrentUser().getUserName();
+   }
+
+   @Test
+   public void testNullParams() {
+     final ClientConfiguration defaults = ClientConfiguration.loadDefault();
+     final AccumuloConfiguration converted = ClientContext.convertClientConfig(defaults);
+
+     // SASL is off in the default configuration, so no parameters are produced
+     assertEquals("false", defaults.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+     assertNull(SaslConnectionParams.forConfig(converted));
+   }
+
+   @Test
+   public void testDefaultParamsAsClient() throws Exception {
+     final ClientConfiguration clientConf = ClientConfiguration.loadDefault();
+
+     // The primary is the first component of the principal
+     final String primary = "accumulo";
+     clientConf.withSasl(true, primary);
+
+     assertEquals("true", clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+
+     // Built straight from the ClientConfiguration
+     assertDefaultSaslParams(SaslConnectionParams.forConfig(clientConf), primary);
+   }
+
+   @Test
+   public void testDefaultParamsAsServer() throws Exception {
+     final ClientConfiguration clientConf = ClientConfiguration.loadDefault();
+
+     // The primary is the first component of the principal
+     final String primary = "accumulo";
+     clientConf.withSasl(true, primary);
+
+     final AccumuloConfiguration rpcConf = ClientContext.convertClientConfig(clientConf);
+     assertEquals("true", clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+
+     // Built from the converted AccumuloConfiguration instead
+     assertDefaultSaslParams(SaslConnectionParams.forConfig(rpcConf), primary);
+   }
+
+   /**
+    * Shared checks for both tests above: the configured server primary, the default quality of protection, a single SASL property carrying
+    * that QOP, and the current user as principal.
+    */
+   private void assertDefaultSaslParams(SaslConnectionParams saslParams, String expectedPrimary) {
+     assertEquals(expectedPrimary, saslParams.getKerberosServerPrimary());
+
+     final QualityOfProtection defaultQop = QualityOfProtection.get(Property.RPC_SASL_QOP.getDefaultValue());
+     assertEquals(defaultQop, saslParams.getQualityOfProtection());
+
+     final Map<String,String> saslProperties = saslParams.getSaslProperties();
+     assertEquals(1, saslProperties.size());
+     assertEquals(defaultQop.getQuality(), saslProperties.get(Sasl.QOP));
+     assertEquals(user, saslParams.getPrincipal());
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/main/asciidoc/accumulo_user_manual.asciidoc b/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
index 5884da2..b9a85e2 100644
--- a/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
+++ b/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
@@ -55,6 +55,8 @@ include::chapters/implementation.txt[]
include::chapters/ssl.txt[]
+include::chapters/kerberos.txt[]
+
include::chapters/administration.txt[]
include::chapters/multivolume.txt[]
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/docs/src/main/asciidoc/chapters/clients.txt
----------------------------------------------------------------------
diff --git a/docs/src/main/asciidoc/chapters/clients.txt b/docs/src/main/asciidoc/chapters/clients.txt
index 64f0e55..3f85074 100644
--- a/docs/src/main/asciidoc/chapters/clients.txt
+++ b/docs/src/main/asciidoc/chapters/clients.txt
@@ -67,6 +67,17 @@ KeyStore to alleviate passwords stored in cleartext. When stored in HDFS, a sing
KeyStore can be used across an entire instance. Be aware that KeyStores stored on
the local filesystem must be made available to all nodes in the Accumulo cluster.
+[source,java]
+----
+KerberosToken token = new KerberosToken();
+Connector conn = inst.getConnector(token.getPrincipal(), token);
+----
+
+ The KerberosToken can be provided to use the authentication provided by Kerberos.
+ Using Kerberos requires external setup and additional configuration, but it provides
+ a single point of authentication across HDFS, YARN and ZooKeeper and allows
+ for password-less authentication with Accumulo.
+
=== Writing Data
Data are written to Accumulo by creating Mutation objects that represent all the
[2/4] accumulo git commit: ACCUMULO-2815 Support for Kerberos client
authentication.
Posted by el...@apache.org.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthorizor.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthorizor.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthorizor.java
new file mode 100644
index 0000000..b047f1a
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthorizor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.util.Base64;
+
+/**
+ * Kerberos principals might contain characters that are not valid in ZNode names ('/'). Base64-encodes the principals before interacting with ZooKeeper.
+ */
+ public class KerberosAuthorizor implements Authorizor {
+
+   private static KerberosAuthorizor INST;
+
+   /**
+    * Returns the shared instance, creating it on first use.
+    */
+   public static synchronized KerberosAuthorizor getInstance() {
+     if (INST == null) {
+       INST = new KerberosAuthorizor();
+     }
+     return INST;
+   }
+
+   /** Delegate that does the actual work; it is always handed the Base64-encoded form of a user name. */
+   private final ZKAuthorizor zkAuthorizor;
+
+   public KerberosAuthorizor() {
+     zkAuthorizor = new ZKAuthorizor();
+   }
+
+   /**
+    * Base64-encodes the UTF-8 bytes of the given principal, the form in which user names are passed to the delegate.
+    */
+   private static String encodePrincipal(String principal) {
+     return Base64.encodeBase64String(principal.getBytes(UTF_8));
+   }
+
+   @Override
+   public void initialize(String instanceId, boolean initialize) {
+     zkAuthorizor.initialize(instanceId, initialize);
+   }
+
+   @Override
+   public boolean validSecurityHandlers(Authenticator auth, PermissionHandler pm) {
+     // Only usable together with the other Kerberos handlers
+     if (!(auth instanceof KerberosAuthenticator)) {
+       return false;
+     }
+     return pm instanceof KerberosPermissionHandler;
+   }
+
+   @Override
+   public void initializeSecurity(TCredentials credentials, String rootuser) throws AccumuloSecurityException, ThriftSecurityException {
+     final String encodedRoot = encodePrincipal(rootuser);
+     zkAuthorizor.initializeSecurity(credentials, encodedRoot);
+   }
+
+   @Override
+   public void changeAuthorizations(String user, Authorizations authorizations) throws AccumuloSecurityException {
+     final String encodedUser = encodePrincipal(user);
+     zkAuthorizor.changeAuthorizations(encodedUser, authorizations);
+   }
+
+   @Override
+   public Authorizations getCachedUserAuthorizations(String user) throws AccumuloSecurityException {
+     final String encodedUser = encodePrincipal(user);
+     return zkAuthorizor.getCachedUserAuthorizations(encodedUser);
+   }
+
+   @Override
+   public boolean isValidAuthorizations(String user, List<ByteBuffer> list) throws AccumuloSecurityException {
+     final String encodedUser = encodePrincipal(user);
+     return zkAuthorizor.isValidAuthorizations(encodedUser, list);
+   }
+
+   @Override
+   public void initUser(String user) throws AccumuloSecurityException {
+     final String encodedUser = encodePrincipal(user);
+     zkAuthorizor.initUser(encodedUser);
+   }
+
+   @Override
+   public void dropUser(String user) throws AccumuloSecurityException {
+     final String encodedUser = encodePrincipal(user);
+     zkAuthorizor.dropUser(encodedUser);
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosPermissionHandler.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosPermissionHandler.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosPermissionHandler.java
new file mode 100644
index 0000000..691c555
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosPermissionHandler.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.NamespaceNotFoundException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.security.NamespacePermission;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.util.Base64;
+
+/**
+ * Kerberos principals might contain characters that are not valid in ZNode names (such as '/'), so this handler Base64-encodes each principal before interacting with ZooKeeper.
+ */
+public class KerberosPermissionHandler implements PermissionHandler {
+
+ private static KerberosPermissionHandler INST;
+
+ public static synchronized KerberosPermissionHandler getInstance() {
+ if (INST == null)
+ INST = new KerberosPermissionHandler();
+ return INST;
+ }
+
+ private final ZKPermHandler zkPermissionHandler;
+
+ public KerberosPermissionHandler() {
+ zkPermissionHandler = new ZKPermHandler();
+ }
+
+ @Override
+ public void initialize(String instanceId, boolean initialize) {
+ zkPermissionHandler.initialize(instanceId, initialize);
+ }
+
+ @Override
+ public boolean validSecurityHandlers(Authenticator authent, Authorizor author) {
+ return authent instanceof KerberosAuthenticator && author instanceof KerberosAuthorizor;
+ }
+
+ @Override
+ public void initializeSecurity(TCredentials credentials, String rootuser) throws AccumuloSecurityException, ThriftSecurityException {
+ zkPermissionHandler.initializeSecurity(credentials, Base64.encodeBase64String(rootuser.getBytes(UTF_8)));
+ }
+
+ @Override
+ public boolean hasSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+ return zkPermissionHandler.hasSystemPermission(Base64.encodeBase64String(user.getBytes(UTF_8)), permission);
+ }
+
+ @Override
+ public boolean hasCachedSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+ return zkPermissionHandler.hasCachedSystemPermission(Base64.encodeBase64String(user.getBytes(UTF_8)), permission);
+ }
+
+ @Override
+ public boolean hasTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+ return zkPermissionHandler.hasTablePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), table, permission);
+ }
+
+ @Override
+ public boolean hasCachedTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+ return zkPermissionHandler.hasCachedTablePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), table, permission);
+ }
+
+ @Override
+ public boolean hasNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException,
+ NamespaceNotFoundException {
+ return zkPermissionHandler.hasNamespacePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), namespace, permission);
+ }
+
+ @Override
+ public boolean hasCachedNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException,
+ NamespaceNotFoundException {
+ return zkPermissionHandler.hasCachedNamespacePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), namespace, permission);
+ }
+
+ @Override
+ public void grantSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+ zkPermissionHandler.grantSystemPermission(Base64.encodeBase64String(user.getBytes(UTF_8)), permission);
+ }
+
+ @Override
+ public void revokeSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+ zkPermissionHandler.revokeSystemPermission(Base64.encodeBase64String(user.getBytes(UTF_8)), permission);
+ }
+
+ @Override
+ public void grantTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+ zkPermissionHandler.grantTablePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), table, permission);
+ }
+
+ @Override
+ public void revokeTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+ zkPermissionHandler.revokeTablePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), table, permission);
+ }
+
+ @Override
+ public void grantNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException,
+ NamespaceNotFoundException {
+ zkPermissionHandler.grantNamespacePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), namespace, permission);
+ }
+
+ @Override
+ public void revokeNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException,
+ NamespaceNotFoundException {
+ zkPermissionHandler.revokeNamespacePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), namespace, permission);
+ }
+
+ @Override
+ public void cleanTablePermissions(String table) throws AccumuloSecurityException, TableNotFoundException {
+ zkPermissionHandler.cleanTablePermissions(table);
+ }
+
+ @Override
+ public void cleanNamespacePermissions(String namespace) throws AccumuloSecurityException, NamespaceNotFoundException {
+ zkPermissionHandler.cleanNamespacePermissions(namespace);
+ }
+
+ @Override
+ public void initUser(String user) throws AccumuloSecurityException {
+ zkPermissionHandler.initUser(Base64.encodeBase64String(user.getBytes(UTF_8)));
+ }
+
+ @Override
+ public void initTable(String table) throws AccumuloSecurityException {
+ zkPermissionHandler.initTable(table);
+ }
+
+ @Override
+ public void cleanUser(String user) throws AccumuloSecurityException {
+ zkPermissionHandler.cleanUser(Base64.encodeBase64String(user.getBytes(UTF_8)));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/thrift/UGIAssumingProcessor.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/UGIAssumingProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/UGIAssumingProcessor.java
new file mode 100644
index 0000000..4e4f8ce
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/UGIAssumingProcessor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.thrift;
+
+import java.io.IOException;
+
+import javax.security.sasl.SaslServer;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Processor that pulls the SaslServer object out of the transport, and assumes the remote user's UGI before calling through to the original processor.
+ *
+ * This is used on the server side to set the UGI for each specific call.
+ *
+ * Lifted from Apache Hive 0.14
+ */
+public class UGIAssumingProcessor implements TProcessor {
+ private static final Logger log = LoggerFactory.getLogger(UGIAssumingProcessor.class);
+
+ /**
+ * Per-thread name of the SASL-authenticated caller. It is set only while {@link #process(TProtocol, TProtocol)} is running on that thread.
+ */
+ public static final ThreadLocal<String> principal = new ThreadLocal<String>();
+ private final TProcessor wrapped;
+ private final UserGroupInformation loginUser;
+
+ /**
+ * Wraps the given processor and captures the current login user, which is later used as the real user behind each proxy user.
+ *
+ * @param wrapped
+ * processor that every call is forwarded to
+ * @throws RuntimeException
+ * if the login user cannot be obtained
+ */
+ public UGIAssumingProcessor(TProcessor wrapped) {
+ this.wrapped = wrapped;
+ try {
+ this.loginUser = UserGroupInformation.getLoginUser();
+ } catch (IOException e) {
+ log.error("Failed to obtain login user", e);
+ throw new RuntimeException("Failed to obtain login user", e);
+ }
+ }
+
+ /**
+ * The principal of the user who authenticated over SASL.
+ *
+ * @return the principal recorded for the call being processed on this thread, or null when no call is in progress
+ */
+ public static String currentPrincipal() {
+ return principal.get();
+ }
+
+ /**
+ * Records the SASL-authenticated caller in {@link #principal}, forwards the call to the wrapped processor, and always clears the thread-local afterwards.
+ *
+ * @throws TException
+ * if the inbound transport is not a {@link TSaslServerTransport}, or if the wrapped processor throws
+ */
+ @Override
+ public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+ TTransport trans = inProt.getTransport();
+ if (!(trans instanceof TSaslServerTransport)) {
+ throw new TException("Unexpected non-SASL transport " + trans.getClass() + ": " + trans);
+ }
+ SaslServer saslServer = ((TSaslServerTransport) trans).getSaslServer();
+ String endUser = saslServer.getAuthorizationID();
+
+ log.trace("Received SASL RPC from {}", endUser);
+
+ UserGroupInformation clientUgi = UserGroupInformation.createProxyUser(endUser, loginUser);
+ final String remoteUser = clientUgi.getUserName();
+
+ try {
+ // Set the principal in the ThreadLocal for access to get authorizations
+ principal.set(remoteUser);
+
+ return wrapped.process(inProt, outProt);
+ } finally {
+ // Remove (rather than null out) the value so the worker thread keeps no stale ThreadLocal entry
+ // and the principal cannot be picked up by a later call on the same thread.
+ principal.remove();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index 7d247f7..8407c15 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.client.impl.MasterClient;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.security.Authorizations;
@@ -56,6 +57,7 @@ import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.cli.ClientOpts;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
@@ -174,6 +176,13 @@ public class Admin {
cl.usage();
return;
}
+
+ AccumuloConfiguration siteConf = SiteConfiguration.getInstance();
+ // Login as the server on secure HDFS
+ if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+ SecurityUtil.serverLogin(siteConf);
+ }
+
Instance instance = opts.getInstance();
ServerConfigurationFactory confFactory = new ServerConfigurationFactory(instance);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
index ef182f1..759d898 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
@@ -20,9 +20,13 @@ import java.util.List;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.log4j.Logger;
@@ -64,6 +68,12 @@ public class ZooZap {
return;
}
+ AccumuloConfiguration siteConf = SiteConfiguration.getInstance();
+ // Login as the server on secure HDFS
+ if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+ SecurityUtil.serverLogin(siteConf);
+ }
+
String iid = opts.getInstance().getInstanceID();
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java b/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java
new file mode 100644
index 0000000..56f3933
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that an {@link AccumuloServerContext} whose configuration has SASL enabled reports the SASL Thrift server type and the matching SASL connection
+ * parameters.
+ */
+public class AccumuloServerContextTest {
+
+ // User name of the current Hadoop user, captured in setup() and compared against the SASL principal.
+ private String user;
+
+ /**
+ * Sets Hadoop's security authentication to "kerberos" for UserGroupInformation and records the current user's name.
+ */
+ @Before
+ public void setup() throws Exception {
+ Configuration conf = new Configuration(false);
+ conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ user = UserGroupInformation.getCurrentUser().getUserName();
+ }
+
+ /**
+ * Builds a partially mocked context backed by a SASL-enabled configuration and asserts on its Thrift server type and SASL parameters.
+ */
+ @Test
+ public void testSasl() throws Exception {
+ MockInstance instance = new MockInstance();
+
+ // Client configuration with SASL switched on and "accumulo" as the Kerberos server primary.
+ ClientConfiguration clientConf = ClientConfiguration.loadDefault();
+ clientConf.setProperty(ClientProperty.INSTANCE_RPC_SASL_ENABLED, "true");
+ clientConf.setProperty(ClientProperty.KERBEROS_SERVER_PRIMARY, "accumulo");
+ final AccumuloConfiguration conf = ClientContext.convertClientConfig(clientConf);
+ SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class);
+
+ // The factory hands out the converted configuration, the mocked site configuration and the mock instance.
+ ServerConfigurationFactory factory = EasyMock.createMock(ServerConfigurationFactory.class);
+ EasyMock.expect(factory.getConfiguration()).andReturn(conf).anyTimes();
+ EasyMock.expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
+ EasyMock.expect(factory.getInstance()).andReturn(instance).anyTimes();
+
+ // Partial mock: only the three listed methods are mocked; the methods asserted on below are not in that list.
+ AccumuloServerContext context = EasyMock.createMockBuilder(AccumuloServerContext.class).addMockedMethod("enforceKerberosLogin")
+ .addMockedMethod("getConfiguration").addMockedMethod("getServerConfigurationFactory").createMock();
+ // enforceKerberosLogin() is recorded as an allowed call with no stubbed behavior.
+ context.enforceKerberosLogin();
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.expect(context.getConfiguration()).andReturn(conf).anyTimes();
+ EasyMock.expect(context.getServerConfigurationFactory()).andReturn(factory).anyTimes();
+
+ // Just make the SiteConfiguration delegate to our ClientConfiguration (by way of the AccumuloConfiguration)
+ // Presently, we only need get(Property) and iterator().
+ EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() {
+ @Override
+ public String answer() {
+ Object[] args = EasyMock.getCurrentArguments();
+ return conf.get((Property) args[0]);
+ }
+ }).anyTimes();
+
+ EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() {
+ @Override
+ public Iterator<Entry<String,String>> answer() {
+ return conf.iterator();
+ }
+ }).anyTimes();
+
+ // Switch all mocks from recording to replay before exercising the context.
+ EasyMock.replay(factory, context, siteConfig);
+
+ // The context must pick SASL, build parameters equal to those derived from conf, and use the current user as principal.
+ Assert.assertEquals(ThriftServerType.SASL, context.getThriftServerType());
+ SaslConnectionParams saslParams = context.getServerSaslParams();
+ Assert.assertEquals(SaslConnectionParams.forConfig(conf), saslParams);
+ Assert.assertEquals(user, saslParams.getPrincipal());
+
+ EasyMock.verify(factory, context, siteConfig);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java
new file mode 100644
index 0000000..aba1aa0
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.server.thrift.UGIAssumingProcessor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TCredentialsUpdatingInvocationHandlerTest {
+
+ TCredentialsUpdatingInvocationHandler<Object> proxy;
+
+ @Before
+ public void setup() {
+ proxy = new TCredentialsUpdatingInvocationHandler<Object>(new Object());
+ }
+
+ @After
+ public void teardown() {
+ UGIAssumingProcessor.principal.set(null);
+ }
+
+ @Test
+ public void testNoArgsAreIgnored() throws Exception {
+ proxy.updateArgs(new Object[] {});
+ }
+
+ @Test
+ public void testNoTCredsInArgsAreIgnored() throws Exception {
+ proxy.updateArgs(new Object[] {new Object(), new Object()});
+ }
+
+ @Test
+ public void testCachedTokenClass() throws Exception {
+ final String principal = "root";
+ ConcurrentHashMap<String,Class<? extends AuthenticationToken>> cache = proxy.getTokenCache();
+ cache.clear();
+ TCredentials tcreds = new TCredentials(principal, KerberosToken.CLASS_NAME, ByteBuffer.allocate(0), UUID.randomUUID().toString());
+ UGIAssumingProcessor.principal.set(principal);
+ proxy.updateArgs(new Object[] {new Object(), tcreds});
+ Assert.assertEquals(1, cache.size());
+ Assert.assertEquals(KerberosToken.class, cache.get(KerberosToken.CLASS_NAME));
+ }
+
+ @Test(expected = ThriftSecurityException.class)
+ public void testMissingPrincipal() throws Exception {
+ final String principal = "root";
+ TCredentials tcreds = new TCredentials(principal, KerberosToken.CLASS_NAME, ByteBuffer.allocate(0), UUID.randomUUID().toString());
+ UGIAssumingProcessor.principal.set(null);
+ proxy.updateArgs(new Object[] {new Object(), tcreds});
+ }
+
+ @Test(expected = ThriftSecurityException.class)
+ public void testMismatchedPrincipal() throws Exception {
+ final String principal = "root";
+ TCredentials tcreds = new TCredentials(principal, KerberosToken.CLASS_NAME, ByteBuffer.allocate(0), UUID.randomUUID().toString());
+ UGIAssumingProcessor.principal.set(principal + "foobar");
+ proxy.updateArgs(new Object[] {new Object(), tcreds});
+ }
+
+ @Test(expected = ThriftSecurityException.class)
+ public void testWrongTokenType() throws Exception {
+ final String principal = "root";
+ TCredentials tcreds = new TCredentials(principal, PasswordToken.class.getName(), ByteBuffer.allocate(0), UUID.randomUUID().toString());
+ UGIAssumingProcessor.principal.set(principal);
+ proxy.updateArgs(new Object[] {new Object(), tcreds});
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java
new file mode 100644
index 0000000..f3f1bdd
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.accumulo.core.conf.Property;
+import org.junit.Test;
+
+public class ThriftServerTypeTest {
+
+ /**
+ * The default value of {@link Property#GENERAL_RPC_SERVER_TYPE} must resolve to {@link ThriftServerType#CUSTOM_HS_HA}.
+ */
+ @Test
+ public void testDefaultServer() {
+ final String defaultTypeName = Property.GENERAL_RPC_SERVER_TYPE.getDefaultValue();
+ assertEquals(ThriftServerType.CUSTOM_HS_HA, ThriftServerType.get(defaultTypeName));
+ }
+
+ /**
+ * The name "threadpool" must resolve to {@link ThriftServerType#THREADPOOL}.
+ */
+ @Test
+ public void testSpecialServer() {
+ final ThriftServerType resolved = ThriftServerType.get("threadpool");
+ assertEquals(ThriftServerType.THREADPOOL, resolved);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index c380eb7..7efabb6 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -91,7 +91,9 @@ import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.fs.VolumeUtil;
import org.apache.accumulo.server.rpc.RpcWrapper;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.server.tables.TableManager;
import org.apache.accumulo.server.util.Halt;
@@ -707,14 +709,21 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
}
private HostAndPort startStatsService() throws UnknownHostException {
- Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(this));
+ Iface rpcProxy = RpcWrapper.service(this);
+ final Processor<Iface> processor;
+ if (ThriftServerType.SASL == getThriftServerType()) {
+ Iface tcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, getClass());
+ processor = new Processor<Iface>(tcProxy);
+ } else {
+ processor = new Processor<Iface>(rpcProxy);
+ }
int port = getConfiguration().getPort(Property.GC_PORT);
long maxMessageSize = getConfiguration().getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port);
log.debug("Starting garbage collector listening on " + result);
try {
- return TServerUtils.startTServer(getConfiguration(), result, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, getConfiguration()
- .getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 1000, maxMessageSize, getServerSslParams(), 0).address;
+ return TServerUtils.startTServer(getConfiguration(), result, getThriftServerType(), processor, this.getClass().getSimpleName(), "GC Monitor Service", 2,
+ getConfiguration().getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 1000, maxMessageSize, getServerSslParams(), getServerSaslParams(), 0).address;
} catch (Exception ex) {
log.fatal(ex, ex);
throw new RuntimeException(ex);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index f98721f..1d7f90f 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -41,6 +42,9 @@ import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
@@ -59,6 +63,8 @@ import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -90,14 +96,36 @@ public class GarbageCollectWriteAheadLogsTest {
@Before
public void setUp() throws Exception {
+ SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class);
instance = createMock(Instance.class);
expect(instance.getInstanceID()).andReturn("mock").anyTimes();
- systemConfig = createMock(AccumuloConfiguration.class);
+ expect(instance.getZooKeepers()).andReturn("localhost").anyTimes();
+ expect(instance.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes();
+ systemConfig = new ConfigurationCopy(new HashMap<String,String>());
volMgr = createMock(VolumeManager.class);
ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class);
expect(factory.getConfiguration()).andReturn(systemConfig).anyTimes();
expect(factory.getInstance()).andReturn(instance).anyTimes();
- replay(instance, factory);
+ expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
+
+ // Just make the SiteConfiguration delegate to our AccumuloConfiguration
+ // Presently, we only need get(Property) and iterator().
+ EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() {
+ @Override
+ public String answer() {
+ Object[] args = EasyMock.getCurrentArguments();
+ return systemConfig.get((Property) args[0]);
+ }
+ }).anyTimes();
+
+ EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() {
+ @Override
+ public Iterator<Entry<String,String>> answer() {
+ return systemConfig.iterator();
+ }
+ }).anyTimes();
+
+ replay(instance, factory, siteConfig);
AccumuloServerContext context = new AccumuloServerContext(factory);
gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false);
modTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
index 99558b8..6fcdd37 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
@@ -29,10 +29,15 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.trace.thrift.TInfo;
@@ -42,6 +47,7 @@ import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.security.SystemCredentials;
import org.apache.hadoop.fs.Path;
import org.easymock.EasyMock;
+import org.easymock.IAnswer;
import org.junit.Before;
import org.junit.Test;
@@ -51,20 +57,42 @@ public class SimpleGarbageCollectorTest {
private Credentials credentials;
private Opts opts;
private SimpleGarbageCollector gc;
- private AccumuloConfiguration systemConfig;
+ private ConfigurationCopy systemConfig;
@Before
public void setUp() {
volMgr = createMock(VolumeManager.class);
instance = createMock(Instance.class);
+ SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class);
expect(instance.getInstanceID()).andReturn("mock").anyTimes();
+ expect(instance.getZooKeepers()).andReturn("localhost").anyTimes();
+ expect(instance.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes();
opts = new Opts();
- systemConfig = mockSystemConfig();
+ systemConfig = createSystemConfig();
ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class);
expect(factory.getInstance()).andReturn(instance).anyTimes();
- expect(factory.getConfiguration()).andReturn(mockSystemConfig()).anyTimes();
- replay(instance, factory);
+ expect(factory.getConfiguration()).andReturn(systemConfig).anyTimes();
+ expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
+
+ // Just make the SiteConfiguration delegate to our AccumuloConfiguration
+ // Presently, we only need get(Property) and iterator().
+ EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() {
+ @Override
+ public String answer() {
+ Object[] args = EasyMock.getCurrentArguments();
+ return systemConfig.get((Property) args[0]);
+ }
+ }).anyTimes();
+
+ EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() {
+ @Override
+ public Iterator<Entry<String,String>> answer() {
+ return systemConfig.iterator();
+ }
+ }).anyTimes();
+
+ replay(instance, factory, siteConfig);
credentials = SystemCredentials.get(instance);
gc = new SimpleGarbageCollector(opts, volMgr, factory);
@@ -76,26 +104,20 @@ public class SimpleGarbageCollectorTest {
assertNotNull(gc.getStatus(createMock(TInfo.class), createMock(TCredentials.class)));
}
- private AccumuloConfiguration mockSystemConfig() {
- AccumuloConfiguration systemConfig = createMock(AccumuloConfiguration.class);
- expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_START)).andReturn(1000L);
- expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_START)).andReturn(1000L);
- expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_DELAY)).andReturn(20000L);
- expect(systemConfig.getCount(Property.GC_DELETE_THREADS)).andReturn(2).times(2);
- expect(systemConfig.getBoolean(Property.GC_TRASH_IGNORE)).andReturn(false);
- expect(systemConfig.getBoolean(Property.GC_FILE_ARCHIVE)).andReturn(false);
- replay(systemConfig);
- return systemConfig;
+ private ConfigurationCopy createSystemConfig() {
+ Map<String,String> conf = new HashMap<>();
+ conf.put(Property.INSTANCE_RPC_SASL_ENABLED.getKey(), "false");
+ conf.put(Property.GC_CYCLE_START.getKey(), "1");
+ conf.put(Property.GC_CYCLE_DELAY.getKey(), "20");
+ conf.put(Property.GC_DELETE_THREADS.getKey(), "2");
+ conf.put(Property.GC_TRASH_IGNORE.getKey(), "false");
+ conf.put(Property.GC_FILE_ARCHIVE.getKey(), "false");
+
+ return new ConfigurationCopy(conf);
}
@Test
public void testInit() throws Exception {
- EasyMock.reset(systemConfig);
- expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_START)).andReturn(1000L).times(2);
- expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_DELAY)).andReturn(20000L);
- expect(systemConfig.getCount(Property.GC_DELETE_THREADS)).andReturn(2).times(2);
- expect(systemConfig.getBoolean(Property.GC_TRASH_IGNORE)).andReturn(false);
- replay(systemConfig);
assertSame(volMgr, gc.getVolumeManager());
assertSame(instance, gc.getInstance());
assertEquals(credentials, gc.getCredentials());
@@ -124,13 +146,7 @@ public class SimpleGarbageCollectorTest {
@Test
public void testMoveToTrash_NotUsingTrash() throws Exception {
- AccumuloConfiguration systemConfig = createMock(AccumuloConfiguration.class);
- expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_START)).andReturn(1000L);
- expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_DELAY)).andReturn(20000L);
- expect(systemConfig.getCount(Property.GC_DELETE_THREADS)).andReturn(2);
- expect(systemConfig.getBoolean(Property.GC_FILE_ARCHIVE)).andReturn(false);
- expect(systemConfig.getBoolean(Property.GC_TRASH_IGNORE)).andReturn(true);
- replay(systemConfig);
+ systemConfig.set(Property.GC_TRASH_IGNORE.getKey(), "true");
Path path = createMock(Path.class);
assertFalse(gc.archiveOrMoveToTrash(path));
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index cad1e01..120692a 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -25,6 +25,7 @@ import static org.easymock.EasyMock.verify;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -41,6 +42,9 @@ import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
@@ -84,12 +88,34 @@ public class CloseWriteAheadLogReferencesTest {
@Before
public void setup() {
inst = createMock(Instance.class);
+ SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class);
expect(inst.getInstanceID()).andReturn(testName.getMethodName()).anyTimes();
- AccumuloConfiguration systemConf = createMock(AccumuloConfiguration.class);
+ expect(inst.getZooKeepers()).andReturn("localhost").anyTimes();
+ expect(inst.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes();
+ final AccumuloConfiguration systemConf = new ConfigurationCopy(new HashMap<String,String>());
ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class);
expect(factory.getConfiguration()).andReturn(systemConf).anyTimes();
expect(factory.getInstance()).andReturn(inst).anyTimes();
- replay(inst, factory);
+ expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
+
+ // Just make the SiteConfiguration delegate to our AccumuloConfiguration
+ // Presently, we only need get(Property) and iterator().
+ EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() {
+ @Override
+ public String answer() {
+ Object[] args = EasyMock.getCurrentArguments();
+ return systemConf.get((Property) args[0]);
+ }
+ }).anyTimes();
+
+ EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() {
+ @Override
+ public Iterator<Entry<String,String>> answer() {
+ return systemConf.iterator();
+ }
+ }).anyTimes();
+
+ replay(inst, factory, siteConfig);
refs = new CloseWriteAheadLogReferences(new AccumuloServerContext(factory));
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 12195fa..a6ea6ea 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -118,7 +118,9 @@ import org.apache.accumulo.server.metrics.Metrics;
import org.apache.accumulo.server.replication.ZooKeeperInitialization;
import org.apache.accumulo.server.rpc.RpcWrapper;
import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.security.SecurityUtil;
@@ -1090,7 +1092,14 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zroot);
clientHandler = new MasterClientServiceHandler(this);
- Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(clientHandler));
+ Iface rpcProxy = RpcWrapper.service(clientHandler);
+ final Processor<Iface> processor;
+ if (ThriftServerType.SASL == getThriftServerType()) {
+ Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clientHandler.getClass());
+ processor = new Processor<Iface>(tcredsProxy);
+ } else {
+ processor = new Processor<Iface>(rpcProxy);
+ }
ServerAddress sa = TServerUtils.startServer(this, hostname, Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null,
Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
clientService = sa.server;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
index 580852d..e8dacaf 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
@@ -221,6 +221,8 @@ public class CompactRange extends MasterRepo {
if (iterators.size() > 0 || !compactionStrategy.equals(CompactionStrategyConfigUtil.DEFAULT_STRATEGY)) {
this.config = WritableUtils.toByteArray(new UserCompactionConfig(this.startRow, this.endRow, iterators, compactionStrategy));
+ } else {
+ log.info("No iterators or compaction strategy");
}
if (this.startRow != null && this.endRow != null && new Text(startRow).compareTo(new Text(endRow)) >= 0)
@@ -256,6 +258,9 @@ public class CompactRange extends MasterRepo {
if (tokens[i].startsWith(txidString))
continue; // skip self
+ log.debug("txidString : " + txidString);
+ log.debug("tokens[" + i + "] : " + tokens[i]);
+
throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER,
"Another compaction with iterators and/or a compaction strategy is running");
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java
index 2d98fed..1a098c2 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.monitor.Monitor;
import org.apache.accumulo.monitor.servlets.BasicServlet;
import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.tracer.TraceFormatter;
abstract class Basic extends BasicServlet {
@@ -88,6 +89,10 @@ abstract class Basic extends BasicServlet {
at = token;
}
+ if (conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+ principal = SecurityUtil.getServerPrincipal(principal);
+ }
+
String table = conf.get(Property.TRACE_TABLE);
try {
Connector conn = HdfsZooInstance.getInstance().getConnector(principal, at);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index 3063cdc..f855d9c 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@ -176,10 +176,16 @@ public class TraceServer implements Watcher {
Connector connector = null;
while (true) {
try {
+ final boolean isDefaultTokenType = conf.get(Property.TRACE_TOKEN_TYPE).equals(Property.TRACE_TOKEN_TYPE.getDefaultValue());
String principal = conf.get(Property.TRACE_USER);
+ if (conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+ // Make sure that we replace _HOST if it exists in the principal
+ principal = SecurityUtil.getServerPrincipal(principal);
+ }
AuthenticationToken at;
Map<String,String> loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);
- if (loginMap.isEmpty()) {
+ if (loginMap.isEmpty() && isDefaultTokenType) {
+ // Assume the old type of user/password specification
Property p = Property.TRACE_PASSWORD;
at = new PasswordToken(conf.get(p).getBytes(UTF_8));
} else {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 2bfa5a0..b08340f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -172,7 +172,9 @@ import org.apache.accumulo.server.problems.ProblemReports;
import org.apache.accumulo.server.replication.ZooKeeperInitialization;
import org.apache.accumulo.server.rpc.RpcWrapper;
import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.security.SecurityUtil;
@@ -315,7 +317,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
Instance instance = getInstance();
this.sessionManager = new SessionManager(aconf);
this.logSorter = new LogSorter(instance, fs, aconf);
- this.replWorker = new ReplicationWorker(instance, fs, aconf);
+ this.replWorker = new ReplicationWorker(this, fs);
this.statsKeeper = new TabletStatsKeeper();
SimpleTimer.getInstance(aconf).schedule(new Runnable() {
@Override
@@ -2272,8 +2274,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
private HostAndPort startTabletClientService() throws UnknownHostException {
// start listening for client connection last
- Iface tch = RpcWrapper.service(new ThriftClientHandler());
- Processor<Iface> processor = new Processor<Iface>(tch);
+ Iface rpcProxy = RpcWrapper.service(new ThriftClientHandler());
+ final Processor<Iface> processor;
+ if (ThriftServerType.SASL == getThriftServerType()) {
+ Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, ThriftClientHandler.class);
+ processor = new Processor<Iface>(tcredProxy);
+ } else {
+ processor = new Processor<Iface>(rpcProxy);
+ }
HostAndPort address = startServer(getServerConfigurationFactory().getConfiguration(), clientAddress.getHostText(), Property.TSERV_CLIENTPORT, processor,
"Thrift Client Server");
log.info("address = " + address);
@@ -2281,7 +2289,9 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
private HostAndPort startReplicationService() throws UnknownHostException {
- ReplicationServicer.Iface repl = RpcWrapper.service(new ReplicationServicerHandler(this));
+ final ReplicationServicerHandler handler = new ReplicationServicerHandler(this);
+ ReplicationServicer.Iface rpcProxy = RpcWrapper.service(handler);
+ ReplicationServicer.Iface repl = TCredentialsUpdatingWrapper.service(rpcProxy, handler.getClass());
ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl);
AccumuloConfiguration conf = getServerConfigurationFactory().getConfiguration();
Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
index bd6bcd3..de99029 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
@@ -19,13 +19,14 @@ package org.apache.accumulo.tserver.replication;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.ClientContext;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.replication.ReplicationConstants;
+import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.security.SystemCredentials;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -39,13 +40,15 @@ public class ReplicationWorker implements Runnable {
private Instance inst;
private VolumeManager fs;
+ private Credentials creds;
private AccumuloConfiguration conf;
private ThreadPoolExecutor executor;
- public ReplicationWorker(Instance inst, VolumeManager fs, AccumuloConfiguration conf) {
- this.inst = inst;
+ public ReplicationWorker(ClientContext clientCtx, VolumeManager fs) {
+ this.inst = clientCtx.getInstance();
this.fs = fs;
- this.conf = conf;
+ this.conf = clientCtx.getConfiguration();
+ this.creds = clientCtx.getCredentials();
}
public void setExecutor(ThreadPoolExecutor executor) {
@@ -69,7 +72,7 @@ public class ReplicationWorker implements Runnable {
workQueue = new DistributedWorkQueue(ZooUtil.getRoot(inst) + ReplicationConstants.ZOO_WORK_QUEUE, conf);
}
- workQueue.startProcessing(new ReplicationProcessor(inst, conf, fs, SystemCredentials.get(inst)), executor);
+ workQueue.startProcessing(new ReplicationProcessor(inst, conf, fs, creds), executor);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/shell/src/main/java/org/apache/accumulo/shell/Shell.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/accumulo/shell/Shell.java b/shell/src/main/java/org/apache/accumulo/shell/Shell.java
index d897fc3..a64ff45 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/Shell.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/Shell.java
@@ -199,7 +199,6 @@ public class Shell extends ShellOptions {
protected Instance instance;
private Connector connector;
protected ConsoleReader reader;
- private String principal;
private AuthenticationToken token;
private final Class<? extends Formatter> defaultFormatterClass = DefaultFormatter.class;
private final Class<? extends Formatter> binaryFormatterClass = BinaryFormatter.class;
@@ -275,8 +274,22 @@ public class Shell extends ShellOptions {
authTimeout = TimeUnit.MINUTES.toNanos(options.getAuthTimeout());
disableAuthTimeout = options.isAuthTimeoutDisabled();
+ ClientConfiguration clientConf;
+ try {
+ clientConf = options.getClientConfiguration();
+ } catch (Exception e) {
+ printException(e);
+ return true;
+ }
+
// get the options that were parsed
- String user = options.getUsername();
+ final String user;
+ try {
+ user = options.getUsername();
+ } catch (Exception e) {
+ printException(e);
+ return true;
+ }
String password = options.getPassword();
tabCompletion = !options.isTabCompletionDisabled();
@@ -285,7 +298,13 @@ public class Shell extends ShellOptions {
setInstance(options);
// AuthenticationToken options
- token = options.getAuthenticationToken();
+ try {
+ token = options.getAuthenticationToken();
+ } catch (Exception e) {
+ printException(e);
+ return true;
+ }
+
Map<String,String> loginOptions = options.getTokenProperties();
// process default parameters if unspecified
@@ -328,12 +347,11 @@ public class Shell extends ShellOptions {
}
if (!options.isFake()) {
- DistributedTrace.enable(InetAddress.getLocalHost().getHostName(), "shell", options.getClientConfiguration());
+ DistributedTrace.enable(InetAddress.getLocalHost().getHostName(), "shell", clientConf);
}
this.setTableName("");
- this.principal = user;
- connector = instance.getConnector(this.principal, token);
+ connector = instance.getConnector(user, token);
} catch (Exception e) {
printException(e);
@@ -1157,12 +1175,11 @@ public class Shell extends ShellOptions {
public void updateUser(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
connector = instance.getConnector(principal, token);
- this.principal = principal;
this.token = token;
}
public String getPrincipal() {
- return principal;
+ return connector.whoami();
}
public AuthenticationToken getToken() {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java b/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java
index 875367d..be53d5d 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java
@@ -27,8 +27,10 @@ import java.util.TreeMap;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,10 +41,10 @@ import com.beust.jcommander.ParameterException;
import com.beust.jcommander.converters.FileConverter;
public class ShellOptionsJC {
- private static final Logger log = LoggerFactory.getLogger(Shell.class);
+ private static final Logger log = LoggerFactory.getLogger(ShellOptionsJC.class);
@Parameter(names = {"-u", "--user"}, description = "username (defaults to your OS user)")
- private String username = System.getProperty("user.name", "root");
+ private String username = null;
public static class PasswordConverter implements IStringConverter<String> {
public static final String STDIN = "stdin";
@@ -126,7 +128,7 @@ public class ShellOptionsJC {
return Class.forName(value).asSubclass(AuthenticationToken.class).newInstance();
} catch (Exception e) {
// Catching ClassNotFoundException, ClassCastException, InstantiationException and IllegalAccessException
- log.error("Could not instantiate AuthenticationToken " + value, e);
+ log.error("Could not instantiate AuthenticationToken {}", value, e);
throw new ParameterException(e);
}
}
@@ -169,6 +171,9 @@ public class ShellOptionsJC {
@Parameter(names = {"--ssl"}, description = "use ssl to connect to accumulo")
private boolean useSsl = false;
+ @Parameter(names = "--sasl", description = "use SASL to connect to Accumulo (Kerberos)")
+ private boolean useSasl = false;
+
@Parameter(names = "--config-file", description = "read the given client config file. "
+ "If omitted, the path searched can be specified with $ACCUMULO_CLIENT_CONF_PATH, "
+ "which defaults to ~/.accumulo/config:$ACCUMULO_CONF_DIR/client.conf:/etc/accumulo/client.conf")
@@ -189,7 +194,19 @@ public class ShellOptionsJC {
@Parameter(hidden = true)
private List<String> unrecognizedOptions;
- public String getUsername() {
+ public String getUsername() throws Exception {
+ if (null == username) {
+ final ClientConfiguration clientConf = getClientConfiguration();
+ if (Boolean.parseBoolean(clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED))) {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ throw new RuntimeException("Kerberos security is not enabled");
+ }
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ username = ugi.getUserName();
+ } else {
+ username = System.getProperty("user.name", "root");
+ }
+ }
return username;
}
@@ -197,7 +214,15 @@ public class ShellOptionsJC {
return password;
}
- public AuthenticationToken getAuthenticationToken() {
+ public AuthenticationToken getAuthenticationToken() throws Exception {
+ if (null == authenticationToken) {
+ final ClientConfiguration clientConf = getClientConfiguration();
+ // Automatically use a KerberosToken if the client conf is configured for SASL
+ final boolean saslEnabled = Boolean.parseBoolean(clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+ if (saslEnabled) {
+ authenticationToken = new KerberosToken();
+ }
+ }
return authenticationToken;
}
@@ -275,7 +300,13 @@ public class ShellOptionsJC {
if (useSsl()) {
clientConfig.setProperty(ClientProperty.INSTANCE_RPC_SSL_ENABLED, "true");
}
+ if (useSasl()) {
+ clientConfig.setProperty(ClientProperty.INSTANCE_RPC_SASL_ENABLED, "true");
+ }
return clientConfig;
}
+ public boolean useSasl() {
+ return useSasl;
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/shell/src/test/java/org/apache/accumulo/shell/ShellOptionsJCTest.java
----------------------------------------------------------------------
diff --git a/shell/src/test/java/org/apache/accumulo/shell/ShellOptionsJCTest.java b/shell/src/test/java/org/apache/accumulo/shell/ShellOptionsJCTest.java
new file mode 100644
index 0000000..0c4e4c7
--- /dev/null
+++ b/shell/src/test/java/org/apache/accumulo/shell/ShellOptionsJCTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.shell;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.beust.jcommander.JCommander;
+
+/**
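+ * Verifies that command-line flags parsed into {@link ShellOptionsJC} are reflected in the resulting {@link ClientConfiguration}.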
+ */
+public class ShellOptionsJCTest {
+
+ ShellOptionsJC options;
+
+ @Before
+ public void setup() {
+ options = new ShellOptionsJC();
+ }
+
+ @Test
+ public void testSasl() throws Exception {
+ JCommander jc = new JCommander();
+
+ jc.setProgramName("accumulo shell");
+ jc.addObject(options);
+ jc.parse(new String[] {"--sasl"});
+ ClientConfiguration clientConf = options.getClientConfiguration();
+ assertEquals("true", clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/pom.xml
----------------------------------------------------------------------
diff --git a/test/pom.xml b/test/pom.xml
index 16f4125..b58df3c 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -156,6 +156,21 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minikdc</artifactId>
+ <!-- Specifically depend on this version of minikdc to avoid having
+ to increase our normal hadoop dependency -->
+ <version>2.3.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <!-- Pulls in an older bouncycastle version -->
+ <exclusion>
+ <groupId>bouncycastle</groupId>
+ <artifactId>bcprov-jdk15</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<scope>test</scope>
@@ -202,6 +217,7 @@
<timeout.factor>${timeout.factor}</timeout.factor>
<org.apache.accumulo.test.functional.useCredProviderForIT>${useCredProviderForIT}</org.apache.accumulo.test.functional.useCredProviderForIT>
<org.apache.accumulo.test.functional.useSslForIT>${useSslForIT}</org.apache.accumulo.test.functional.useSslForIT>
+ <org.apache.accumulo.test.functional.useKrbForIT>${useKrbForIT}</org.apache.accumulo.test.functional.useKrbForIT>
</systemPropertyVariables>
</configuration>
</plugin>
@@ -212,6 +228,7 @@
<systemPropertyVariables>
<org.apache.accumulo.test.functional.useCredProviderForIT>${useCredProviderForIT}</org.apache.accumulo.test.functional.useCredProviderForIT>
<org.apache.accumulo.test.functional.useSslForIT>${useSslForIT}</org.apache.accumulo.test.functional.useSslForIT>
+ <org.apache.accumulo.test.functional.useKrbForIT>${useKrbForIT}</org.apache.accumulo.test.functional.useKrbForIT>
</systemPropertyVariables>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index 3bb44ff..0b047cb 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -42,6 +42,7 @@ import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -102,8 +103,8 @@ public class ZombieTServer {
TransactionWatcher watcher = new TransactionWatcher();
final ThriftClientHandler tch = new ThriftClientHandler(context, watcher);
Processor<Iface> processor = new Processor<Iface>(tch);
- ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", port), processor, "ZombieTServer",
- "walking dead", 2, 1, 1000, 10 * 1024 * 1024, null, -1);
+ ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", port), ThriftServerType.CUSTOM_HS_HA,
+ processor, "ZombieTServer", "walking dead", 2, 1, 1000, 10 * 1024 * 1024, null, null, -1);
String addressString = serverPort.address.toString();
String zPath = ZooUtil.getRoot(context.getInstance()) + Constants.ZTSERVERS + "/" + addressString;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index 0afa243..b429607 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -70,6 +70,7 @@ import org.apache.accumulo.server.master.state.MetaDataTableScanner;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletLocationState;
import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
@@ -254,8 +255,8 @@ public class NullTserver {
TransactionWatcher watcher = new TransactionWatcher();
ThriftClientHandler tch = new ThriftClientHandler(new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance())), watcher);
Processor<Iface> processor = new Processor<Iface>(tch);
- TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", opts.port), processor, "NullTServer", "null tserver", 2, 1, 1000,
- 10 * 1024 * 1024, null, -1);
+ TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", opts.port), ThriftServerType.CUSTOM_HS_HA, processor, "NullTServer",
+ "null tserver", 2, 1, 1000, 10 * 1024 * 1024, null, null, -1);
HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java b/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
index 8f7e1b7..c1ad17b 100644
--- a/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
+++ b/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
@@ -34,7 +34,9 @@ import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -48,6 +50,7 @@ import com.google.common.base.Preconditions;
*/
public abstract class AccumuloClusterIT extends AccumuloIT implements MiniClusterConfigurationCallback {
private static final Logger log = LoggerFactory.getLogger(AccumuloClusterIT.class);
+ private static final String TRUE = Boolean.toString(true);
public static enum ClusterType {
MINI, STANDALONE;
@@ -62,15 +65,68 @@ public abstract class AccumuloClusterIT extends AccumuloIT implements MiniCluste
protected static AccumuloCluster cluster;
protected static ClusterType type;
protected static AccumuloClusterPropertyConfiguration clusterConf;
+ protected static TestingKdc krb;
@BeforeClass
public static void setUp() throws Exception {
clusterConf = AccumuloClusterPropertyConfiguration.get();
type = clusterConf.getClusterType();
+ if (ClusterType.MINI == type && TRUE.equals(System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION))) {
+ krb = new TestingKdc();
+ krb.start();
+ }
+
initialized = true;
}
+ @AfterClass
+ public static void tearDownKdc() throws Exception {
+ if (null != krb) {
+ krb.stop();
+ }
+ }
+
+ /**
+ * {@link TestingKdc#getAccumuloKeytab()}
+ */
+ public static File getAccumuloKeytab() {
+ if (null == krb) {
+ throw new RuntimeException("KDC not enabled");
+ }
+ return krb.getAccumuloKeytab();
+ }
+
+ /**
+ * {@link TestingKdc#getAccumuloPrincipal()}
+ */
+ public static String getAccumuloPrincipal() {
+ if (null == krb) {
+ throw new RuntimeException("KDC not enabled");
+ }
+ return krb.getAccumuloPrincipal();
+ }
+
+ /**
+ * {@link TestingKdc#getClientKeytab()}
+ */
+ public static File getClientKeytab() {
+ if (null == krb) {
+ throw new RuntimeException("KDC not enabled");
+ }
+ return krb.getClientKeytab();
+ }
+
+ /**
+ * {@link TestingKdc#getClientPrincipal()}
+ */
+ public static String getClientPrincipal() {
+ if (null == krb) {
+ throw new RuntimeException("KDC not enabled");
+ }
+ return krb.getClientPrincipal();
+ }
+
@Before
public void setupCluster() throws Exception {
// Before we try to instantiate the cluster, check to see if the test even wants to run against this type of cluster
@@ -80,7 +136,7 @@ public abstract class AccumuloClusterIT extends AccumuloIT implements MiniCluste
case MINI:
MiniClusterHarness miniClusterHarness = new MiniClusterHarness();
// Intrinsically performs the callback to let tests alter MiniAccumuloConfig and core-site.xml
- cluster = miniClusterHarness.create(this, getToken());
+ cluster = miniClusterHarness.create(this, getToken(), krb);
break;
case STANDALONE:
StandaloneAccumuloClusterConfiguration conf = (StandaloneAccumuloClusterConfiguration) clusterConf;
@@ -98,6 +154,10 @@ public abstract class AccumuloClusterIT extends AccumuloIT implements MiniCluste
if (type.isDynamic()) {
cluster.start();
+ if (null != krb) {
+ // Log in the 'client' user
+ UserGroupInformation.loginUserFromKeytab(getClientPrincipal(), getClientKeytab().getAbsolutePath());
+ }
} else {
log.info("Removing tables which appear to be from a previous test run");
cleanupTables();