You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/02/13 20:33:31 UTC

[7/7] accumulo git commit: ACCUMULO-3513 Add delegation token support for kerberos configurations

ACCUMULO-3513 Add delegation token support for kerberos configurations

Generate secret keys internally to Accumulo, distribute them among
the nodes via ZK, and use the secret keys to create expiring passwords
that users can request and servers can validate. Allows for seamless
integration with existing token support in MapReduce for HDFS and YARN
access.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2c983317
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2c983317
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2c983317

Branch: refs/heads/master
Commit: 2c983317179634d6ddc10726defff303be4ae708
Parents: 7ae2e5a
Author: Josh Elser <el...@apache.org>
Authored: Fri Feb 13 12:48:16 2015 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Fri Feb 13 13:55:12 2015 -0500

----------------------------------------------------------------------
 .../org/apache/accumulo/core/Constants.java     |    5 +
 .../apache/accumulo/core/cli/ClientOpts.java    |    6 +
 .../core/cli/MapReduceClientOnDefaultTable.java |   12 +-
 .../cli/MapReduceClientOnRequiredTable.java     |   17 +-
 .../accumulo/core/cli/MapReduceClientOpts.java  |   50 +
 .../client/admin/DelegationTokenConfig.java     |   84 ++
 .../core/client/admin/SecurityOperations.java   |    8 +
 .../core/client/impl/ClientContext.java         |   19 +-
 .../impl/DelegationTokenConfigSerializer.java   |   54 +
 .../client/impl/SecurityOperationsImpl.java     |   35 +
 .../core/client/impl/ThriftTransportKey.java    |    4 +-
 .../core/client/mapred/AbstractInputFormat.java |   51 +-
 .../client/mapred/AccumuloOutputFormat.java     |    4 +-
 .../client/mapreduce/AbstractInputFormat.java   |   53 +-
 .../client/mapreduce/AccumuloOutputFormat.java  |    4 +-
 .../mapreduce/impl/DelegationTokenStub.java     |   80 ++
 .../mapreduce/lib/impl/ConfiguratorBase.java    |   81 +-
 .../mapreduce/lib/impl/InputConfigurator.java   |   68 +
 .../client/mock/MockSecurityOperations.java     |    7 +
 .../client/security/tokens/DelegationToken.java |  163 +++
 .../org/apache/accumulo/core/conf/Property.java |    4 +
 .../core/master/thrift/MasterClientService.java | 1183 ++++++++++++++++++
 .../rpc/SaslClientDigestCallbackHandler.java    |  114 ++
 .../accumulo/core/rpc/SaslConnectionParams.java |  148 ++-
 .../core/rpc/SaslDigestCallbackHandler.java     |   77 ++
 .../apache/accumulo/core/rpc/ThriftUtil.java    |   12 +-
 .../security/AuthenticationTokenIdentifier.java |  210 ++++
 .../core/security/SystemPermission.java         |    3 +-
 .../security/thrift/TAuthenticationKey.java     |  705 +++++++++++
 .../thrift/TAuthenticationTokenIdentifier.java  |  796 ++++++++++++
 .../core/security/thrift/TDelegationToken.java  |  520 ++++++++
 .../security/thrift/TDelegationTokenConfig.java |  399 ++++++
 .../thrift/TDelegationTokenOptions.java         |  399 ++++++
 .../accumulo/core/util/ThriftMessageUtil.java   |  109 ++
 core/src/main/thrift/master.thrift              |    3 +
 core/src/main/thrift/security.thrift            |   23 +
 .../client/admin/DelegationTokenConfigTest.java |   63 +
 .../DelegationTokenConfigSerializerTest.java    |   40 +
 .../client/impl/ThriftTransportKeyTest.java     |   97 +-
 .../security/tokens/DelegationTokenTest.java    |   72 ++
 .../SaslClientDigestCallbackHandlerTest.java    |   33 +
 .../core/rpc/SaslConnectionParamsTest.java      |  139 +-
 .../AuthenticationTokenIdentifierTest.java      |  111 ++
 .../core/util/ThriftMessageUtilTest.java        |   83 ++
 docs/src/main/asciidoc/chapters/kerberos.txt    |  110 ++
 .../accumulo/fate/zookeeper/IZooReader.java     |    4 +
 .../accumulo/fate/zookeeper/ZooReader.java      |   28 +
 .../apache/accumulo/fate/zookeeper/ZooUtil.java |   18 +
 .../java/org/apache/accumulo/proxy/Proxy.java   |   14 +-
 .../accumulo/server/AccumuloServerContext.java  |   56 +-
 .../server/master/state/MetaDataStateStore.java |    1 -
 .../server/rpc/SaslServerConnectionParams.java  |   69 +
 .../rpc/SaslServerDigestCallbackHandler.java    |  113 ++
 .../TCredentialsUpdatingInvocationHandler.java  |   18 +-
 .../accumulo/server/rpc/TServerUtils.java       |   34 +-
 .../server/rpc/UGIAssumingProcessor.java        |   55 +-
 .../security/AuditedSecurityOperation.java      |   14 +
 .../server/security/SecurityOperation.java      |    4 +
 .../server/security/SystemCredentials.java      |    4 +-
 .../security/delegation/AuthenticationKey.java  |  150 +++
 .../AuthenticationTokenKeyManager.java          |  169 +++
 .../AuthenticationTokenSecretManager.java       |  269 ++++
 .../ZooAuthenticationKeyDistributor.java        |  187 +++
 .../delegation/ZooAuthenticationKeyWatcher.java |  206 +++
 .../security/handler/KerberosAuthenticator.java |    3 +-
 .../server/AccumuloServerContextTest.java       |   25 +-
 .../rpc/SaslDigestCallbackHandlerTest.java      |  137 ++
 .../rpc/SaslServerConnectionParamsTest.java     |  101 ++
 .../delegation/AuthenticationKeyTest.java       |   95 ++
 .../AuthenticationTokenKeyManagerTest.java      |  196 +++
 .../AuthenticationTokenSecretManagerTest.java   |  393 ++++++
 .../ZooAuthenticationKeyDistributorTest.java    |  270 ++++
 .../ZooAuthenticationKeyWatcherTest.java        |  323 +++++
 .../accumulo/gc/SimpleGarbageCollector.java     |    2 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    |    7 +
 .../accumulo/gc/SimpleGarbageCollectorTest.java |    7 +
 .../CloseWriteAheadLogReferencesTest.java       |    7 +
 .../java/org/apache/accumulo/master/Master.java |   57 +-
 .../master/MasterClientServiceHandler.java      |   30 +
 .../apache/accumulo/tserver/TabletServer.java   |   26 +
 .../test/continuous/ContinuousBatchWalker.java  |   10 +-
 .../test/continuous/ContinuousIngest.java       |   82 +-
 .../test/continuous/ContinuousMoru.java         |   14 +-
 .../test/continuous/ContinuousOpts.java         |   80 ++
 .../test/continuous/ContinuousQuery.java        |   12 +-
 .../test/continuous/ContinuousScanner.java      |    8 +-
 .../test/continuous/ContinuousWalk.java         |    8 +-
 .../accumulo/harness/MiniClusterHarness.java    |    7 +-
 .../org/apache/accumulo/test/ShellServerIT.java |    2 +-
 .../accumulo/test/functional/KerberosIT.java    |  250 +++-
 90 files changed, 9587 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index 0229d4e..94ada7a 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -82,6 +82,11 @@ public class Constants {
   public static final String ZRECOVERY = "/recovery";
 
   /**
+   * Base znode for storing secret keys that back delegation tokens
+   */
+  public static final String ZDELEGATION_TOKEN_KEYS = "/delegation_token_keys";
+
+  /**
    * Initial tablet directory name for the default tablet in all tables
    */
   public static final String DEFAULT_TABLET_LOCATION = "/default_tablet";

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
index 216f32d..a7d98b3 100644
--- a/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
+++ b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
@@ -241,6 +241,12 @@ public class ClientOpts extends Help {
         throw new AccumuloSecurityException("No principal or authentication token was provided", SecurityErrorCode.BAD_CREDENTIALS);
       }
 
+      // In MapReduce, if we create a DelegationToken, the principal is updated from the KerberosToken
+      // used to obtain the DelegationToken.
+      if (null != principal) {
+        return principal;
+      }
+
       // Try to extract the principal automatically from Kerberos
       if (token instanceof KerberosToken) {
         principal = ((KerberosToken) token).getPrincipal();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnDefaultTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnDefaultTable.java b/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnDefaultTable.java
index 0cf081f..d39554c 100644
--- a/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnDefaultTable.java
+++ b/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnDefaultTable.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.core.cli;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.hadoop.mapreduce.Job;
 
 import com.beust.jcommander.Parameter;
@@ -38,12 +39,15 @@ public class MapReduceClientOnDefaultTable extends MapReduceClientOpts {
   @Override
   public void setAccumuloConfigs(Job job) throws AccumuloSecurityException {
     super.setAccumuloConfigs(job);
-    AccumuloInputFormat.setConnectorInfo(job, getPrincipal(), getToken());
-    AccumuloInputFormat.setInputTableName(job, getTableName());
+    final String tableName = getTableName();
+    final String principal = getPrincipal();
+    final AuthenticationToken token = getToken();
+    AccumuloInputFormat.setConnectorInfo(job, principal, token);
+    AccumuloInputFormat.setInputTableName(job, tableName);
     AccumuloInputFormat.setScanAuthorizations(job, auths);
-    AccumuloOutputFormat.setConnectorInfo(job, getPrincipal(), getToken());
+    AccumuloOutputFormat.setConnectorInfo(job, principal, token);
     AccumuloOutputFormat.setCreateTables(job, true);
-    AccumuloOutputFormat.setDefaultTableName(job, getTableName());
+    AccumuloOutputFormat.setDefaultTableName(job, tableName);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnRequiredTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnRequiredTable.java b/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnRequiredTable.java
index 7719e92..caef02d 100644
--- a/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnRequiredTable.java
+++ b/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOnRequiredTable.java
@@ -19,11 +19,13 @@ package org.apache.accumulo.core.cli;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.hadoop.mapreduce.Job;
 
 import com.beust.jcommander.Parameter;
 
 public class MapReduceClientOnRequiredTable extends MapReduceClientOpts {
+
   @Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
   private String tableName;
 
@@ -34,17 +36,20 @@ public class MapReduceClientOnRequiredTable extends MapReduceClientOpts {
   public void setAccumuloConfigs(Job job) throws AccumuloSecurityException {
     super.setAccumuloConfigs(job);
 
+    final String principal = getPrincipal(), tableName = getTableName();
+
     if (tokenFile.isEmpty()) {
-      AccumuloInputFormat.setConnectorInfo(job, getPrincipal(), getToken());
-      AccumuloOutputFormat.setConnectorInfo(job, getPrincipal(), getToken());
+      AuthenticationToken token = getToken();
+      AccumuloInputFormat.setConnectorInfo(job, principal, token);
+      AccumuloOutputFormat.setConnectorInfo(job, principal, token);
     } else {
-      AccumuloInputFormat.setConnectorInfo(job, getPrincipal(), tokenFile);
-      AccumuloOutputFormat.setConnectorInfo(job, getPrincipal(), tokenFile);
+      AccumuloInputFormat.setConnectorInfo(job, principal, tokenFile);
+      AccumuloOutputFormat.setConnectorInfo(job, principal, tokenFile);
     }
-    AccumuloInputFormat.setInputTableName(job, getTableName());
+    AccumuloInputFormat.setInputTableName(job, tableName);
     AccumuloInputFormat.setScanAuthorizations(job, auths);
     AccumuloOutputFormat.setCreateTables(job, true);
-    AccumuloOutputFormat.setDefaultTableName(job, getTableName());
+    AccumuloOutputFormat.setDefaultTableName(job, tableName);
   }
 
   public String getTableName() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOpts.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOpts.java b/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOpts.java
index 4b3b7ed..2a5408b 100644
--- a/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOpts.java
+++ b/core/src/main/java/org/apache/accumulo/core/cli/MapReduceClientOpts.java
@@ -17,16 +17,66 @@
 package org.apache.accumulo.core.cli;
 
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.security.SystemPermission;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Adds some MR awareness to the ClientOpts
  */
 public class MapReduceClientOpts extends ClientOpts {
+  private static final Logger log = LoggerFactory.getLogger(MapReduceClientOpts.class);
+
   public void setAccumuloConfigs(Job job) throws AccumuloSecurityException {
     AccumuloInputFormat.setZooKeeperInstance(job, this.getClientConfiguration());
     AccumuloOutputFormat.setZooKeeperInstance(job, this.getClientConfiguration());
   }
+
+  @Override
+  public AuthenticationToken getToken() {
+    AuthenticationToken authToken = super.getToken();
+    // For MapReduce, Kerberos credentials don't make it to the Mappers and Reducers,
+    // so we need to request a delegation token and use that instead.
+    if (authToken instanceof KerberosToken) {
+      log.info("Received KerberosToken, fetching DelegationToken for MapReduce");
+      final KerberosToken krbToken = (KerberosToken) authToken;
+
+      try {
+        UserGroupInformation user = UserGroupInformation.getCurrentUser();
+        if (!user.hasKerberosCredentials()) {
+          throw new IllegalStateException("Expected current user to have Kerberos credentials");
+        }
+
+        String newPrincipal = user.getUserName();
+        log.info("Obtaining delegation token for {}", newPrincipal);
+
+        setPrincipal(newPrincipal);
+        Connector conn = getInstance().getConnector(newPrincipal, krbToken);
+
+        // Do the explicit check to see if the user has the permission to get a delegation token
+        if (!conn.securityOperations().hasSystemPermission(conn.whoami(), SystemPermission.OBTAIN_DELEGATION_TOKEN)) {
+          log.error("{} doesn't have the {} SystemPermission neccesary to obtain a delegation token. MapReduce tasks cannot automatically use the client's"
+              + " credentials on remote servers. Delegation tokens provide a means to run MapReduce without distributing the user's credentials.",
+              user.getUserName(), SystemPermission.OBTAIN_DELEGATION_TOKEN.name());
+          throw new IllegalStateException(conn.whoami() + " does not have permission to obtain a delegation token");
+        }
+
+        // Get the delegation token from Accumulo
+        return conn.securityOperations().getDelegationToken(new DelegationTokenConfig());
+      } catch (Exception e) {
+        final String msg = "Failed to acquire DelegationToken for use with MapReduce";
+        log.error(msg, e);
+        throw new RuntimeException(msg, e);
+      }
+    }
+    return authToken;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/client/admin/DelegationTokenConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/DelegationTokenConfig.java b/core/src/main/java/org/apache/accumulo/core/client/admin/DelegationTokenConfig.java
new file mode 100644
index 0000000..2e25c3d
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/DelegationTokenConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.admin;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.security.tokens.DelegationToken;
+
+/**
+ * Configuration options for obtaining a {@link DelegationToken}
+ *
+ * @since 1.7.0
+ */
+public class DelegationTokenConfig {
+
+  private long lifetime = 0;
+
+  /**
+   * Requests a specific lifetime for the token that is different than the default system lifetime. The lifetime must not exceed the secret key lifetime
+   * configured on the servers.
+   *
+   * @param lifetime
+   *          Token lifetime
+   * @param unit
+   *          Unit of time for the lifetime
+   * @return this
+   */
+  public DelegationTokenConfig setTokenLifetime(long lifetime, TimeUnit unit) {
+    checkArgument(0 <= lifetime, "Lifetime must be non-negative");
+    checkNotNull(unit, "TimeUnit was null");
+    this.lifetime = TimeUnit.MILLISECONDS.convert(lifetime, unit);
+    return this;
+  }
+
+  /**
+   * The current token lifetime. A value of zero corresponds to using the system configured lifetime.
+   *
+   * @param unit
+   *          The unit of time the lifetime should be returned in
+   * @return Token lifetime in requested unit of time
+   */
+  public long getTokenLifetime(TimeUnit unit) {
+    checkNotNull(unit);
+    return unit.convert(lifetime, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof DelegationTokenConfig) {
+      DelegationTokenConfig other = (DelegationTokenConfig) o;
+      return lifetime == other.lifetime;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Long.valueOf(lifetime).hashCode();
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(32);
+    sb.append("DelegationTokenConfig[lifetime=").append(lifetime).append("ms]");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/client/admin/SecurityOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/SecurityOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/SecurityOperations.java
index efeafc0..2682f95 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/SecurityOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/SecurityOperations.java
@@ -21,6 +21,7 @@ import java.util.Set;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.DelegationToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.NamespacePermission;
@@ -350,4 +351,11 @@ public interface SecurityOperations {
    */
   Set<String> listLocalUsers() throws AccumuloException, AccumuloSecurityException;
 
+  /**
+   * Obtain a {@link DelegationToken} for use when Kerberos credentials are unavailable (e.g. YARN Jobs)
+   *
+   * @return a {@link DelegationToken} for this user
+   * @since 1.7.0
+   */
+  DelegationToken getDelegationToken(DelegationTokenConfig cfg) throws AccumuloException, AccumuloSecurityException;
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
index 8470da4..7c2fb1b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
@@ -52,11 +52,11 @@ public class ClientContext {
 
   private static final Logger log = LoggerFactory.getLogger(ClientContext.class);
 
-  private final Instance inst;
+  protected final Instance inst;
   private Credentials creds;
   private ClientConfiguration clientConf;
   private final AccumuloConfiguration rpcConf;
-  private Connector conn;
+  protected Connector conn;
 
   /**
    * Instantiate a client context
@@ -122,12 +122,21 @@ public class ClientContext {
   /**
    * Retrieve SASL configuration to initiate an RPC connection to a server
    */
-  public SaslConnectionParams getClientSaslParams() {
+  public SaslConnectionParams getSaslParams() {
+    final boolean defaultVal = Boolean.parseBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getDefaultValue());
+
     // Use the clientConf if we have it
     if (null != clientConf) {
-      return SaslConnectionParams.forConfig(clientConf);
+      if (!clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), defaultVal)) {
+        return null;
+      }
+      return new SaslConnectionParams(clientConf, creds.getToken());
+    }
+    AccumuloConfiguration conf = getConfiguration();
+    if (!conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+      return null;
     }
-    return SaslConnectionParams.forConfig(getConfiguration());
+    return new SaslConnectionParams(conf, creds.getToken());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/client/impl/DelegationTokenConfigSerializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/DelegationTokenConfigSerializer.java b/core/src/main/java/org/apache/accumulo/core/client/impl/DelegationTokenConfigSerializer.java
new file mode 100644
index 0000000..934079d
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/DelegationTokenConfigSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
+import org.apache.accumulo.core.security.thrift.TDelegationTokenConfig;
+
+/**
+ * Handles serialization of {@link DelegationTokenConfig}
+ */
+public class DelegationTokenConfigSerializer {
+
+  /**
+   * Serialize the delegation token config into the thrift variant
+   *
+   * @param config
+   *          The configuration
+   */
+  public static TDelegationTokenConfig serialize(DelegationTokenConfig config) {
+    TDelegationTokenConfig tconfig = new TDelegationTokenConfig();
+    tconfig.setLifetime(config.getTokenLifetime(TimeUnit.MILLISECONDS));
+    return tconfig;
+  }
+
+  /**
+   * Deserialize the Thrift delegation token config into the non-thrift variant
+   *
+   * @param tconfig
+   *          The thrift configuration
+   */
+  public static DelegationTokenConfig deserialize(TDelegationTokenConfig tconfig) {
+    DelegationTokenConfig config = new DelegationTokenConfig();
+    if (tconfig.isSetLifetime()) {
+      config.setTokenLifetime(tconfig.getLifetime(), TimeUnit.MILLISECONDS);
+    }
+    return config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/client/impl/SecurityOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/SecurityOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/SecurityOperationsImpl.java
index feb1ee7..dbaa9d1 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/SecurityOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/SecurityOperationsImpl.java
@@ -23,6 +23,8 @@ import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
 import org.apache.accumulo.core.client.admin.SecurityOperations;
 import org.apache.accumulo.core.client.impl.thrift.ClientService;
 import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
@@ -30,12 +32,17 @@ import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.DelegationToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.master.thrift.MasterClientService.Client;
+import org.apache.accumulo.core.security.AuthenticationTokenIdentifier;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.NamespacePermission;
 import org.apache.accumulo.core.security.SystemPermission;
 import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.security.thrift.TDelegationToken;
+import org.apache.accumulo.core.security.thrift.TDelegationTokenConfig;
 import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.core.util.ByteBufferUtil;
 
@@ -344,4 +351,32 @@ public class SecurityOperationsImpl implements SecurityOperations {
     });
   }
 
+  @Override
+  public DelegationToken getDelegationToken(DelegationTokenConfig cfg) throws AccumuloException, AccumuloSecurityException {
+    final TDelegationTokenConfig tConfig;
+    if (null != cfg) {
+      tConfig = DelegationTokenConfigSerializer.serialize(cfg);
+    } else {
+      tConfig = new TDelegationTokenConfig();
+    }
+
+    TDelegationToken thriftToken;
+    try {
+      thriftToken = MasterClient.execute(context, new ClientExecReturn<TDelegationToken,Client>() {
+        @Override
+        public TDelegationToken execute(Client client) throws Exception {
+          return client.getDelegationToken(Tracer.traceInfo(), context.rpcCreds(), tConfig);
+        }
+      });
+    } catch (TableNotFoundException e) {
+      // should never happen
+      throw new AssertionError("Received TableNotFoundException on method which should not throw that exception", e);
+    }
+
+    AuthenticationTokenIdentifier identifier = new AuthenticationTokenIdentifier(thriftToken.getIdentifier());
+
+    // Get the password out of the thrift delegation token
+    return new DelegationToken(thriftToken.getPassword(), identifier);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
index a843111..891d6e1 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
@@ -39,7 +39,7 @@ public class ThriftTransportKey {
     this.server = server;
     this.timeout = timeout;
     this.sslParams = context.getClientSslParams();
-    this.saslParams = context.getClientSaslParams();
+    this.saslParams = context.getSaslParams();
     if (null != saslParams) {
       // TSasl and TSSL transport factories don't play nicely together
       if (null != sslParams) {
@@ -97,7 +97,7 @@ public class ThriftTransportKey {
     if (isSsl()) {
       prefix = "ssl:";
     } else if (isSasl()) {
-      prefix = "sasl:" + saslParams.getPrincipal() + "@";
+      prefix = saslParams.toString() + ":";
     }
     return prefix + server + " (" + Long.toString(timeout) + ")";
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
index b83a024..0ce05d7 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
@@ -39,20 +39,26 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
 import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.impl.OfflineScanner;
 import org.apache.accumulo.core.client.impl.ScannerImpl;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
+import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.DelegationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.AuthenticationTokenIdentifier;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.util.Pair;
@@ -62,6 +68,7 @@ import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.security.token.Token;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
@@ -77,8 +84,9 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    * Sets the connector information needed to communicate with Accumulo in this job.
    *
    * <p>
-   * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe
-   * conversion to a string, and is not intended to be secure.
+   * <b>WARNING:</b> Some tokens, when serialized, divulge sensitive information in the configuration as a means to pass the token to MapReduce tasks. This
+   * information is BASE64 encoded to provide a charset safe conversion to a string, but this conversion is not intended to be secure. {@link PasswordToken} is
+   * one example that is insecure in this way; however {@link DelegationToken}s, acquired using a {@link KerberosToken}, is not subject to this concern.
    *
    * @param job
    *          the Hadoop job instance to be configured
@@ -89,6 +97,29 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException {
+    if (token instanceof KerberosToken) {
+      log.info("Received KerberosToken, attempting to fetch DelegationToken");
+      try {
+        Instance instance = getInstance(job);
+        Connector conn = instance.getConnector(principal, token);
+        token = conn.securityOperations().getDelegationToken(new DelegationTokenConfig());
+      } catch (Exception e) {
+        log.warn("Failed to automatically obtain DelegationToken, Mappers/Reducers will likely fail to communicate with Accumulo", e);
+      }
+    }
+    // DelegationTokens can be passed securely from user to task without serializing insecurely in the configuration
+    if (token instanceof DelegationToken) {
+      DelegationToken delegationToken = (DelegationToken) token;
+
+      // Convert it into a Hadoop Token
+      AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier();
+      Token<AuthenticationTokenIdentifier> hadoopToken = new Token<>(identifier.getBytes(), delegationToken.getPassword(), identifier.getKind(),
+          delegationToken.getServiceName());
+
+      // Add the Hadoop Token to the Job so it gets serialized and passed along.
+      job.getCredentials().addToken(hadoopToken.getService(), hadoopToken);
+    }
+
     InputConfigurator.setConnectorInfo(CLASS, job, principal, token);
   }
 
@@ -147,7 +178,8 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    * @see #setConnectorInfo(JobConf, String, String)
    */
   protected static AuthenticationToken getAuthenticationToken(JobConf job) {
-    return InputConfigurator.getAuthenticationToken(CLASS, job);
+    AuthenticationToken token = InputConfigurator.getAuthenticationToken(CLASS, job);
+    return ConfiguratorBase.unwrapAuthenticationToken(job, token);
   }
 
   /**
@@ -284,7 +316,18 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
    * @since 1.5.0
    */
   protected static void validateOptions(JobConf job) throws IOException {
-    InputConfigurator.validateOptions(CLASS, job);
+    final Instance inst = InputConfigurator.validateInstance(CLASS, job);
+    String principal = InputConfigurator.getPrincipal(CLASS, job);
+    AuthenticationToken token = InputConfigurator.getAuthenticationToken(CLASS, job);
+    // In secure mode, we need to convert the DelegationTokenStub into a real DelegationToken
+    token = ConfiguratorBase.unwrapAuthenticationToken(job, token);
+    Connector conn;
+    try {
+      conn = inst.getConnector(principal, token);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    InputConfigurator.validatePermissions(CLASS, job, conn);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
index f877ec6..4e95a4a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
@@ -34,6 +34,7 @@ import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.SecurityErrorCode;
@@ -168,7 +169,8 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
    * @see #setConnectorInfo(JobConf, String, String)
    */
   protected static AuthenticationToken getAuthenticationToken(JobConf job) {
-    return OutputConfigurator.getAuthenticationToken(CLASS, job);
+    AuthenticationToken token = OutputConfigurator.getAuthenticationToken(CLASS, job);
+    return ConfiguratorBase.unwrapAuthenticationToken(job, token);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
index 5c7b780..e1b35b2 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
@@ -39,23 +39,30 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
 import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.impl.OfflineScanner;
 import org.apache.accumulo.core.client.impl.ScannerImpl;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.DelegationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.AuthenticationTokenIdentifier;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -63,6 +70,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.security.token.Token;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
@@ -79,8 +87,9 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    * Sets the connector information needed to communicate with Accumulo in this job.
    *
    * <p>
-   * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe
-   * conversion to a string, and is not intended to be secure.
+   * <b>WARNING:</b> For {@link PasswordToken}, the serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to
+   * provide a charset safe conversion to a string, and is not intended to be secure. This is not the case for {@link KerberosToken} and the corresponding
+   * {@link DelegationToken} acquired using the KerberosToken.
    *
    * @param job
    *          the Hadoop job instance to be configured
@@ -91,6 +100,29 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    * @since 1.5.0
    */
   public static void setConnectorInfo(Job job, String principal, AuthenticationToken token) throws AccumuloSecurityException {
+    if (token instanceof KerberosToken) {
+      log.info("Received KerberosToken, attempting to fetch DelegationToken");
+      try {
+        Instance instance = getInstance(job);
+        Connector conn = instance.getConnector(principal, token);
+        token = conn.securityOperations().getDelegationToken(new DelegationTokenConfig());
+      } catch (Exception e) {
+        log.warn("Failed to automatically obtain DelegationToken, Mappers/Reducers will likely fail to communicate with Accumulo", e);
+      }
+    }
+    // DelegationTokens can be passed securely from user to task without serializing insecurely in the configuration
+    if (token instanceof DelegationToken) {
+      DelegationToken delegationToken = (DelegationToken) token;
+
+      // Convert it into a Hadoop Token
+      AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier();
+      Token<AuthenticationTokenIdentifier> hadoopToken = new Token<>(identifier.getBytes(), delegationToken.getPassword(), identifier.getKind(),
+          delegationToken.getServiceName());
+
+      // Add the Hadoop Token to the Job so it gets serialized and passed along.
+      job.getCredentials().addToken(hadoopToken.getService(), hadoopToken);
+    }
+
     InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token);
   }
 
@@ -171,7 +203,8 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    * @see #setConnectorInfo(Job, String, String)
    */
   protected static AuthenticationToken getAuthenticationToken(JobContext context) {
-    return InputConfigurator.getAuthenticationToken(CLASS, context.getConfiguration());
+    AuthenticationToken token = InputConfigurator.getAuthenticationToken(CLASS, context.getConfiguration());
+    return ConfiguratorBase.unwrapAuthenticationToken(context, token);
   }
 
   /**
@@ -339,7 +372,19 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
    * @since 1.5.0
    */
   protected static void validateOptions(JobContext context) throws IOException {
-    InputConfigurator.validateOptions(CLASS, context.getConfiguration());
+    final Configuration conf = context.getConfiguration();
+    final Instance inst = InputConfigurator.validateInstance(CLASS, conf);
+    String principal = InputConfigurator.getPrincipal(CLASS, conf);
+    AuthenticationToken token = InputConfigurator.getAuthenticationToken(CLASS, conf);
+    // In secure mode, we need to convert the DelegationTokenStub into a real DelegationToken
+    token = ConfiguratorBase.unwrapAuthenticationToken(context, token);
+    Connector conn;
+    try {
+      conn = inst.getConnector(principal, token);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    InputConfigurator.validatePermissions(CLASS, conf, conn);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index 5e0aa73..3164e4a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -34,6 +34,7 @@ import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.SecurityErrorCode;
@@ -169,7 +170,8 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @see #setConnectorInfo(Job, String, String)
    */
   protected static AuthenticationToken getAuthenticationToken(JobContext context) {
-    return OutputConfigurator.getAuthenticationToken(CLASS, context.getConfiguration());
+    AuthenticationToken token = OutputConfigurator.getAuthenticationToken(CLASS, context.getConfiguration());
+    return ConfiguratorBase.unwrapAuthenticationToken(context, token);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/DelegationTokenStub.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/DelegationTokenStub.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/DelegationTokenStub.java
new file mode 100644
index 0000000..5ad91b5
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/DelegationTokenStub.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Set;
+
+import javax.security.auth.DestroyFailedException;
+
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+
+/**
+ * An internal stub class for passing DelegationToken information out of the Configuration back up to the appropriate implementation for mapreduce or mapred.
+ */
+public class DelegationTokenStub implements AuthenticationToken {
+
+  private String serviceName;
+
+  public DelegationTokenStub(String serviceName) {
+    checkNotNull(serviceName);
+    this.serviceName = serviceName;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void destroy() throws DestroyFailedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isDestroyed() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void init(Properties properties) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<TokenProperty> getProperties() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public AuthenticationToken clone() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java
index b2b5150..3b5fa3a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java
@@ -17,8 +17,11 @@
 package org.apache.accumulo.core.client.mapreduce.lib.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -28,15 +31,23 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.ClientConfiguration;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.impl.DelegationTokenStub;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
+import org.apache.accumulo.core.client.security.tokens.DelegationToken;
+import org.apache.accumulo.core.security.AuthenticationTokenIdentifier;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.util.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -56,7 +67,7 @@ public class ConfiguratorBase {
   }
 
   public static enum TokenSource {
-    FILE, INLINE;
+    FILE, INLINE, JOB;
 
     private String prefix;
 
@@ -138,8 +149,15 @@ public class ConfiguratorBase {
     checkArgument(token != null, "token is null");
     conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true);
     conf.set(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL), principal);
-    conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN),
-        TokenSource.INLINE.prefix() + token.getClass().getName() + ":" + Base64.encodeBase64String(AuthenticationTokenSerializer.serialize(token)));
+    if (token instanceof DelegationToken) {
+      // Avoid serializing the DelegationToken secret in the configuration -- the Job will do that work for us securely
+      DelegationToken delToken = (DelegationToken) token;
+      conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), TokenSource.JOB.prefix() + token.getClass().getName() + ":"
+          + delToken.getServiceName().toString());
+    } else {
+      conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN),
+          TokenSource.INLINE.prefix() + token.getClass().getName() + ":" + Base64.encodeBase64String(AuthenticationTokenSerializer.serialize(token)));
+    }
   }
 
   /**
@@ -230,6 +248,14 @@ public class ConfiguratorBase {
     } else if (token.startsWith(TokenSource.FILE.prefix())) {
       String tokenFileName = token.substring(TokenSource.FILE.prefix().length());
       return getTokenFromFile(conf, getPrincipal(implementingClass, conf), tokenFileName);
+    } else if (token.startsWith(TokenSource.JOB.prefix())) {
+      String[] args = token.substring(TokenSource.JOB.prefix().length()).split(":", 2);
+      if (args.length == 2) {
+        String className = args[0], serviceName = args[1];
+        if (DelegationToken.class.getName().equals(className)) {
+          return new DelegationTokenStub(serviceName);
+        }
+      }
     }
 
     throw new IllegalStateException("Token was not properly serialized into the configuration");
@@ -401,4 +427,53 @@ public class ConfiguratorBase {
     return conf.getInt(enumToConfKey(GeneralOpts.VISIBILITY_CACHE_SIZE), Constants.DEFAULT_VISIBILITY_CACHE_SIZE);
   }
 
+  /**
+   * Unwraps the provided {@link AuthenticationToken} if it is an instance of {@link DelegationTokenStub}, reconstituting it from the provided {@link JobConf}.
+   *
+   * @param job
+   *          The job
+   * @param token
+   *          The authentication token
+   */
+  public static AuthenticationToken unwrapAuthenticationToken(JobConf job, AuthenticationToken token) {
+    checkNotNull(job);
+    checkNotNull(token);
+    if (token instanceof DelegationTokenStub) {
+      DelegationTokenStub delTokenStub = (DelegationTokenStub) token;
+      Token<? extends TokenIdentifier> hadoopToken = job.getCredentials().getToken(new Text(delTokenStub.getServiceName()));
+      AuthenticationTokenIdentifier identifier = new AuthenticationTokenIdentifier();
+      try {
+        identifier.readFields(new DataInputStream(new ByteArrayInputStream(hadoopToken.getIdentifier())));
+        return new DelegationToken(hadoopToken.getPassword(), identifier);
+      } catch (IOException e) {
+        throw new RuntimeException("Could not construct DelegationToken from JobConf Credentials", e);
+      }
+    }
+    return token;
+  }
+
+  /**
+   * Unwraps the provided {@link AuthenticationToken} if it is an instance of {@link DelegationTokenStub}, reconstituting it from the provided {@link JobConf}.
+   *
+   * @param job
+   *          The job
+   * @param token
+   *          The authentication token
+   */
+  public static AuthenticationToken unwrapAuthenticationToken(JobContext job, AuthenticationToken token) {
+    checkNotNull(job);
+    checkNotNull(token);
+    if (token instanceof DelegationTokenStub) {
+      DelegationTokenStub delTokenStub = (DelegationTokenStub) token;
+      Token<? extends TokenIdentifier> hadoopToken = job.getCredentials().getToken(new Text(delTokenStub.getServiceName()));
+      AuthenticationTokenIdentifier identifier = new AuthenticationTokenIdentifier();
+      try {
+        identifier.readFields(new DataInputStream(new ByteArrayInputStream(hadoopToken.getIdentifier())));
+        return new DelegationToken(hadoopToken.getPassword(), identifier);
+      } catch (IOException e) {
+        throw new RuntimeException("Could not construct DelegationToken from JobConf Credentials", e);
+      }
+    }
+    return token;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
index 5405ac0..6a64166 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
@@ -51,6 +51,7 @@ import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
 import org.apache.accumulo.core.client.mock.impl.MockTabletLocator;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.DelegationToken;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.PartialKey;
@@ -616,10 +617,73 @@ public class InputConfigurator extends ConfiguratorBase {
     return TabletLocator.getLocator(context, new Text(tableId));
   }
 
+  /**
+   * Validates and extracts an {@link Instance} from the configuration
+   *
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @since 1.7.0
+   */
+  public static Instance validateInstance(Class<?> implementingClass, Configuration conf) throws IOException {
+    if (!isConnectorInfoSet(implementingClass, conf))
+      throw new IOException("Input info has not been set.");
+    String instanceKey = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
+    if (!"MockInstance".equals(instanceKey) && !"ZooKeeperInstance".equals(instanceKey))
+      throw new IOException("Instance info has not been set.");
+    return getInstance(implementingClass, conf);
+  }
+
+  /**
+   * Validates that the user has permissions on the requested tables
+   *
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param conn
+   *          the Connector
+   * @see 1.7.0
+   */
+  public static void validatePermissions(Class<?> implementingClass, Configuration conf, Connector conn) throws IOException {
+    Map<String,InputTableConfig> inputTableConfigs = getInputTableConfigs(implementingClass, conf);
+    try {
+      if (getInputTableConfigs(implementingClass, conf).size() == 0)
+        throw new IOException("No table set.");
+
+      for (Map.Entry<String,InputTableConfig> tableConfig : inputTableConfigs.entrySet()) {
+        if (!conn.securityOperations().hasTablePermission(getPrincipal(implementingClass, conf), tableConfig.getKey(), TablePermission.READ))
+          throw new IOException("Unable to access table");
+      }
+      for (Map.Entry<String,InputTableConfig> tableConfigEntry : inputTableConfigs.entrySet()) {
+        InputTableConfig tableConfig = tableConfigEntry.getValue();
+        if (!tableConfig.shouldUseLocalIterators()) {
+          if (tableConfig.getIterators() != null) {
+            for (IteratorSetting iter : tableConfig.getIterators()) {
+              if (!conn.tableOperations().testClassLoad(tableConfigEntry.getKey(), iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
+                throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
+            }
+          }
+        }
+      }
+    } catch (AccumuloException e) {
+      throw new IOException(e);
+    } catch (AccumuloSecurityException e) {
+      throw new IOException(e);
+    } catch (TableNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
   /**
    * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
    *
+   * <p>
+   * The implementation (JobContext or JobConf which created the Configuration) needs to be used to extract the proper {@link AuthenticationToken} for
+   * {@link DelegationToken} support.
+   *
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
    * @param conf
@@ -627,7 +691,11 @@ public class InputConfigurator extends ConfiguratorBase {
    * @throws IOException
    *           if the context is improperly configured
    * @since 1.6.0
+   *
+   * @see #validateInstance(Class, Configuration)
+   * @see #validatePermissions(Class, Configuration, Connector)
    */
+  @Deprecated
   public static void validateOptions(Class<?> implementingClass, Configuration conf) throws IOException {
 
     Map<String,InputTableConfig> inputTableConfigs = getInputTableConfigs(implementingClass, conf);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/client/mock/MockSecurityOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockSecurityOperations.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockSecurityOperations.java
index db88cfb..cc51a47 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockSecurityOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockSecurityOperations.java
@@ -21,9 +21,11 @@ import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
 import org.apache.accumulo.core.client.admin.SecurityOperations;
 import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.DelegationToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.NamespacePermission;
@@ -222,4 +224,9 @@ class MockSecurityOperations implements SecurityOperations {
     return acu.users.keySet();
   }
 
+  @Override
+  public DelegationToken getDelegationToken(DelegationTokenConfig cfg) throws AccumuloException, AccumuloSecurityException {
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/client/security/tokens/DelegationToken.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/security/tokens/DelegationToken.java b/core/src/main/java/org/apache/accumulo/core/client/security/tokens/DelegationToken.java
new file mode 100644
index 0000000..bc0251f
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/security/tokens/DelegationToken.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.security.tokens;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.security.AuthenticationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link AuthenticationToken} that wraps a "Hadoop style" delegation token created by Accumulo. The only intended scope of this implementation is when a
+ * KerberosToken cannot be used instead. The most common reason for this is within YARN jobs. The Kerberos credentials of the user are not passed over the wire
+ * to the job itself. The delegation token serves as a mechanism to obtain a shared secret with Accumulo using a {@link KerberosToken} and then run some task
+ * authenticating with that shared secret, this {@link DelegationToken}.
+ *
+ * @since 1.7.0
+ */
+public class DelegationToken extends PasswordToken {
+  private static final Logger log = LoggerFactory.getLogger(DelegationToken.class);
+
+  public static final String SERVICE_NAME = "AccumuloDelegationToken";
+
+  private AuthenticationTokenIdentifier identifier;
+
+  public DelegationToken() {
+    super();
+  }
+
+  public DelegationToken(byte[] delegationTokenPassword, AuthenticationTokenIdentifier identifier) {
+    checkNotNull(delegationTokenPassword);
+    checkNotNull(identifier);
+    setPassword(delegationTokenPassword);
+    this.identifier = identifier;
+  }
+
+  public DelegationToken(Instance instance, UserGroupInformation user, AuthenticationTokenIdentifier identifier) {
+    checkNotNull(instance);
+    checkNotNull(user);
+    checkNotNull(identifier);
+
+    Credentials creds = user.getCredentials();
+    Token<? extends TokenIdentifier> token = creds.getToken(new Text(SERVICE_NAME + "-" + instance.getInstanceID()));
+    if (null == token) {
+      throw new IllegalArgumentException("Did not find Accumulo delegation token in provided UserGroupInformation");
+    }
+    setPasswordFromToken(token, identifier);
+  }
+
+  public DelegationToken(Token<? extends TokenIdentifier> token, AuthenticationTokenIdentifier identifier) {
+    checkNotNull(token);
+    checkNotNull(identifier);
+    setPasswordFromToken(token, identifier);
+  }
+
+  private void setPasswordFromToken(Token<? extends TokenIdentifier> token, AuthenticationTokenIdentifier identifier) {
+    if (!AuthenticationTokenIdentifier.TOKEN_KIND.equals(token.getKind())) {
+      String msg = "Expected an AuthenticationTokenIdentifier but got a " + token.getKind();
+      log.error(msg);
+      throw new IllegalArgumentException(msg);
+    }
+
+    setPassword(token.getPassword());
+    this.identifier = identifier;
+  }
+
+  /**
+   * The identifier for this token, may be null.
+   */
+  public AuthenticationTokenIdentifier getIdentifier() {
+    return identifier;
+  }
+
+  /**
+   * The service name used to identify this {@link Token}
+   *
+   * @see Token#Constructor(byte[], byte[], Text, Text)
+   */
+  public Text getServiceName() {
+    checkNotNull(identifier);
+    return new Text(SERVICE_NAME + "-" + identifier.getInstanceId());
+  }
+
+  @Override
+  public void init(Properties properties) {
+    // Encourage use of UserGroupInformation as entry point
+  }
+
+  @Override
+  public Set<TokenProperty> getProperties() {
+    // Encourage use of UserGroupInformation as entry point
+    return Collections.emptySet();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    identifier.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    identifier = new AuthenticationTokenIdentifier();
+    identifier.readFields(in);
+  }
+
+  @Override
+  public DelegationToken clone() {
+    DelegationToken copy = new DelegationToken();
+    copy.setPassword(getPassword());
+    copy.identifier = new AuthenticationTokenIdentifier(identifier);
+    return copy;
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode() ^ identifier.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (!(obj instanceof DelegationToken))
+      return false;
+    DelegationToken other = (DelegationToken) obj;
+    if (!Arrays.equals(getPassword(), other.getPassword())) {
+      return false;
+    }
+    return identifier.equals(other.identifier);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 68fac73..01f03cf 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -188,6 +188,10 @@ public enum Property {
       "Comma-separated list of paths to CredentialProviders"),
   GENERAL_LEGACY_METRICS("general.legacy.metrics", "false", PropertyType.BOOLEAN,
       "Use the old metric infrastructure configured by accumulo-metrics.xml, instead of Hadoop Metrics2"),
+  GENERAL_DELEGATION_TOKEN_LIFETIME("general.delegation.token.lifetime", "7d", PropertyType.TIMEDURATION,
+      "The length of time that delegation tokens and secret keys are valid"),
+  GENERAL_DELEGATION_TOKEN_UPDATE_INTERVAL("general.delegation.token.update.interval", "1d", PropertyType.TIMEDURATION,
+      "The length of time between generation of new secret keys"),
 
   // properties that are specific to master server behavior
   MASTER_PREFIX("master.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the master server"),