Posted to commits@accumulo.apache.org by el...@apache.org on 2015/01/15 18:51:39 UTC

[3/4] accumulo git commit: ACCUMULO-2815 Support for Kerberos client authentication.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/docs/src/main/asciidoc/chapters/kerberos.txt
----------------------------------------------------------------------
diff --git a/docs/src/main/asciidoc/chapters/kerberos.txt b/docs/src/main/asciidoc/chapters/kerberos.txt
new file mode 100644
index 0000000..3dcac6d
--- /dev/null
+++ b/docs/src/main/asciidoc/chapters/kerberos.txt
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+== Kerberos
+
+=== Overview
+
+Kerberos is a network authentication protocol that provides a secure way for
+peers to prove their identity over an unsecure network in a client-server model.
+A centralized key-distribution center (KDC) is the service that coordinates
+authentication between a client and a server. Clients and servers use "tickets",
+obtained from the KDC via a password or a special file called a "keytab", to
+communicate with the KDC and prove their identity. A KDC administrator must
+create the principal (name for the client/server identity) and the password
+or keytab, securely passing the necessary information to the actual user/service.
+Properly securing the KDC and generated ticket material is central to the security
+model and is mentioned only as a warning to administrators running their own KDC.
+
+To interact with Kerberos programmatically, GSSAPI and SASL are two standards
+which allow cross-language integration with Kerberos for authentication. GSSAPI,
+the generic security service application program interface, is a standard which
+Kerberos implements. In the Java programming language, the language itself also implements
+GSSAPI which is leveraged by other applications, like Apache Hadoop and Apache Thrift.
+SASL, simple authentication and security layer, is a framework for authentication
+and security over the network. SASL provides a number of mechanisms for authentication,
+one of which is GSSAPI. Thus, SASL provides the transport which authenticates
+using GSSAPI that Kerberos implements.
+
+Kerberos is a very complicated software application and is deserving of much
+more description than can be provided here. An http://www.roguelynn.com/words/explain-like-im-5-kerberos/[explain like
+I'm 5] blog post is very good at distilling the basics, while http://web.mit.edu/kerberos/[MIT Kerberos's project page]
+contains lots of documentation for users or administrators. Various Hadoop "vendors"
+also provide free documentation that includes step-by-step instructions for
+configuring Hadoop and ZooKeeper (which will be henceforth considered as prerequisites).
+
+=== Within Hadoop
+
+Out of the box, HDFS and YARN have no ability to enforce that a user is who
+they claim they are. Thus, any basic Hadoop installation should be treated as
+unsecure: any user with access to the cluster has the ability to access any data.
+Using Kerberos to provide authentication, users can be strongly identified, delegating
+to Kerberos to determine who a user is and enforce that a user is who they claim to be.
+As such, Kerberos is widely used across the entire Hadoop ecosystem for strong
+authentication. Since server processes accessing HDFS or YARN are required
+to use Kerberos to authenticate with HDFS, it makes sense that they also require
+Kerberos authentication from their clients, in addition to other features provided
+by SASL.
+
+A typical deployment involves the creation of Kerberos principals for all server
+processes (Hadoop datanodes and namenode(s), ZooKeepers), the creation of a keytab
+file for each principal, and then proper configuration of the Hadoop site XML files.
+Users also need Kerberos principals created for them; however, a user typically
+uses a password to identify themselves instead of a keytab. Users can obtain a
+ticket granting ticket (TGT) from the KDC using their password which allows them
+to authenticate for the lifetime of the TGT (typically one day by default) and alleviates
+the need for further password authentication.
+
+For client-server applications, like web servers, a keytab can be created which
+allows for fully-automated Kerberos identification, removing the need to enter any
+password, at the cost of needing to protect the keytab file. These principals
+will apply directly to authentication for clients accessing Accumulo and the
+Accumulo processes accessing HDFS.
+
+=== Configuring Accumulo
+
+To configure Accumulo for use with Kerberos, both client-facing and server-facing
+changes must be made for a functional system on secured Hadoop. As previously mentioned,
+numerous guidelines already exist on the subject of configuring Hadoop and ZooKeeper for
+use with Kerberos and won't be covered here. It is assumed that you have functional
+Hadoop and ZooKeeper already installed.
+
+==== Servers
+
+The first step is to obtain a Kerberos identity for the Accumulo server processes.
+When running Accumulo with Kerberos enabled, a valid Kerberos identity will be required
+to initiate any RPC between Accumulo processes (e.g. Master and TabletServer) in addition
+to any HDFS action (e.g. client to HDFS or TabletServer to HDFS).
+
+===== Generate Principal and Keytab
+
+In the +kadmin.local+ shell or using the +-q+ option on +kadmin.local+, create a
+principal for Accumulo for all hosts that are running Accumulo processes. A Kerberos
+principal is of the form "primary/instance@REALM". "accumulo" is commonly the "primary"
+(although not required) and the "instance" is the fully-qualified domain name for
+the host that will be running the Accumulo process -- this is required.
+
+----
+kadmin.local -q "addprinc -randkey accumulo/host.domain.com"
+----
+
+Perform the above for each node running Accumulo processes in the instance, modifying
+"host.domain.com" for your network. The +randkey+ option generates a random password
+because we will use a keytab for authentication, not a password, since the Accumulo
+server processes don't have an interactive console to enter a password into.
+
+----
+kadmin.local -q "xst -k accumulo.hostname.keytab accumulo/host.domain.com"
+----
+
+To simplify deployments, at the cost of security, all Accumulo principals could
+be globbed into a single keytab:
+
+----
+kadmin.local -q "xst -k accumulo.service.keytab -glob accumulo*"
+----
+
+To ensure that the SASL handshake can occur from clients to servers and servers to servers,
+all Accumulo servers must share the same primary and realm principal components as the
+"client" must know these to set up the connection with the "server".
+
+===== Server Configuration
+
+A number of properties need to be changed to properly configure servers
+in +accumulo-site.xml+.
+
+* *general.kerberos.keytab*=_/etc/security/keytabs/accumulo.service.keytab_
+** The path to the keytab for Accumulo on local filesystem.
+** Change the value to the actual path on your system.
+* *general.kerberos.principal*=_accumulo/_HOST@REALM_
+** The Kerberos principal for Accumulo, needs to match the keytab.
+** "_HOST" can be used instead of the actual hostname in the principal and will be
+automatically expanded to the current FQDN which reduces the configuration file burden.
+* *instance.rpc.sasl.enabled*=_true_
+** Enables SASL for the Thrift Servers (supports GSSAPI)
+* *instance.security.authenticator*=_org.apache.accumulo.server.security.handler.KerberosAuthenticator_
+** Configures Accumulo to use the Kerberos principal as the Accumulo username/principal
+* *instance.security.authorizor*=_org.apache.accumulo.server.security.handler.KerberosAuthorizor_
+** Configures Accumulo to use the Kerberos principal for authorization purposes
+* *instance.security.permissionHandler*=_org.apache.accumulo.server.security.handler.KerberosPermissionHandler_
+** Configures Accumulo to use the Kerberos principal for permission purposes
+* *trace.token.type*=_org.apache.accumulo.core.client.security.tokens.KerberosToken_
+** Configures the Accumulo Tracer to use the KerberosToken for authentication when
+serializing traces to the trace table.
+* *trace.user*=_accumulo/_HOST@REALM_
+** The tracer process needs valid credentials to serialize traces to Accumulo.
+** While the other server processes are creating a SystemToken from the provided keytab and principal, we can
+still use a normal KerberosToken and the same keytab/principal to serialize traces. Like
+non-Kerberized instances, the table must be created and permissions granted to the trace.user.
+** The same +_HOST+ replacement is performed on this value, substituting the FQDN for +_HOST+.
+
+Although it should be a prerequisite, it is very important that you have DNS properly
+configured for your nodes and that Accumulo is configured to use the FQDN. It
+is extremely important to use the FQDN in each of the "hosts" files for each
+Accumulo process: +masters+, +monitors+, +slaves+, +tracers+, and +gc+.
+
+===== KerberosAuthenticator
+
+The +KerberosAuthenticator+ is an implementation of the pluggable security interfaces
+that Accumulo provides. It builds on top of the default ZooKeeper-based implementation,
+but removes the need to create user accounts with passwords in Accumulo for clients. As
+long as a client has a valid Kerberos identity, they can connect to and interact with
+Accumulo, but without any permissions (e.g. cannot create tables or write data). Leveraging
+ZooKeeper removes the need to change the permission handler and authorizor, so other Accumulo
+functions regarding permissions and cell-level authorizations do not change.
+
+It is extremely important to note that, while user operations like +SecurityOperations.listLocalUsers()+,
++SecurityOperations.dropLocalUser()+, and +SecurityOperations.createLocalUser()+ will not return
+errors, these methods are not equivalent to normal installations, as they will only operate on
+users which have, at one point in time, authenticated with Accumulo using their Kerberos identity.
+The KDC is still the authoritative entity for user management. The previously mentioned methods
+are provided as they simplify management of users within Accumulo, especially with respect
+to granting Authorizations and Permissions to new users.
+
+===== Verifying secure access
+
+To verify that servers have correctly started with Kerberos enabled, ensure that the processes
+are actually running (they should exit immediately if login fails) and verify that you see
+something similar to the following in the application log.
+
+----
+2015-01-07 11:57:56,826 [security.SecurityUtil] INFO : Attempting to login with keytab as accumulo/hostname@EXAMPLE.COM
+2015-01-07 11:57:56,830 [security.UserGroupInformation] INFO : Login successful for user accumulo/hostname@EXAMPLE.COM using keytab file /etc/security/keytabs/accumulo.service.keytab
+----
+
+==== Clients
+
+===== Create client principal
+
+Like the Accumulo servers, clients must also have a Kerberos principal created for them. The
+primary difference from a server principal is that principals for users are created
+with a password and also not qualified to a specific instance (host).
+
+----
+kadmin.local -q "addprinc $user"
+----
+
+The above will prompt for a password for that user which will be used to identify that $user.
+The user can verify that they can authenticate with the KDC using the command `kinit $user`.
+Upon entering the correct password, a local credentials cache will be made which can be used
+to authenticate with Accumulo, access HDFS, etc.
+
+The user can verify the state of their local credentials cache by using the command `klist`.
+
+----
+$ klist
+Ticket cache: FILE:/tmp/krb5cc_123
+Default principal: user@EXAMPLE.COM
+
+Valid starting       Expires              Service principal
+01/07/2015 11:56:35  01/08/2015 11:56:35  krbtgt/EXAMPLE.COM@EXAMPLE.COM
+	renew until 01/14/2015 11:56:35
+----
+
+===== Configuration
+
+The second thing clients need to do is to set up their client configuration file. By
+default, this file is stored in +~/.accumulo/conf+, +$ACCUMULO_CONF_DIR/client.conf+ or
++$ACCUMULO_HOME/conf/client.conf+. Accumulo utilities also allow you to provide your own
+copy of this file in any location using the +--config-file+ command line option.
+
+Three items need to be set to enable access to Accumulo:
+
+* +instance.rpc.sasl.enabled+=_true_
+* +kerberos.server.primary+=_accumulo_
+* +kerberos.server.realm+=_EXAMPLE.COM_
+
+The second and third properties *must* match the configuration of the accumulo servers; this is
+required to set up the SASL transport.
+
+==== Debugging
+
+*Q*: I have valid Kerberos credentials and a correct client configuration file but 
+I still get errors like:
+
+----
+java.io.IOException: Failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
+----
+
+*A*: When you have a valid client configuration and Kerberos TGT, it is possible that the search
+path for your local credentials cache is incorrect. Check the value of the KRB5CCNAME environment
+variable, and ensure it matches the value reported by `klist`.
+
+----
+$ echo $KRB5CCNAME
+
+$ klist 
+Ticket cache: FILE:/tmp/krb5cc_123
+Default principal: user@EXAMPLE.COM
+
+Valid starting       Expires              Service principal
+01/07/2015 11:56:35  01/08/2015 11:56:35  krbtgt/EXAMPLE.COM@EXAMPLE.COM
+	renew until 01/14/2015 11:56:35
+$ export KRB5CCNAME=/tmp/krb5cc_123
+$ echo $KRB5CCNAME
+/tmp/krb5cc_123
+----
+
+*Q*: I thought I had everything configured correctly, but my client/server still fails to log in.
+I don't know what is actually failing.
+
+*A*: Add the following system property to the JVM invocation:
+
+----
+-Dsun.security.krb5.debug=true
+----
+
+This will enable lots of extra debugging at the JVM level which is often sufficient to
+diagnose some high-level configuration problem. Client applications can add this system property by
+hand to the command line. Accumulo server processes, or applications started using the `accumulo`
+script, can add the property to +ACCUMULO_GENERAL_OPTS+ in +$ACCUMULO_CONF_DIR/accumulo-env.sh+.
+
+Additionally, you can increase the log4j levels on +org.apache.hadoop.security+, which includes the
+Hadoop +UserGroupInformation+ class, which will include some high-level debug statements. This
+can be controlled in your client application, or using +$ACCUMULO_CONF_DIR/generic_logger.xml+.
+
+*Q*: All of my Accumulo processes successfully start and log in with their
+keytab, but they are unable to communicate with each other, showing the
+following errors:
+
+----
+2015-01-12 14:47:27,055 [transport.TSaslTransport] ERROR: SASL negotiation failure
+javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]
+        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
+        at org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
+        at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
+        at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
+        at org.apache.accumulo.core.rpc.UGIAssumingTransport$1.run(UGIAssumingTransport.java:53)
+        at org.apache.accumulo.core.rpc.UGIAssumingTransport$1.run(UGIAssumingTransport.java:49)
+        at java.security.AccessController.doPrivileged(Native Method)
+        at javax.security.auth.Subject.doAs(Subject.java:415)
+        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
+        at org.apache.accumulo.core.rpc.UGIAssumingTransport.open(UGIAssumingTransport.java:49)
+        at org.apache.accumulo.core.rpc.ThriftUtil.createClientTransport(ThriftUtil.java:357)
+        at org.apache.accumulo.core.rpc.ThriftUtil.createTransport(ThriftUtil.java:255)
+        at org.apache.accumulo.server.master.LiveTServerSet$TServerConnection.getTableMap(LiveTServerSet.java:106)
+        at org.apache.accumulo.master.Master.gatherTableInformation(Master.java:996)
+        at org.apache.accumulo.master.Master.access$600(Master.java:160)
+        at org.apache.accumulo.master.Master$StatusThread.updateStatus(Master.java:911)
+        at org.apache.accumulo.master.Master$StatusThread.run(Master.java:901)
+Caused by: GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)
+        at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:710)
+        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:248)
+        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
+        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193)
+        ... 16 more
+Caused by: KrbException: Server not found in Kerberos database (7) - LOOKING_UP_SERVER
+        at sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:73)
+        at sun.security.krb5.KrbTgsReq.getReply(KrbTgsReq.java:192)
+        at sun.security.krb5.KrbTgsReq.sendAndGetCreds(KrbTgsReq.java:203)
+        at sun.security.krb5.internal.CredentialsUtil.serviceCreds(CredentialsUtil.java:309)
+        at sun.security.krb5.internal.CredentialsUtil.acquireServiceCreds(CredentialsUtil.java:115)
+        at sun.security.krb5.Credentials.acquireServiceCreds(Credentials.java:454)
+        at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:641)
+        ... 19 more
+Caused by: KrbException: Identifier doesn't match expected value (906)
+        at sun.security.krb5.internal.KDCRep.init(KDCRep.java:143)
+        at sun.security.krb5.internal.TGSRep.init(TGSRep.java:66)
+        at sun.security.krb5.internal.TGSRep.<init>(TGSRep.java:61)
+        at sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:55)
+        ... 25 more
+----
+
+or 
+
+----
+2015-01-12 14:47:29,440 [server.TThreadPoolServer] ERROR: Error occurred during processing of message.
+java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: Peer indicated failure: GSS initiate failed
+        at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219)
+        at org.apache.accumulo.core.rpc.UGIAssumingTransportFactory$1.run(UGIAssumingTransportFactory.java:51)
+        at org.apache.accumulo.core.rpc.UGIAssumingTransportFactory$1.run(UGIAssumingTransportFactory.java:48)
+        at java.security.AccessController.doPrivileged(Native Method)
+        at javax.security.auth.Subject.doAs(Subject.java:356)
+        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1608)
+        at org.apache.accumulo.core.rpc.UGIAssumingTransportFactory.getTransport(UGIAssumingTransportFactory.java:48)
+        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:208)
+        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
+        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
+        at java.lang.Thread.run(Thread.java:745)
+Caused by: org.apache.thrift.transport.TTransportException: Peer indicated failure: GSS initiate failed
+        at org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:190)
+        at org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125)
+        at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
+        at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41)
+        at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216)
+        ... 10 more
+----
+
+*A*: As previously mentioned, the hostname, and subsequently the address each Accumulo process is bound/listening
+on, is extremely important when negotiating an SASL connection. This problem commonly arises when the Accumulo
+servers are not configured to listen on the address denoted by their FQDN.
+
+The values in the Accumulo "hosts" files (in +$ACCUMULO_CONF_DIR+: +masters+, +monitors+, +slaves+, +tracers+,
+and +gc+) should match the instance component of the Kerberos server principal (e.g. +host+ in +accumulo/host\@EXAMPLE.COM+).
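
Reader note (not part of the patch): the documentation describes principals of the form "primary/instance@REALM" and a "_HOST" placeholder that is expanded to the host's FQDN. The following is a minimal sketch of that idea using plain string handling. The class and method names are hypothetical and are not Accumulo or Hadoop APIs; the real expansion is done by library code this sketch does not reproduce.

```java
/** Hypothetical illustration only; not part of Accumulo or Hadoop. */
class KerberosPrincipalSketch {

  /** Replaces a "_HOST" instance component with the supplied FQDN. */
  static String expandHost(String principal, String fqdn) {
    int slash = principal.indexOf('/');
    int at = principal.lastIndexOf('@');
    if (slash < 0 || at < slash) {
      // No instance component, or no realm after it: leave the value untouched
      return principal;
    }
    String primary = principal.substring(0, slash);
    String instance = principal.substring(slash + 1, at);
    String realm = principal.substring(at + 1);
    if (!"_HOST".equals(instance)) {
      // An explicit hostname was configured; nothing to expand
      return principal;
    }
    return primary + "/" + fqdn + "@" + realm;
  }

  /** Returns the instance component, or null when the principal has none. */
  static String instanceOf(String principal) {
    int slash = principal.indexOf('/');
    if (slash < 0) {
      return null;
    }
    int at = principal.lastIndexOf('@');
    int end = at > slash ? at : principal.length();
    return principal.substring(slash + 1, end);
  }

  public static void main(String[] args) {
    String expanded = expandHost("accumulo/_HOST@EXAMPLE.COM", "host.domain.com");
    System.out.println(expanded);
    // The instance component is what the entries in the "hosts" files should match
    System.out.println(instanceOf(expanded));
  }
}
```

A check like instanceOf() could be used to compare a configured principal against the entries in the "hosts" files when diagnosing the "Server not found in Kerberos database" error quoted in the Debugging section.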

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
index d2a999d..76f332b 100644
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
@@ -38,6 +38,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -557,7 +558,21 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
               s.close();
           }
         }
-        Process initProcess = exec(Initialize.class, "--instance-name", config.getInstanceName(), "--password", config.getRootPassword());
+
+        LinkedList<String> args = new LinkedList<>();
+        args.add("--instance-name");
+        args.add(config.getInstanceName());
+        args.add("--user");
+        args.add(config.getRootUserName());
+
+        // If we aren't using SASL, add in the root password
+        final String saslEnabled = config.getSiteConfig().get(Property.INSTANCE_RPC_SASL_ENABLED.getKey());
+        if (null == saslEnabled || !Boolean.parseBoolean(saslEnabled)) {
+          args.add("--password");
+          args.add(config.getRootPassword());
+        }
+
+        Process initProcess = exec(Initialize.class, args.toArray(new String[0]));
         int ret = initProcess.waitFor();
         if (ret != 0) {
           throw new RuntimeException("Initialize process returned " + ret + ". Check the logs in " + config.getLogDir() + " for errors.");
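
Reader note (not part of the patch): the hunk above only passes "--password" to Initialize when SASL is not enabled. The following self-contained sketch isolates that decision. The class and method names are hypothetical; only the option strings and the null-or-false check come from the diff.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the argument-building logic in the hunk above. */
class InitArgsSketch {

  static List<String> buildInitArgs(String instanceName, String rootUser, String rootPassword, String saslEnabled) {
    List<String> args = new ArrayList<>();
    args.add("--instance-name");
    args.add(instanceName);
    args.add("--user");
    args.add(rootUser);

    // Boolean.parseBoolean(null) is false, so an unset property is treated as "SASL disabled"
    if (!Boolean.parseBoolean(saslEnabled)) {
      args.add("--password");
      args.add(rootPassword);
    }
    return args;
  }

  public static void main(String[] args) {
    System.out.println(buildInitArgs("miniInstance", "root", "secret", null));
    System.out.println(buildInitArgs("miniInstance", "root", "secret", "true"));
  }
}
```

With SASL enabled the root user is identified by its Kerberos principal, so no password argument is added.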

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
index 26c23ed..6d674f3 100644
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
@@ -52,6 +52,7 @@ public class MiniAccumuloConfigImpl {
   private Map<String,String> systemProperties = new HashMap<String,String>();
 
   private String instanceName = "miniInstance";
+  private String rootUserName = "root";
 
   private File libDir;
   private File libExtDir;
@@ -667,4 +668,23 @@ public class MiniAccumuloConfigImpl {
   public Configuration getHadoopConfiguration() {
     return hadoopConf;
   }
+
+  /**
+   * @return the default Accumulo "superuser"
+   * @since 1.7.0
+   */
+  public String getRootUserName() {
+    return rootUserName;
+  }
+
+  /**
+   * Sets the default Accumulo "superuser".
+   *
+   * @param rootUserName
+   *          The name of the user to create with administrative permissions during initialization
+   * @since 1.7.0
+   */
+  public void setRootUserName(String rootUserName) {
+    this.rootUserName = rootUserName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6bffbe1..2c21ff6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,7 +122,7 @@
     <!-- surefire/failsafe plugin option -->
     <forkCount>1</forkCount>
     <!-- overwritten in profiles hadoop-2 -->
-    <hadoop.version>2.2.0</hadoop.version>
+    <hadoop.version>2.3.0</hadoop.version>
     <htrace.version>3.0.4</htrace.version>
     <httpclient.version>3.1</httpclient.version>
     <java.ver>1.7</java.ver>
@@ -359,6 +359,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-minikdc</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-tools</artifactId>
         <version>${hadoop.version}</version>
       </dependency>
@@ -877,6 +882,12 @@
             </rules>
           </configuration>
         </plugin>
+        <plugin>
+          <!-- Allows us to get the apache-ds bundle artifacts -->
+          <groupId>org.apache.felix</groupId>
+          <artifactId>maven-bundle-plugin</artifactId>
+          <version>2.4.0</version>
+        </plugin>
       </plugins>
     </pluginManagement>
     <plugins>
@@ -1070,6 +1081,13 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <!-- Allows us to get the apache-ds bundle artifacts -->
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <extensions>true</extensions>
+        <inherited>true</inherited>
+      </plugin>
     </plugins>
     <extensions>
       <extension>
@@ -1303,7 +1321,7 @@
         <!-- Denotes intention and allows the enforcer plugin to pass when
              the user is relying on default behavior; won't work to activate profile -->
         <hadoop.profile>2</hadoop.profile>
-        <hadoop.version>2.2.0</hadoop.version>
+        <hadoop.version>2.3.0</hadoop.version>
         <httpclient.version>3.1</httpclient.version>
         <slf4j.version>1.7.5</slf4j.version>
       </properties>
@@ -1320,7 +1338,7 @@
         </property>
       </activation>
       <properties>
-        <hadoop.version>2.2.0</hadoop.version>
+        <hadoop.version>2.3.0</hadoop.version>
         <httpclient.version>3.1</httpclient.version>
         <slf4j.version>1.7.5</slf4j.version>
       </properties>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
----------------------------------------------------------------------
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
index 7eb4fbf..81509ee 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.proxy.thrift.AccumuloProxy;
 import org.apache.accumulo.server.rpc.RpcWrapper;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -135,7 +136,7 @@ public class Proxy {
     @SuppressWarnings("unchecked")
     Constructor<? extends TProcessor> proxyProcConstructor = (Constructor<? extends TProcessor>) proxyProcClass.getConstructor(proxyIfaceClass);
 
-    final TProcessor processor = proxyProcConstructor.newInstance(RpcWrapper.service(impl));
+    final TProcessor processor = proxyProcConstructor.newInstance(TCredentialsUpdatingWrapper.service(RpcWrapper.service(impl), impl.getClass()));
 
     THsHaServer.Args args = new THsHaServer.Args(socket);
     args.processor(processor);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
index 09ae4f4..84c3853 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
@@ -16,14 +16,24 @@
  */
 package org.apache.accumulo.server;
 
+import java.io.IOException;
+
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
 import org.apache.accumulo.core.rpc.SslConnectionParams;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.security.SecurityUtil;
 import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.base.Preconditions;
 
 /**
  * Provides a server context for Accumulo server components that operate with the system credentials and have access to the system files and configuration.
@@ -38,6 +48,31 @@ public class AccumuloServerContext extends ClientContext {
   public AccumuloServerContext(ServerConfigurationFactory confFactory) {
     super(confFactory.getInstance(), getCredentials(confFactory.getInstance()), confFactory.getConfiguration());
     this.confFactory = confFactory;
+    if (null != getServerSaslParams()) {
+      // Server-side "client" check to make sure we're logged in as a user we expect to be
+      enforceKerberosLogin();
+    }
+  }
+
+  /**
+   * A "client-side" assertion for servers to validate that they are logged in as the expected user, per the configuration, before performing any RPC
+   */
+  // Should be private, but package-protected so EasyMock will work
+  void enforceKerberosLogin() {
+    final AccumuloConfiguration conf = confFactory.getSiteConfiguration();
+    // Unwrap _HOST into the FQDN to make the kerberos principal we'll compare against
+    final String kerberosPrincipal = SecurityUtil.getServerPrincipal(conf.get(Property.GENERAL_KERBEROS_PRINCIPAL));
+    UserGroupInformation loginUser;
+    try {
+      // The system user should be logged in via keytab when the process is started, not the currentUser() like KerberosToken
+      loginUser = UserGroupInformation.getLoginUser();
+    } catch (IOException e) {
+      throw new RuntimeException("Could not get login user", e);
+    }
+
+    Preconditions.checkArgument(loginUser.hasKerberosCredentials(), "Server does not have Kerberos credentials");
+    Preconditions.checkArgument(kerberosPrincipal.equals(loginUser.getUserName()),
+        "Expected login user to be " + kerberosPrincipal + " but was " + loginUser.getUserName());
   }
 
   /**
@@ -64,4 +99,35 @@ public class AccumuloServerContext extends ClientContext {
     return SslConnectionParams.forServer(getConfiguration());
   }
 
+  public SaslConnectionParams getServerSaslParams() {
+    // Functionally the same as the client SASL params, but built from the site configuration
+    return SaslConnectionParams.forConfig(getServerConfigurationFactory().getSiteConfiguration());
+  }
+
+  /**
+   * Determine the type of Thrift server to instantiate given the server's configuration.
+   *
+   * @return A {@link ThriftServerType} value to denote the type of Thrift server to construct
+   */
+  public ThriftServerType getThriftServerType() {
+    AccumuloConfiguration conf = getConfiguration();
+    if (conf.getBoolean(Property.INSTANCE_RPC_SSL_ENABLED)) {
+      if (conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+        throw new IllegalStateException("Cannot create a Thrift server capable of both SASL and SSL");
+      }
+
+      return ThriftServerType.SSL;
+    } else if (conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+      if (conf.getBoolean(Property.INSTANCE_RPC_SSL_ENABLED)) {
+        throw new IllegalStateException("Cannot create a Thrift server capable of both SASL and SSL");
+      }
+
+      return ThriftServerType.SASL;
+    } else {
+      // Lets us control the type of Thrift server created, primarily for benchmarking purposes
+      String serverTypeName = conf.get(Property.GENERAL_RPC_SERVER_TYPE);
+      return ThriftServerType.get(serverTypeName);
+    }
+  }
+
 }
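
The selection logic in getThriftServerType() above can be summarized with a small standalone sketch. Everything here is a hypothetical stand-in: ServerKind replaces ThriftServerType, and the two booleans and the string replace the Property lookups. It checks the unsupported SSL+SASL combination once up front, and it assumes that an unrecognized configured name should be rejected; how ThriftServerType.get() treats unknown names is not shown in this diff.

```java
import java.util.Locale;

// Hypothetical stand-in for ThriftServerType; not the class from the commit.
enum ServerKind { SSL, SASL, THREADPOOL, CUSTOM_HS_HA }

final class ServerKindChooser {
  private ServerKindChooser() {}

  // sslEnabled / saslEnabled stand in for INSTANCE_RPC_SSL_ENABLED and INSTANCE_RPC_SASL_ENABLED,
  // configuredName stands in for GENERAL_RPC_SERVER_TYPE.
  static ServerKind choose(boolean sslEnabled, boolean saslEnabled, String configuredName) {
    // The two secure transports are mutually exclusive, so reject the combination first
    if (sslEnabled && saslEnabled) {
      throw new IllegalStateException("Cannot create a Thrift server capable of both SASL and SSL");
    }
    if (sslEnabled) {
      return ServerKind.SSL;
    }
    if (saslEnabled) {
      return ServerKind.SASL;
    }
    // Unsecured: honor the configured type. Assumption: unknown names are an error
    // (Enum.valueOf throws IllegalArgumentException for them).
    return ServerKind.valueOf(configuredName.toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    System.out.println(choose(false, true, "custom_hs_ha"));
    System.out.println(choose(false, false, "threadpool"));
  }
}
```

Checking the conflicting combination before either branch keeps the error path in one place and makes it reachable regardless of which flag is tested first.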

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 2da6ba0..4a9f1e7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -95,11 +95,13 @@ import org.apache.accumulo.server.tablets.TabletTime;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.accumulo.server.util.TablePropUtil;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -280,11 +282,27 @@ public class Initialize {
       log.fatal("Failed to talk to zookeeper", e);
       return false;
     }
-    opts.rootpass = getRootPassword(opts);
-    return initialize(opts, instanceNamePath, fs);
+
+    String rootUser;
+    try {
+      rootUser = getRootUserName(opts);
+    } catch (Exception e) {
+      log.fatal("Failed to obtain user for administrative privileges");
+      return false;
+    }
+
+    // Don't prompt for a password when we're running SASL(Kerberos)
+    final AccumuloConfiguration siteConf = SiteConfiguration.getInstance();
+    if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+      opts.rootpass = UUID.randomUUID().toString().getBytes(UTF_8);
+    } else {
+      opts.rootpass = getRootPassword(opts, rootUser);
+    }
+
+    return initialize(opts, instanceNamePath, fs, rootUser);
   }
 
-  private static boolean initialize(Opts opts, String instanceNamePath, VolumeManager fs) {
+  private static boolean initialize(Opts opts, String instanceNamePath, VolumeManager fs, String rootUser) {
 
     UUID uuid = UUID.randomUUID();
     // the actual disk locations of the root table and tablets
@@ -320,9 +338,38 @@ public class Initialize {
       return false;
     }
 
+    final ServerConfigurationFactory confFactory = new ServerConfigurationFactory(HdfsZooInstance.getInstance());
+
+    // When we're using Kerberos authentication, we need valid credentials to perform initialization. If the user provided some, use them.
+    // If they did not, fall back to the credentials present in accumulo-site.xml that the servers will use themselves.
     try {
-      AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance()));
-      initSecurity(context, opts, uuid.toString());
+      final SiteConfiguration siteConf = confFactory.getSiteConfiguration();
+      if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+        final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+        // We don't have any valid creds to talk to HDFS
+        if (!ugi.hasKerberosCredentials()) {
+          final String accumuloKeytab = siteConf.get(Property.GENERAL_KERBEROS_KEYTAB), accumuloPrincipal = siteConf.get(Property.GENERAL_KERBEROS_PRINCIPAL);
+
+          // Fail if the site configuration doesn't contain appropriate credentials to login as servers
+          if (StringUtils.isBlank(accumuloKeytab) || StringUtils.isBlank(accumuloPrincipal)) {
+            log.fatal("No Kerberos credentials provided, and Accumulo is not properly configured for server login");
+            return false;
+          }
+
+          log.info("Logging in as " + accumuloPrincipal + " with " + accumuloKeytab);
+
+          // Login using the keytab as the 'accumulo' user
+          UserGroupInformation.loginUserFromKeytab(accumuloPrincipal, accumuloKeytab);
+        }
+      }
+    } catch (IOException e) {
+      log.fatal("Failed to get the Kerberos user", e);
+      return false;
+    }
+
+    try {
+      AccumuloServerContext context = new AccumuloServerContext(confFactory);
+      initSecurity(context, opts, uuid.toString(), rootUser);
     } catch (Exception e) {
       log.fatal("Failed to initialize security", e);
       return false;
@@ -525,18 +572,43 @@ public class Initialize {
     return instanceNamePath;
   }
 
-  private static byte[] getRootPassword(Opts opts) throws IOException {
+  private static String getRootUserName(Opts opts) throws IOException {
+    AccumuloConfiguration conf = SiteConfiguration.getInstance();
+    final String keytab = conf.get(Property.GENERAL_KERBEROS_KEYTAB);
+    if (keytab.equals(Property.GENERAL_KERBEROS_KEYTAB.getDefaultValue()) || !conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+      return DEFAULT_ROOT_USER;
+    }
+
+    ConsoleReader c = getConsoleReader();
+    c.println("Running against secured HDFS");
+
+    if (null != opts.rootUser) {
+      return opts.rootUser;
+    }
+
+    do {
+      String user = c.readLine("Principal (user) to grant administrative privileges to : ");
+      if (user == null) {
+        // should not happen
+        System.exit(1);
+      }
+      if (!user.isEmpty()) {
+        return user;
+      }
+    } while (true);
+  }
+
+  private static byte[] getRootPassword(Opts opts, String rootUser) throws IOException {
     if (opts.cliPassword != null) {
       return opts.cliPassword.getBytes(UTF_8);
     }
     String rootpass;
     String confirmpass;
     do {
-      rootpass = getConsoleReader()
-          .readLine("Enter initial password for " + DEFAULT_ROOT_USER + " (this may not be applicable for your security setup): ", '*');
+      rootpass = getConsoleReader().readLine("Enter initial password for " + rootUser + " (this may not be applicable for your security setup): ", '*');
       if (rootpass == null)
         System.exit(0);
-      confirmpass = getConsoleReader().readLine("Confirm initial password for " + DEFAULT_ROOT_USER + ": ", '*');
+      confirmpass = getConsoleReader().readLine("Confirm initial password for " + rootUser + ": ", '*');
       if (confirmpass == null)
         System.exit(0);
       if (!rootpass.equals(confirmpass))
@@ -545,8 +617,9 @@ public class Initialize {
     return rootpass.getBytes(UTF_8);
   }
 
-  private static void initSecurity(AccumuloServerContext context, Opts opts, String iid) throws AccumuloSecurityException, ThriftSecurityException {
-    AuditedSecurityOperation.getInstance(context, true).initializeSecurity(context.rpcCreds(), DEFAULT_ROOT_USER, opts.rootpass);
+  private static void initSecurity(AccumuloServerContext context, Opts opts, String iid, String rootUser) throws AccumuloSecurityException,
+      ThriftSecurityException, IOException {
+    AuditedSecurityOperation.getInstance(context, true).initializeSecurity(context.rpcCreds(), rootUser, opts.rootpass);
   }
 
   public static void initSystemTablesConfig() throws IOException {
@@ -635,6 +708,8 @@ public class Initialize {
     String cliInstanceName;
     @Parameter(names = "--password", description = "set the password on the command line")
     String cliPassword;
+    @Parameter(names = {"-u", "--user"}, description = "the name of the user to grant system permissions to")
+    String rootUser = null;
 
     byte[] rootpass = null;
   }
@@ -653,8 +728,9 @@ public class Initialize {
       if (opts.resetSecurity) {
         AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance()));
         if (isInitialized(fs)) {
-          opts.rootpass = getRootPassword(opts);
-          initSecurity(context, opts, HdfsZooInstance.getInstance().getInstanceID());
+          final String rootUser = getRootUserName(opts);
+          opts.rootpass = getRootPassword(opts, rootUser);
+          initSecurity(context, opts, HdfsZooInstance.getInstance().getInstanceID(), rootUser);
         } else {
           log.fatal("Attempted to reset security on accumulo before it was initialized");
         }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java
new file mode 100644
index 0000000..f8400e2
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.server.security.SystemCredentials.SystemToken;
+import org.apache.accumulo.server.thrift.UGIAssumingProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extracts the TCredentials object from the RPC argument list and asserts that the Accumulo principal is equal to the Kerberos 'primary' component of the
+ * Kerberos principal (e.g. Accumulo principal of "frank" equals "frank" from "frank/hostname@DOMAIN").
+ */
+public class TCredentialsUpdatingInvocationHandler<I> implements InvocationHandler {
+  private static final Logger log = LoggerFactory.getLogger(TCredentialsUpdatingInvocationHandler.class);
+
+  private static final ConcurrentHashMap<String,Class<? extends AuthenticationToken>> TOKEN_CLASS_CACHE = new ConcurrentHashMap<>();
+  private final I instance;
+
+  protected TCredentialsUpdatingInvocationHandler(final I serverInstance) {
+    instance = serverInstance;
+  }
+
+  @Override
+  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+    updateArgs(args);
+
+    return invokeMethod(method, args);
+  }
+
+  /**
+   * Try to find a TCredentials object in the argument list, and, when the AuthenticationToken is a KerberosToken, verify that the principal from the SASL
+   * server matches the TCredentials principal. This ensures that users can't spoof a different principal into the Credentials than what they used to authenticate.
+   */
+  protected void updateArgs(Object[] args) throws ThriftSecurityException {
+    // If we don't have at least two args
+    if (args == null || args.length < 2) {
+      return;
+    }
+
+    TCredentials tcreds = null;
+    if (args[0] != null && args[0] instanceof TCredentials) {
+      tcreds = (TCredentials) args[0];
+    } else if (args[1] != null && args[1] instanceof TCredentials) {
+      tcreds = (TCredentials) args[1];
+    }
+
+    // If we don't find a tcredentials in the first two positions
+    if (null == tcreds) {
+      // Not all calls require authentication (e.g. closeMultiScan). We need to let these pass through.
+      log.trace("Did not find a TCredentials object in the first two positions of the argument list, not updating principal");
+      return;
+    }
+
+    Class<? extends AuthenticationToken> tokenClass = getTokenClassFromName(tcreds.tokenClassName);
+    // If the authentication token is neither a KerberosToken nor a SystemToken
+    if (!KerberosToken.class.isAssignableFrom(tokenClass) && !SystemToken.class.isAssignableFrom(tokenClass)) {
+      // Don't include messages about SystemToken since it's internal
+      log.debug("Will not update principal on authentication tokens other than KerberosToken. Received " + tokenClass);
+      throw new ThriftSecurityException("Did not receive a valid token", SecurityErrorCode.BAD_CREDENTIALS);
+    }
+
+    // The Accumulo principal extracted from the SASL transport
+    final String principal = UGIAssumingProcessor.currentPrincipal();
+
+    if (null == principal) {
+      log.debug("Found KerberosToken in TCredentials, but did not receive principal from SASL processor");
+      throw new ThriftSecurityException("Did not extract principal from Thrift SASL processor", SecurityErrorCode.BAD_CREDENTIALS);
+    }
+
+    // The principal from the SASL transport should match what the user requested as their Accumulo principal
+    if (!principal.equals(tcreds.principal)) {
+      final String msg = "Principal in credentials object should match kerberos principal. Expected '" + principal + "' but was '" + tcreds.principal + "'";
+      log.warn(msg);
+      throw new ThriftSecurityException(msg, SecurityErrorCode.BAD_CREDENTIALS);
+    }
+  }
+
+  protected Class<? extends AuthenticationToken> getTokenClassFromName(String tokenClassName) {
+    Class<? extends AuthenticationToken> typedClz = TOKEN_CLASS_CACHE.get(tokenClassName);
+    if (null == typedClz) {
+      Class<?> clz;
+      try {
+        clz = Class.forName(tokenClassName);
+      } catch (ClassNotFoundException e) {
+        log.debug("Could not create class from token name: " + tokenClassName, e);
+        return null;
+      }
+      typedClz = clz.asSubclass(AuthenticationToken.class);
+    }
+    TOKEN_CLASS_CACHE.putIfAbsent(tokenClassName, typedClz);
+    return typedClz;
+  }
+
+  private Object invokeMethod(Method method, Object[] args) throws Throwable {
+    try {
+      return method.invoke(instance, args);
+    } catch (InvocationTargetException ex) {
+      throw ex.getCause();
+    }
+  }
+
+  /**
+   * Visible for testing
+   */
+  protected ConcurrentHashMap<String,Class<? extends AuthenticationToken>> getTokenCache() {
+    return TOKEN_CLASS_CACHE;
+  }
+}
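
The principal check performed by updateArgs() above can be illustrated in isolation. This is a minimal sketch under stated assumptions: Creds is a hypothetical stand-in for TCredentials, the authenticated principal is passed in as a plain String instead of being read from UGIAssumingProcessor, SecurityException replaces ThriftSecurityException, and the token-class check is left out.

```java
// Hypothetical stand-in for TCredentials; only the principal field matters here.
final class Creds {
  final String principal;

  Creds(String principal) {
    this.principal = principal;
  }
}

final class PrincipalCheck {
  private PrincipalCheck() {}

  // Look for credentials in the first two argument positions only, as the handler does
  static Creds findCreds(Object[] args) {
    if (args == null || args.length < 2) {
      return null;
    }
    for (int i = 0; i < 2; i++) {
      if (args[i] instanceof Creds) {
        return (Creds) args[i];
      }
    }
    return null;
  }

  // Throws when the claimed principal differs from the one the transport authenticated
  static void verify(Object[] args, String authenticatedPrincipal) {
    Creds creds = findCreds(args);
    if (creds == null) {
      // Calls without credentials are allowed to pass through unchanged
      return;
    }
    if (authenticatedPrincipal == null) {
      throw new SecurityException("No principal was provided by the transport layer");
    }
    if (!authenticatedPrincipal.equals(creds.principal)) {
      throw new SecurityException("Expected '" + authenticatedPrincipal + "' but was '" + creds.principal + "'");
    }
  }

  public static void main(String[] args) {
    verify(new Object[] {null, new Creds("frank")}, "frank");
    System.out.println("principal accepted");
  }
}
```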

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java
new file mode 100644
index 0000000..4621d36
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+
+/**
+ * Utility class to ensure that the instance of TCredentials which is passed to the implementation of a Thrift service has the correct principal from SASL at
+ * the Thrift transport layer when SASL/GSSAPI (kerberos) is enabled. This ensures that we use the strong authentication provided to us and disallow any other
+ * principal names that a client (malicious or otherwise) might pass in.
+ */
+public class TCredentialsUpdatingWrapper {
+
+  public static <T> T service(final T instance, final Class<? extends T> originalClass) {
+    InvocationHandler handler = new TCredentialsUpdatingInvocationHandler<T>(instance);
+
+    @SuppressWarnings("unchecked")
+    T proxiedInstance = (T) Proxy.newProxyInstance(originalClass.getClassLoader(), originalClass.getInterfaces(), handler);
+
+    return proxiedInstance;
+  }
+
+}
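
The wrapper above relies on java.lang.reflect.Proxy to run a check before every call reaches the real service. The following sketch shows that pattern on its own. Greeter and CheckingProxy are hypothetical names used only for illustration, and the null-argument check is a placeholder for the credential validation done by TCredentialsUpdatingInvocationHandler.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

// Hypothetical service interface used only for this illustration
interface Greeter {
  String greet(String name);
}

final class CheckingProxy {
  private CheckingProxy() {}

  @SuppressWarnings("unchecked")
  static <T> T wrap(final T instance, Class<T> iface) {
    InvocationHandler handler = (proxy, method, args) -> {
      // Placeholder pre-check; the real handler validates the TCredentials principal here
      if (args != null) {
        for (Object arg : args) {
          if (arg == null) {
            throw new IllegalArgumentException("null argument to " + method.getName());
          }
        }
      }
      try {
        return method.invoke(instance, args);
      } catch (InvocationTargetException e) {
        // Rethrow the service's own exception instead of the reflective wrapper
        throw e.getCause();
      }
    };
    return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
  }

  public static void main(String[] args) {
    Greeter greeter = wrap(name -> "hello " + name, Greeter.class);
    System.out.println(greeter.greet("frank"));
  }
}
```

Unwrapping InvocationTargetException matters because callers expect the exceptions declared by the service interface, not a reflection-specific wrapper.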

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 641c0bf..985df9c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.server.rpc;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.BindException;
 import java.net.InetAddress;
@@ -33,26 +34,34 @@ import javax.net.ssl.SSLServerSocket;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
 import org.apache.accumulo.core.rpc.SslConnectionParams;
 import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.LoggingRunnable;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.thrift.UGIAssumingProcessor;
 import org.apache.accumulo.server.util.Halt;
 import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TSaslServerTransport;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.net.HostAndPort;
 
 /**
@@ -115,6 +124,11 @@ public class TServerUtils {
       portSearch = config.getBoolean(portSearchProperty);
 
     final int simpleTimerThreadpoolSize = config.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE);
+    final ThriftServerType serverType = service.getThriftServerType();
+
+    if (ThriftServerType.SASL == serverType) {
+      processor = updateSaslProcessor(serverType, processor);
+    }
 
     // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once
     TimedProcessor timedProcessor = new TimedProcessor(config, processor, serverName, threadName);
@@ -135,8 +149,9 @@ public class TServerUtils {
           port = 1024 + port % (65535 - 1024);
         try {
           HostAndPort addr = HostAndPort.fromParts(hostname, port);
-          return TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads, simpleTimerThreadpoolSize, timeBetweenThreadChecks,
-              maxMessageSize, service.getServerSslParams(), service.getClientTimeoutInMillis());
+          return TServerUtils.startTServer(addr, serverType, timedProcessor, serverName, threadName, minThreads,
+              simpleTimerThreadpoolSize, timeBetweenThreadChecks, maxMessageSize,
+              service.getServerSslParams(), service.getServerSaslParams(), service.getClientTimeoutInMillis());
         } catch (TTransportException ex) {
           log.error("Unable to start TServer", ex);
           if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) {
@@ -209,7 +224,31 @@ public class TServerUtils {
   }
 
   /**
-   * Create a TThreadPoolServer with the given transport and processor
+   * Creates a TThreadPoolServer for normal unsecure operation. Useful for comparing performance against SSL or SASL transports.
+   *
+   * @param address
+   *          Address to bind to
+   * @param processor
+   *          TProcessor for the server
+   * @param maxMessageSize
+   *          Maximum size of a Thrift message allowed
+   * @return A configured TThreadPoolServer and its bound address information
+   */
+  public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, long maxMessageSize) throws TTransportException {
+
+    TServerSocket transport = new TServerSocket(address.getPort());
+    TThreadPoolServer server = createThreadPoolServer(transport, processor, ThriftUtil.transportFactory(maxMessageSize));
+
+    if (address.getPort() == 0) {
+      address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
+    }
+
+    return new ServerAddress(server, address);
+
+  }
+
+  /**
+   * Create a TThreadPoolServer with the given transport and processor with the default transport factory.
    *
    * @param transport
    *          TServerTransport for the server
@@ -218,9 +257,23 @@ public class TServerUtils {
    * @return A configured TThreadPoolServer
    */
   public static TThreadPoolServer createThreadPoolServer(TServerTransport transport, TProcessor processor) {
-    final TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+    return createThreadPoolServer(transport, processor, ThriftUtil.transportFactory());
+  }
+
+  /**
+   * Create a TServer with the provided server transport, processor and transport factory.
+   *
+   * @param transport
+   *          TServerTransport for the server
+   * @param processor
+   *          TProcessor for the server
+   * @param transportFactory
+   *          TTransportFactory for the server
+   */
+  public static TThreadPoolServer createThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory) {
+    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
     options.protocolFactory(ThriftUtil.protocolFactory());
-    options.transportFactory(ThriftUtil.transportFactory());
+    options.transportFactory(transportFactory);
     options.processorFactory(new ClientInfoProcessorFactory(clientAddress, processor));
     return new TThreadPoolServer(options);
   }
@@ -284,7 +337,7 @@ public class TServerUtils {
    */
   public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams)
       throws TTransportException {
-    org.apache.thrift.transport.TServerSocket transport;
+    TServerSocket transport;
     try {
       transport = getSslServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()), sslParams);
     } catch (UnknownHostException e) {
@@ -296,14 +349,63 @@ public class TServerUtils {
     return new ServerAddress(createThreadPoolServer(transport, processor), address);
   }
 
-  /**
-   * Create a Thrift server given the provided and Accumulo configuration.
-   */
-  public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, TProcessor processor, String serverName, String threadName,
-      int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout)
+  public static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SaslConnectionParams params,
+      final String serverName, String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize)
       throws TTransportException {
-    return startTServer(address, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads,
-        timeBetweenThreadChecks, maxMessageSize, sslParams, sslSocketTimeout);
+    // We'd really prefer to use THsHaServer (or similar) to avoid the 1 RPC == 1 Thread model of TThreadPoolServer,
+    // but sadly that isn't possible: TSaslTransport needs to issue a handshake when it open()'s, and that handshake fails
+    // when the server does an accept() to (presumably) wake up the eventing system.
+    log.info("Creating SASL thread pool thrift server on port=" + address.getPort());
+    TServerSocket transport = new TServerSocket(address.getPort());
+
+    final String hostname;
+    try {
+      hostname = InetAddress.getByName(address.getHostText()).getCanonicalHostName();
+    } catch (UnknownHostException e) {
+      throw new TTransportException(e);
+    }
+
+    final UserGroupInformation serverUser;
+    try {
+      serverUser = UserGroupInformation.getLoginUser();
+    } catch (IOException e) {
+      throw new TTransportException(e);
+    }
+
+    log.trace("Logged in as {}, creating TSaslServerTransport factory as {}/{}", serverUser, params.getKerberosServerPrimary(), hostname);
+
+    // Make the SASL transport factory with the instance and primary from the kerberos server principal, SASL properties
+    // and the SASL callback handler from Hadoop to ensure authorization ID is the authentication ID. Despite the 'protocol' argument seeming to be useless, it
+    // *must* be the primary of the server.
+    TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory();
+    saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
+        new SaslRpcServer.SaslGssCallbackHandler());
+
+    // Updates the clientAddress threadlocal so we know the client's address
+    final ClientInfoProcessorFactory clientInfoFactory = new ClientInfoProcessorFactory(clientAddress, processor);
+
+    // Make sure the TTransportFactory is performing a UGI.doAs
+    TTransportFactory ugiTransportFactory = new UGIAssumingTransportFactory(saslTransportFactory, serverUser);
+
+    if (address.getPort() == 0) {
+      address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
+    }
+
+    return new ServerAddress(new TThreadPoolServer(new TThreadPoolServer.Args(transport).transportFactory(ugiTransportFactory)
+        .processorFactory(clientInfoFactory)
+        .protocolFactory(ThriftUtil.protocolFactory())), address);
+  }
+
+  public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, ThriftServerType serverType, TProcessor processor,
+      String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
+      SaslConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
+
+    if (ThriftServerType.SASL == serverType) {
+      processor = updateSaslProcessor(serverType, processor);
+    }
+
+    return startTServer(address, serverType, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads,
+        timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, serverSocketTimeout);
   }
 
   /**
@@ -311,14 +413,33 @@ public class TServerUtils {
    *
    * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is bound to.
    */
-  public static ServerAddress startTServer(HostAndPort address, TimedProcessor processor, String serverName, String threadName, int numThreads,
-      int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
+  public static ServerAddress startTServer(HostAndPort address, ThriftServerType serverType, TimedProcessor processor, String serverName, String threadName, int numThreads,
+      int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
+      SaslConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
+
+    // This is presently not supported. It is hypothetically possible to make it work, but that would require changes in how the transports
+    // work at the Thrift layer to ensure that both the SSL and SASL handshakes function. SASL's quality of protection addresses privacy issues.
+    Preconditions.checkArgument(!(sslParams != null && saslParams != null), "Cannot start a Thrift server using both SSL and SASL");
 
     ServerAddress serverAddress;
-    if (sslParams != null) {
-      serverAddress = createSslThreadPoolServer(address, processor, sslSocketTimeout, sslParams);
-    } else {
-      serverAddress = createNonBlockingServer(address, processor, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
+    switch (serverType) {
+      case SSL:
+        log.debug("Instantiating SSL Thrift server");
+        serverAddress = createSslThreadPoolServer(address, processor, serverSocketTimeout, sslParams);
+        break;
+      case SASL:
+        log.debug("Instantiating SASL Thrift server");
+        serverAddress = createSaslThreadPoolServer(address, processor, serverSocketTimeout, saslParams, serverName, threadName, numThreads, numSTThreads,
+            timeBetweenThreadChecks, maxMessageSize);
+        break;
+      case THREADPOOL:
+        log.debug("Instantiating unsecure TThreadPool Thrift server");
+        serverAddress = createBlockingServer(address, processor, maxMessageSize);
+        break;
+      case CUSTOM_HS_HA: // Intentional passthrough -- Our custom wrapper around HsHa is the default
+      default:
+        log.debug("Instantiating default, unsecure custom half-async Thrift server");
+        serverAddress = createNonBlockingServer(address, processor, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
     }
 
     final TServer finalServer = serverAddress.server;
@@ -368,4 +489,21 @@ public class TServerUtils {
       log.error("Unable to call shutdownNow", e);
     }
   }
+
+  /**
+   * Wrap the provided processor in the {@link UGIAssumingProcessor} so Kerberos authentication works. Requires the <code>serverType</code> to be
+   * {@link ThriftServerType#SASL} and throws an exception when it is not.
+   *
+   * @return A {@link UGIAssumingProcessor} which wraps the provided processor
+   */
+  private static TProcessor updateSaslProcessor(ThriftServerType serverType, TProcessor processor) {
+    Preconditions.checkArgument(ThriftServerType.SASL == serverType);
+
+    // Wrap the provided processor in our special processor which proxies the provided UGI on the logged-in UGI
+    // Important that we have Timed -> UGIAssuming -> [provided] to make sure that the metrics are still reported
+    // as the logged-in user.
+    log.info("Wrapping {} in UGIAssumingProcessor", processor.getClass());
+
+    return new UGIAssumingProcessor(processor);
+  }
 }
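
The wrapping order described in updateSaslProcessor (Timed -> UGIAssuming -> [provided]) can be illustrated with a small decorator sketch. Everything below is hypothetical: Processor, AssumingProcessor and TimingProcessor are stand-ins invented for illustration, not the Thrift TProcessor interface or the Accumulo classes in this commit. The sketch only shows that the outermost wrapper runs first and finishes last, so timing brackets the user-assuming step.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Thrift processor; not the real TProcessor interface. */
interface Processor {
  void process(List<String> trace);
}

/** Stand-in for a UGI-assuming wrapper: runs the delegate "as" the remote user. */
final class AssumingProcessor implements Processor {
  private final Processor delegate;

  AssumingProcessor(Processor delegate) {
    this.delegate = delegate;
  }

  @Override
  public void process(List<String> trace) {
    trace.add("assume-user:enter");
    try {
      delegate.process(trace);
    } finally {
      trace.add("assume-user:exit");
    }
  }
}

/** Stand-in for a timing wrapper: brackets the delegate call for metrics. */
final class TimingProcessor implements Processor {
  private final Processor delegate;

  TimingProcessor(Processor delegate) {
    this.delegate = delegate;
  }

  @Override
  public void process(List<String> trace) {
    trace.add("timed:enter");
    try {
      delegate.process(trace);
    } finally {
      trace.add("timed:exit");
    }
  }
}

final class WrappingOrderSketch {
  /** Builds Timed -> Assuming -> provided and records the call order. */
  static List<String> run() {
    List<String> trace = new ArrayList<>();
    Processor provided = t -> t.add("handler");
    Processor chain = new TimingProcessor(new AssumingProcessor(provided));
    chain.process(trace);
    return trace;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

Because the timing wrapper is outermost, its bookkeeping happens outside the scope in which the remote user's identity is assumed, which matches the intent stated in the comment above.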

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java
new file mode 100644
index 0000000..60d5402
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * The type of configured Thrift server to start. This is meant more as a developer knob to ensure that appropriate Thrift servers can be constructed to make a
+ * better test on the overhead of SSL or SASL.
+ *
+ * Neither SSL nor SASL presently works with TFramedTransport, which means that the Thrift servers with asynchronous support will fail with these transports.
+ * As such, we want to ensure that any benchmarks against "unsecure" Accumulo use the same type of Thrift server.
+ */
+public enum ThriftServerType {
+  CUSTOM_HS_HA("custom_hs_ha"), THREADPOOL("threadpool"), SSL("ssl"), SASL("sasl");
+
+  private final String name;
+
+  private ThriftServerType(String name) {
+    this.name = name;
+  }
+
+  public static ThriftServerType get(String name) {
+    // Our custom HsHa server is the default (if none is provided)
+    if (StringUtils.isBlank(name)) {
+      return CUSTOM_HS_HA;
+    }
+    return ThriftServerType.valueOf(name.trim().toUpperCase());
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+}
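
The lookup behaviour of ThriftServerType.get can be tried in isolation with the sketch below. It is an approximation under stated assumptions: ServerKind is a hypothetical copy of the enum that drops the commons-lang dependency and the display names, and it passes Locale.ROOT to toUpperCase, which the committed code does not do.

```java
import java.util.Locale;

/** Hypothetical, dependency-free copy of the server type enum, for illustration only. */
enum ServerKind {
  CUSTOM_HS_HA, THREADPOOL, SSL, SASL;

  /** Blank or missing names fall back to the default; anything else must match a constant. */
  static ServerKind get(String name) {
    if (name == null || name.trim().isEmpty()) {
      return CUSTOM_HS_HA;
    }
    // Enum.valueOf throws IllegalArgumentException for names that match no constant
    return ServerKind.valueOf(name.trim().toUpperCase(Locale.ROOT));
  }
}

final class ServerKindDemo {
  public static void main(String[] args) {
    System.out.println(ServerKind.get(null));
    System.out.println(ServerKind.get(" sasl "));
  }
}
```

Note that an unrecognized name is not mapped to the default: valueOf raises IllegalArgumentException, so a misspelled configuration value would surface as an error rather than silently selecting the half-async server.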

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
index 5fe57b7..7adb46e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
@@ -51,6 +51,7 @@ import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.security.handler.Authenticator;
 import org.apache.accumulo.server.security.handler.Authorizor;
+import org.apache.accumulo.server.security.handler.KerberosAuthenticator;
 import org.apache.accumulo.server.security.handler.PermissionHandler;
 import org.apache.accumulo.server.security.handler.ZKAuthenticator;
 import org.apache.accumulo.server.security.handler.ZKAuthorizor;
@@ -68,6 +69,7 @@ public class SecurityOperation {
   protected Authorizor authorizor;
   protected Authenticator authenticator;
   protected PermissionHandler permHandle;
+  protected boolean isKerberos;
   private static String rootUserName = null;
   private final ZooCache zooCache;
   private final String ZKUserPath;
@@ -126,11 +128,11 @@ public class SecurityOperation {
         || !permHandle.validSecurityHandlers(authent, author))
       throw new RuntimeException(authorizor + ", " + authenticator + ", and " + pm
           + " do not play nice with eachother. Please choose authentication and authorization mechanisms that are compatible with one another.");
+
+    isKerberos = KerberosAuthenticator.class.isAssignableFrom(authenticator.getClass());
   }
 
   public void initializeSecurity(TCredentials credentials, String rootPrincipal, byte[] token) throws AccumuloSecurityException, ThriftSecurityException {
-    authenticate(credentials);
-
     if (!isSystemUser(credentials))
       throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 
@@ -160,11 +162,31 @@ public class SecurityOperation {
       throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.INVALID_INSTANCEID);
 
     Credentials creds = Credentials.fromThrift(credentials);
+
     if (isSystemUser(credentials)) {
       if (!(context.getCredentials().equals(creds))) {
+        log.debug("Provided credentials did not match server's expected credentials. Expected " + context.getCredentials() + " but got " + creds);
         throw new ThriftSecurityException(creds.getPrincipal(), SecurityErrorCode.BAD_CREDENTIALS);
       }
     } else {
+      // Not the system user
+
+      if (isKerberos) {
+        // If we have kerberos credentials for a user from the network but no account
+        // in the system, we need to make one before proceeding
+        try {
+          if (!authenticator.userExists(creds.getPrincipal())) {
+            // If we call the normal createUser method, it will loop back into this method
+            // when it tries to check if the user has permission to create users
+            _createUser(credentials, creds, Authorizations.EMPTY);
+          }
+        } catch (AccumuloSecurityException e) {
+          log.debug("Failed to determine if user exists", e);
+          throw e.asThriftException();
+        }
+      }
+
+      // Check that the user is authenticated (a no-op at this point for kerberos)
       try {
         if (!authenticator.authenticateUser(creds.getPrincipal(), creds.getToken())) {
           throw new ThriftSecurityException(creds.getPrincipal(), SecurityErrorCode.BAD_CREDENTIALS);
@@ -190,6 +212,15 @@ public class SecurityOperation {
       return true;
     try {
       Credentials toCreds = Credentials.fromThrift(toAuth);
+
+      if (isKerberos) {
+        // If we have kerberos credentials for a user from the network but no account
+        // in the system, we need to make one before proceeding
+        if (!authenticator.userExists(toCreds.getPrincipal())) {
+          createUser(credentials, toCreds, Authorizations.EMPTY);
+        }
+      }
+
       return authenticator.authenticateUser(toCreds.getPrincipal(), toCreds.getToken());
     } catch (AccumuloSecurityException e) {
       throw e.asThriftException();
@@ -579,14 +610,23 @@ public class SecurityOperation {
   public void createUser(TCredentials credentials, Credentials newUser, Authorizations authorizations) throws ThriftSecurityException {
     if (!canCreateUser(credentials, newUser.getPrincipal()))
       throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+    _createUser(credentials, newUser, authorizations);
+    if (canChangeAuthorizations(credentials, newUser.getPrincipal())) {
+      try {
+        authorizor.changeAuthorizations(newUser.getPrincipal(), authorizations);
+      } catch (AccumuloSecurityException ase) {
+        throw ase.asThriftException();
+      }
+    }
+  }
+
+  protected void _createUser(TCredentials credentials, Credentials newUser, Authorizations authorizations) throws ThriftSecurityException {
     try {
       AuthenticationToken token = newUser.getToken();
       authenticator.createUser(newUser.getPrincipal(), token);
       authorizor.initUser(newUser.getPrincipal());
       permHandle.initUser(newUser.getPrincipal());
       log.info("Created user " + newUser.getPrincipal() + " at the request of user " + credentials.getPrincipal());
-      if (canChangeAuthorizations(credentials, newUser.getPrincipal()))
-        authorizor.changeAuthorizations(newUser.getPrincipal(), authorizations);
     } catch (AccumuloSecurityException ase) {
       throw ase.asThriftException();
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java
index 42d1313..6014139 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java
@@ -69,10 +69,11 @@ public class SecurityUtil {
    */
   public static boolean login(String principalConfig, String keyTabPath) {
     try {
-      String principalName = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principalConfig, InetAddress.getLocalHost().getCanonicalHostName());
+      String principalName = getServerPrincipal(principalConfig);
       if (keyTabPath != null && principalName != null && keyTabPath.length() != 0 && principalName.length() != 0) {
+        log.info("Attempting to login with keytab as " + principalName);
         UserGroupInformation.loginUserFromKeytab(principalName, keyTabPath);
-        log.info("Succesfully logged in as user " + principalConfig);
+        log.info("Successfully logged in as user " + principalName);
         return true;
       }
     } catch (IOException io) {
@@ -80,4 +81,15 @@ public class SecurityUtil {
     }
     return false;
   }
+
+  /**
+   * {@link org.apache.hadoop.security.SecurityUtil#getServerPrincipal(String, String)}
+   */
+  public static String getServerPrincipal(String configuredPrincipal) {
+    try {
+      return org.apache.hadoop.security.SecurityUtil.getServerPrincipal(configuredPrincipal, InetAddress.getLocalHost().getCanonicalHostName());
+    } catch (IOException e) {
+      throw new RuntimeException("Could not convert configured server principal: " + configuredPrincipal, e);
+    }
+  }
 }
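
The new getServerPrincipal helper delegates to Hadoop's SecurityUtil, which, as far as I understand it, expands a _HOST placeholder in the instance component of a configured principal. The sketch below is a simplified approximation of that substitution and is not Hadoop's implementation; the class name, the expand method and the lowercasing of the hostname are assumptions made for illustration.

```java
import java.util.Locale;

/** Hypothetical approximation of _HOST expansion in a Kerberos principal. */
final class PrincipalExpansionSketch {
  static final String HOST_PLACEHOLDER = "_HOST";

  /**
   * Replaces the instance component of "primary/_HOST@REALM" with the given hostname.
   * Principals without that exact shape are returned unchanged.
   */
  static String expand(String configured, String hostname) {
    if (configured == null) {
      return null;
    }
    int slash = configured.indexOf('/');
    int at = configured.indexOf('@');
    if (slash < 0 || at < slash) {
      return configured; // no instance component, or no realm after it
    }
    String instance = configured.substring(slash + 1, at);
    if (!HOST_PLACEHOLDER.equals(instance)) {
      return configured;
    }
    // Assumption: the hostname is lowercased before substitution
    return configured.substring(0, slash + 1) + hostname.toLowerCase(Locale.ROOT) + configured.substring(at);
  }

  public static void main(String[] args) {
    System.out.println(expand("accumulo/_HOST@EXAMPLE.COM", "Node1.example.com"));
  }
}
```

With this change both the keytab login and SystemCredentials resolve the principal through the same helper, so the log message and the credentials should refer to the same expanded name.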

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
index 79201b1..51d50a1 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
@@ -30,8 +30,10 @@ import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.util.Base64;
@@ -51,8 +53,8 @@ public final class SystemCredentials extends Credentials {
 
   private final TCredentials AS_THRIFT;
 
-  SystemCredentials(Instance instance) {
-    super(SYSTEM_PRINCIPAL, SystemToken.get(instance));
+  SystemCredentials(Instance instance, String principal, AuthenticationToken token) {
+    super(principal, token);
     AS_THRIFT = super.toThrift(instance);
   }
 
@@ -65,7 +67,16 @@ public final class SystemCredentials extends Credentials {
 
   public static SystemCredentials get(Instance instance) {
     check_permission();
-    return new SystemCredentials(instance);
+    String principal = SYSTEM_PRINCIPAL;
+    AccumuloConfiguration conf = SiteConfiguration.getInstance();
+    SaslConnectionParams saslParams = SaslConnectionParams.forConfig(conf);
+    if (null != saslParams) {
+      // Use the server's kerberos principal as the Accumulo principal. We could also unwrap the principal server-side, but the principal for SystemCredentials
+      // isn't actually used anywhere, so it really doesn't matter. We can't include the kerberos principal in the SystemToken as it would break equality when
+      // different Accumulo servers are using different kerberos principals as their accumulo principal
+      principal = SecurityUtil.getServerPrincipal(conf.get(Property.GENERAL_KERBEROS_PRINCIPAL));
+    }
+    return new SystemCredentials(instance, principal, SystemToken.get(instance));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java
new file mode 100644
index 0000000..61b8db0
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.util.Base64;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.security.SystemCredentials.SystemToken;
+import org.apache.accumulo.server.thrift.UGIAssumingProcessor;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+public class KerberosAuthenticator implements Authenticator {
+  private static final Logger log = LoggerFactory.getLogger(KerberosAuthenticator.class);
+
+  private static final Set<Class<? extends AuthenticationToken>> SUPPORTED_TOKENS = Sets.newHashSet(Arrays.<Class<? extends AuthenticationToken>> asList(
+      KerberosToken.class, SystemToken.class));
+  private static final Set<String> SUPPORTED_TOKEN_NAMES = Sets.newHashSet(KerberosToken.class.getName(), SystemToken.class.getName());
+
+  private final ZKAuthenticator zkAuthenticator = new ZKAuthenticator();
+  private String zkUserPath;
+  private final ZooCache zooCache;
+
+  public KerberosAuthenticator() {
+    zooCache = new ZooCache();
+  }
+
+  @Override
+  public void initialize(String instanceId, boolean initialize) {
+    zkAuthenticator.initialize(instanceId, initialize);
+    zkUserPath = Constants.ZROOT + "/" + instanceId + "/users";
+  }
+
+  @Override
+  public boolean validSecurityHandlers(Authorizor auth, PermissionHandler pm) {
+    return true;
+  }
+
+  private void createUserNodeInZk(String principal) throws KeeperException, InterruptedException {
+    synchronized (zooCache) {
+      zooCache.clear();
+      IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+      zoo.putPrivatePersistentData(zkUserPath + "/" + principal, new byte[0], NodeExistsPolicy.FAIL);
+    }
+  }
+
+  @Override
+  public void initializeSecurity(TCredentials credentials, String principal, byte[] token) throws AccumuloSecurityException, ThriftSecurityException {
+    try {
+      // remove old settings from zookeeper first, if any
+      IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+      synchronized (zooCache) {
+        zooCache.clear();
+        if (zoo.exists(zkUserPath)) {
+          zoo.recursiveDelete(zkUserPath, NodeMissingPolicy.SKIP);
+          log.info("Removed " + zkUserPath + "/" + " from zookeeper");
+        }
+
+        principal = Base64.encodeBase64String(principal.getBytes(UTF_8));
+
+        // prep parent node of users with root username
+        zoo.putPersistentData(zkUserPath, principal.getBytes(UTF_8), NodeExistsPolicy.FAIL);
+
+        createUserNodeInZk(principal);
+      }
+    } catch (KeeperException | InterruptedException e) {
+      log.error("Failed to initialize security", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean authenticateUser(String principal, AuthenticationToken token) throws AccumuloSecurityException {
+    final String rpcPrincipal = UGIAssumingProcessor.currentPrincipal();
+
+    if (!rpcPrincipal.equals(principal)) {
+      // KerberosAuthenticator can't perform this check because KerberosToken is just a shim and doesn't contain the actual credentials
+      throw new AccumuloSecurityException(principal, SecurityErrorCode.AUTHENTICATOR_FAILED);
+    }
+
+    // User is authenticated at the transport layer -- nothing extra is necessary
+    if (token instanceof KerberosToken) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public Set<String> listUsers() throws AccumuloSecurityException {
+    Set<String> base64Users = zkAuthenticator.listUsers();
+    Set<String> readableUsers = new HashSet<>();
+    for (String base64User : base64Users) {
+      readableUsers.add(new String(Base64.decodeBase64(base64User), UTF_8));
+    }
+    return readableUsers;
+  }
+
+  @Override
+  public synchronized void createUser(String principal, AuthenticationToken token) throws AccumuloSecurityException {
+    if (!(token instanceof KerberosToken)) {
+      throw new UnsupportedOperationException("Expected a KerberosToken but got a " + token.getClass().getSimpleName());
+    }
+
+    principal = Base64.encodeBase64String(principal.getBytes(UTF_8));
+
+    try {
+      createUserNodeInZk(principal);
+    } catch (KeeperException e) {
+      if (e.code().equals(KeeperException.Code.NODEEXISTS)) {
+        log.error("User already exists in ZooKeeper", e);
+        throw new AccumuloSecurityException(principal, SecurityErrorCode.USER_EXISTS, e);
+      }
+      log.error("Failed to create user in ZooKeeper", e);
+      throw new AccumuloSecurityException(principal, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error("Interrupted trying to create node for user", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public synchronized void dropUser(String user) throws AccumuloSecurityException {
+    user = Base64.encodeBase64String(user.getBytes(UTF_8));
+    zkAuthenticator.dropUser(user);
+  }
+
+  @Override
+  public void changePassword(String principal, AuthenticationToken token) throws AccumuloSecurityException {
+    throw new UnsupportedOperationException("Cannot change password with Kerberos authentication");
+  }
+
+  @Override
+  public synchronized boolean userExists(String user) throws AccumuloSecurityException {
+    user = Base64.encodeBase64String(user.getBytes(UTF_8));
+    return zkAuthenticator.userExists(user);
+  }
+
+  @Override
+  public Set<Class<? extends AuthenticationToken>> getSupportedTokenTypes() {
+    return SUPPORTED_TOKENS;
+  }
+
+  @Override
+  public boolean validTokenClass(String tokenClass) {
+    return SUPPORTED_TOKEN_NAMES.contains(tokenClass);
+  }
+
+}
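
KerberosAuthenticator stores each principal Base64-encoded as a ZooKeeper node name and decodes the names again in listUsers. The round trip can be sketched as follows. This is a minimal illustration that uses java.util.Base64 in place of the Accumulo Base64 utility imported above; the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper showing the encode/decode round trip for principal names. */
final class PrincipalNodeNameSketch {
  /** Encodes a principal the way createUser, dropUser and userExists do before touching ZooKeeper. */
  static String encode(String principal) {
    return Base64.getEncoder().encodeToString(principal.getBytes(StandardCharsets.UTF_8));
  }

  /** Decodes a stored node name back into a readable principal, as listUsers does. */
  static String decode(String nodeName) {
    return new String(Base64.getDecoder().decode(nodeName), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    String principal = "alice/host.example.com@EXAMPLE.COM";
    String nodeName = encode(principal);
    System.out.println(nodeName + " -> " + decode(nodeName));
  }
}
```

Every method that takes a principal has to apply the same encoding before delegating to ZKAuthenticator, which is why createUser, dropUser and userExists each re-encode their argument.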