You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2017/07/17 03:18:13 UTC

[1/2] hive git commit: HIVE-16973 : Fetching of Delegation tokens (Kerberos) for AccumuloStorageHandler fails in HS2 (Josh Elser via Sushanth Sowmyan)

Repository: hive
Updated Branches:
  refs/heads/master 93dd75d01 -> 173d98160


http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java
index 5fdab28..1dd2b8c 100644
--- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java
@@ -109,14 +109,17 @@ public class TestHiveAccumuloTableOutputFormat {
   @Test
   public void testBasicConfiguration() throws IOException, AccumuloSecurityException {
     HiveAccumuloTableOutputFormat outputFormat = Mockito.mock(HiveAccumuloTableOutputFormat.class);
+    HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
+
+    Mockito.when(outputFormat.getHelper()).thenReturn(helper);
 
     Mockito.doCallRealMethod().when(outputFormat).configureAccumuloOutputFormat(conf);
     Mockito.doCallRealMethod().when(outputFormat).getConnectionParams(conf);
 
     outputFormat.configureAccumuloOutputFormat(conf);
 
-    Mockito.verify(outputFormat).setConnectorInfoWithErrorChecking(conf, user, new PasswordToken(password));
-    Mockito.verify(outputFormat).setZooKeeperInstanceWithErrorChecking(conf, instanceName, zookeepers, false);
+    Mockito.verify(helper).setOutputFormatConnectorInfo(conf, user, new PasswordToken(password));
+    Mockito.verify(helper).setOutputFormatZooKeeperInstance(conf, instanceName, zookeepers, false);
     Mockito.verify(outputFormat).setDefaultAccumuloTableName(conf, outputTable);
   }
 
@@ -160,38 +163,32 @@ public class TestHiveAccumuloTableOutputFormat {
     Mockito.when(cnxnParams.getZooKeepers()).thenReturn(zookeepers);
 
     // Stub OutputFormat actions
-    Mockito.when(outputFormat.hasKerberosCredentials(user1)).thenReturn(true);
+    Mockito.when(helper.hasKerberosCredentials(user1)).thenReturn(true);
 
     // Invoke the method
     outputFormat.configureAccumuloOutputFormat(conf);
 
-    // The AccumuloInputFormat methods
-    Mockito.verify(outputFormat).setZooKeeperInstanceWithErrorChecking(conf, instanceName, zookeepers, true);
-    Mockito.verify(outputFormat).setConnectorInfoWithErrorChecking(conf, user, authToken);
+    // The AccumuloOutputFormat methods
+    Mockito.verify(helper).setOutputFormatZooKeeperInstance(conf, instanceName, zookeepers, true);
+    Mockito.verify(helper).updateOutputFormatConfWithAccumuloToken(conf, user1, cnxnParams);
     Mockito.verify(outputFormat).setDefaultAccumuloTableName(conf, outputTable);
-
-    // Other methods we expect
-    Mockito.verify(helper).mergeTokenIntoJobConf(conf, hadoopToken);
-
-    // Make sure the token made it into the UGI
-    Collection<Token<? extends TokenIdentifier>> tokens = user1.getTokens();
-    Assert.assertEquals(1, tokens.size());
-    Assert.assertEquals(hadoopToken, tokens.iterator().next());
   }
 
   @Test
   public void testMockInstance() throws IOException, AccumuloSecurityException {
     HiveAccumuloTableOutputFormat outputFormat = Mockito.mock(HiveAccumuloTableOutputFormat.class);
+    HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
     conf.setBoolean(AccumuloConnectionParameters.USE_MOCK_INSTANCE, true);
     conf.unset(AccumuloConnectionParameters.ZOOKEEPERS);
 
+    Mockito.when(outputFormat.getHelper()).thenReturn(helper);
     Mockito.doCallRealMethod().when(outputFormat).configureAccumuloOutputFormat(conf);
     Mockito.doCallRealMethod().when(outputFormat).getConnectionParams(conf);
 
     outputFormat.configureAccumuloOutputFormat(conf);
 
-    Mockito.verify(outputFormat).setConnectorInfoWithErrorChecking(conf, user, new PasswordToken(password));
-    Mockito.verify(outputFormat).setMockInstanceWithErrorChecking(conf, instanceName);
+    Mockito.verify(helper).setOutputFormatConnectorInfo(conf, user, new PasswordToken(password));
+    Mockito.verify(helper).setOutputFormatMockInstance(conf, instanceName);
     Mockito.verify(outputFormat).setDefaultAccumuloTableName(conf, outputTable);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/itests/hive-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index ba9d7b9..bf600c2 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -399,6 +399,12 @@
       <version>${hamcrest.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.codehaus.plexus</groupId>
+      <artifactId>plexus-utils</artifactId>
+      <version>${plexus.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/itests/qtest-accumulo/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest-accumulo/pom.xml b/itests/qtest-accumulo/pom.xml
index b7ce283..40d0a74 100644
--- a/itests/qtest-accumulo/pom.xml
+++ b/itests/qtest-accumulo/pom.xml
@@ -38,7 +38,8 @@
     <!-- Profile activation clause for accumulo-tests will flip skip.accumulo.tests to false
          as long as -DskipAccumuloTests is not specified -->
     <skip.accumulo.tests>true</skip.accumulo.tests>
-    <accumulo-thrift.version>0.9.0</accumulo-thrift.version>
+    <!-- Must correspond with the Accumulo version specified in the pom -->
+    <accumulo-thrift.version>0.9.1</accumulo-thrift.version>
     <test.dfs.mkdir>-mkdir -p</test.dfs.mkdir>
   </properties>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b6b7b5b..99ba218 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,7 +109,7 @@
     <maven.build-helper.plugin.version>1.8</maven.build-helper.plugin.version>
 
     <!-- Library Dependency Versions -->
-    <accumulo.version>1.6.0</accumulo.version>
+    <accumulo.version>1.7.3</accumulo.version>
     <activemq.version>5.5.0</activemq.version>
     <ant.version>1.9.1</ant.version>
     <antlr.version>3.5.2</antlr.version>
@@ -185,6 +185,7 @@
     <netty.version>4.0.29.Final</netty.version>
     <parquet.version>1.9.0</parquet.version>
     <pig.version>0.16.0</pig.version>
+    <plexus.version>1.5.6</plexus.version>
     <protobuf.version>2.5.0</protobuf.version>
     <stax.version>1.0.1</stax.version>
     <slf4j.version>1.7.10</slf4j.version>


[2/2] hive git commit: HIVE-16973 : Fetching of Delegation tokens (Kerberos) for AccumuloStorageHandler fails in HS2 (Josh Elser via Sushanth Sowmyan)

Posted by kh...@apache.org.
HIVE-16973 : Fetching of Delegation tokens (Kerberos) for AccumuloStorageHandler fails in HS2 (Josh Elser via Sushanth Sowmyan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/173d9816
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/173d9816
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/173d9816

Branch: refs/heads/master
Commit: 173d9816086e4c43677ae6dc3b1bf85203f894a1
Parents: 93dd75d
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Sun Jul 16 14:42:15 2017 -0700
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Sun Jul 16 20:18:09 2017 -0700

----------------------------------------------------------------------
 .../accumulo/AccumuloConnectionParameters.java  |  57 +--
 .../hive/accumulo/AccumuloStorageHandler.java   |  77 ++--
 .../hive/accumulo/HiveAccumuloHelper.java       | 396 +++++++++++--------
 .../org/apache/hadoop/hive/accumulo/Utils.java  |   3 +-
 .../mr/HiveAccumuloTableInputFormat.java        | 244 +++---------
 .../mr/HiveAccumuloTableOutputFormat.java       |  84 +---
 .../serde/CompositeAccumuloRowIdFactory.java    |   3 +-
 .../serde/DefaultAccumuloRowIdFactory.java      |   3 +-
 .../accumulo/TestAccumuloStorageHandler.java    |   6 +-
 .../hive/accumulo/TestHiveAccumuloHelper.java   | 184 +++++++--
 .../mr/TestHiveAccumuloTableInputFormat.java    |  37 +-
 .../mr/TestHiveAccumuloTableOutputFormat.java   |  29 +-
 itests/hive-unit/pom.xml                        |   6 +
 itests/qtest-accumulo/pom.xml                   |   3 +-
 pom.xml                                         |   3 +-
 15 files changed, 530 insertions(+), 605 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java
index f34e820..58fd89f 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java
@@ -17,19 +17,19 @@
 package org.apache.hadoop.hive.accumulo;
 
 import java.io.File;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
+import java.io.IOException;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.JavaUtils;
 
 import com.google.common.base.Preconditions;
 
@@ -37,8 +37,6 @@ import com.google.common.base.Preconditions;
  *
  */
 public class AccumuloConnectionParameters {
-  private static final String KERBEROS_TOKEN_CLASS = "org.apache.accumulo.core.client.security.tokens.KerberosToken";
-
   public static final String USER_NAME = "accumulo.user.name";
   public static final String USER_PASS = "accumulo.user.pass";
   public static final String ZOOKEEPERS = "accumulo.zookeepers";
@@ -124,8 +122,9 @@ public class AccumuloConnectionParameters {
     if (null == zookeepers) {
       throw new IllegalArgumentException("ZooKeeper quorum string must be provided in hiveconf using " + ZOOKEEPERS);
     }
+    ClientConfiguration clientConf = ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeepers).withSasl(useSasl());
 
-    return new ZooKeeperInstance(instanceName, zookeepers);
+    return new ZooKeeperInstance(clientConf);
   }
 
   public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
@@ -142,7 +141,7 @@ public class AccumuloConnectionParameters {
     }
 
     if (useSasl()) {
-      return inst.getConnector(username, getKerberosToken());
+      return inst.getConnector(username, getKerberosToken(username));
     } else {
       // Not using SASL/Kerberos -- use the password
       String password = getAccumuloPassword();
@@ -175,17 +174,10 @@ public class AccumuloConnectionParameters {
    * Instantiate a KerberosToken in a backwards compatible manner.
    * @param username Kerberos principal
    */
-  AuthenticationToken getKerberosToken(String username) {
-    // Get the Class
-    Class<? extends AuthenticationToken> krbTokenClz = getKerberosTokenClass();
-
+  KerberosToken getKerberosToken(String username) {
     try {
-      // Invoke the `new KerberosToken(String)` constructor
-      // Expects that the user is already logged-in
-      Constructor<? extends AuthenticationToken> constructor = krbTokenClz.getConstructor(String.class);
-      return constructor.newInstance(username);
-    } catch (NoSuchMethodException | SecurityException | InstantiationException |
-        IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
+      return new KerberosToken(username);
+    } catch (IOException e) {
       throw new IllegalArgumentException("Failed to instantiate KerberosToken.", e);
     }
   }
@@ -195,36 +187,11 @@ public class AccumuloConnectionParameters {
    * @param username Kerberos principal
    * @param keytab Keytab on local filesystem
    */
-  AuthenticationToken getKerberosToken(String username, String keytab) {
-    Class<? extends AuthenticationToken> krbTokenClz = getKerberosTokenClass();
-
-    File keytabFile = new File(keytab);
-    if (!keytabFile.isFile() || !keytabFile.canRead()) {
-      throw new IllegalArgumentException("Keytab must be a readable file: " + keytab);
-    }
-
+  KerberosToken getKerberosToken(String username, String keytab) {
     try {
-      // Invoke the `new KerberosToken(String, File, boolean)` constructor
-      // Tries to log in as the provided user with the given keytab, overriding an already logged-in user if present
-      Constructor<? extends AuthenticationToken> constructor = krbTokenClz.getConstructor(String.class, File.class, boolean.class);
-      return constructor.newInstance(username, keytabFile, true);
-    } catch (NoSuchMethodException | SecurityException | InstantiationException |
-        IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
+      return new KerberosToken(username, new File(keytab), true);
+    } catch (IOException e) {
       throw new IllegalArgumentException("Failed to instantiate KerberosToken.", e);
     }
   }
-
-  /**
-   * Attempt to instantiate the KerberosToken class
-   */
-  Class<? extends AuthenticationToken> getKerberosTokenClass() {
-    try {
-      // Instantiate the class
-      Class<?> clz = JavaUtils.loadClass(KERBEROS_TOKEN_CLASS);
-      // Cast it to an AuthenticationToken since Connector will need that
-      return clz.asSubclass(AuthenticationToken.class);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalArgumentException("Could not load KerberosToken class. >=Accumulo 1.7.0 required", e);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java
index 62524e8..5391a99 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java
@@ -24,14 +24,6 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
-import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.fate.Fate;
-import org.apache.accumulo.start.Main;
-import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableInputFormat;
 import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableOutputFormat;
@@ -60,7 +52,6 @@ import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -177,7 +168,6 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
     connectionParams = new AccumuloConnectionParameters(conf);
   }
 
-  @SuppressWarnings("deprecation")
   @Override
   public Class<? extends AbstractSerDe> getSerDeClass() {
     return AccumuloSerDe.class;
@@ -228,6 +218,37 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
     }
 
     LOG.info("Computed input job properties of " + jobProperties);
+
+    Configuration conf = getConf();
+    helper.loadDependentJars(conf);
+
+    // When Kerberos is enabled, we have to add the Accumulo delegation token to the
+    // Job so that it gets passed down to the YARN/Tez task.
+    if (connectionParams.useSasl()) {
+      try {
+        // Open an accumulo connection
+        Connector conn = connectionParams.getConnector();
+
+        // Convert the Accumulo token into a Hadoop token
+        Token<? extends TokenIdentifier> accumuloToken = helper.setConnectorInfoForInputAndOutput(connectionParams, conn, conf);
+
+        // Probably don't have a JobConf here, but we can still try...
+        if (conf instanceof JobConf) {
+          // Log the Hadoop token (already converted from the Accumulo token) before adding it
+          LOG.debug("Adding Hadoop Token for Accumulo to Job's Credentials: " + accumuloToken);
+
+          // Add the Hadoop token to the JobConf
+          JobConf jobConf = (JobConf) conf;
+          jobConf.getCredentials().addToken(accumuloToken.getService(), accumuloToken);
+          LOG.info("All job tokens: " + jobConf.getCredentials().getAllTokens());
+        } else {
+          LOG.info("Don't have a JobConf, so we cannot persist Tokens. Have to do it later.");
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to obtain DelegationToken for "
+            + connectionParams.getAccumuloUserName(), e);
+      }
+    }
   }
 
   @Override
@@ -413,7 +434,6 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
     // do nothing
   }
 
-  @SuppressWarnings("deprecation")
   @Override
   public DecomposedPredicate decomposePredicate(JobConf conf, Deserializer deserializer,
       ExprNodeDesc desc) {
@@ -431,15 +451,9 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
     }
   }
 
-  @SuppressWarnings("deprecation")
   @Override
   public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
-    try {
-      Utils.addDependencyJars(jobConf, Tracer.class, Fate.class, Connector.class, Main.class,
-          ZooKeeper.class, AccumuloStorageHandler.class);
-    } catch (IOException e) {
-      LOG.error("Could not add necessary Accumulo dependencies to classpath", e);
-    }
+    helper.loadDependentJars(jobConf);
 
     Properties tblProperties = tableDesc.getProperties();
     AccumuloSerDeParameters serDeParams = null;
@@ -462,36 +476,17 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
     // Job so that it gets passed down to the YARN/Tez task.
     if (connectionParams.useSasl()) {
       try {
-        // Obtain a delegation token from Accumulo
+        // Open an accumulo connection
         Connector conn = connectionParams.getConnector();
-        AuthenticationToken token = helper.getDelegationToken(conn);
-
-        // Make sure the Accumulo token is set in the Configuration (only a stub of the Accumulo
-        // AuthentiationToken is serialized, not the entire token). configureJobConf may be
-        // called multiple times with the same JobConf which results in an error from Accumulo
-        // MapReduce API. Catch the error, log a debug message and just keep going
-        try {
-          InputConfigurator.setConnectorInfo(AccumuloInputFormat.class, jobConf,
-              connectionParams.getAccumuloUserName(), token);
-        } catch (IllegalStateException e) {
-          // The implementation balks when this method is invoked multiple times
-          LOG.debug("Ignoring IllegalArgumentException about re-setting connector information");
-        }
-        try {
-          OutputConfigurator.setConnectorInfo(AccumuloOutputFormat.class, jobConf,
-              connectionParams.getAccumuloUserName(), token);
-        } catch (IllegalStateException e) {
-          // The implementation balks when this method is invoked multiple times
-          LOG.debug("Ignoring IllegalArgumentException about re-setting connector information");
-        }
 
         // Convert the Accumulo token in a Hadoop token
-        Token<? extends TokenIdentifier> accumuloToken = helper.getHadoopToken(token);
+        Token<? extends TokenIdentifier> accumuloToken = helper.setConnectorInfoForInputAndOutput(connectionParams, conn, jobConf);
 
-        LOG.info("Adding Hadoop Token for Accumulo to Job's Credentials");
+        LOG.debug("Adding Hadoop Token for Accumulo to Job's Credentials");
 
         // Add the Hadoop token to the JobConf
         helper.mergeTokenIntoJobConf(jobConf, accumuloToken);
+        LOG.debug("All job tokens: " + jobConf.getCredentials().getAllTokens());
       } catch (Exception e) {
         throw new RuntimeException("Failed to obtain DelegationToken for "
             + connectionParams.getAccumuloUserName(), e);

http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java
index 71b8b77..9fccb49 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java
@@ -19,20 +19,35 @@ package org.apache.hadoop.hive.accumulo;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.admin.SecurityOperations;
+import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
+import org.apache.accumulo.core.client.impl.DelegationTokenImpl;
+import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
+import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator;
+import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.accumulo.fate.Fate;
+import org.apache.accumulo.start.Main;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,35 +56,9 @@ import org.slf4j.LoggerFactory;
  */
 public class HiveAccumuloHelper {
   private static final Logger log = LoggerFactory.getLogger(HiveAccumuloHelper.class);
-  // Constant from Accumulo's DelegationTokenImpl
+  // Constant from Accumulo's AuthenticationTokenIdentifier
   public static final Text ACCUMULO_SERVICE = new Text("ACCUMULO_AUTH_TOKEN");
 
-  // Constants for DelegationToken reflection to continue to support 1.6
-  private static final String DELEGATION_TOKEN_CONFIG_CLASS_NAME =
-      "org.apache.accumulo.core.client.admin.DelegationTokenConfig";
-  private static final String DELEGATION_TOKEN_IMPL_CLASS_NAME =
-      "org.apache.accumulo.core.client.impl.DelegationTokenImpl";
-  private static final String GET_DELEGATION_TOKEN_METHOD_NAME = "getDelegationToken";
-  private static final String GET_IDENTIFIER_METHOD_NAME = "getIdentifier";
-  private static final String GET_PASSWORD_METHOD_NAME = "getPassword";
-  private static final String GET_SERVICE_NAME_METHOD_NAME = "getServiceName";
-
-  // Constants for ClientConfiguration and setZooKeeperInstance reflection
-  // to continue to support 1.5
-  private static final String CLIENT_CONFIGURATION_CLASS_NAME =
-      "org.apache.accumulo.core.client.ClientConfiguration";
-  private static final String LOAD_DEFAULT_METHOD_NAME = "loadDefault";
-  private static final String SET_PROPERTY_METHOD_NAME = "setProperty";
-  private static final String INSTANCE_ZOOKEEPER_HOST = "instance.zookeeper.host";
-  private static final String INSTANCE_NAME = "instance.name";
-  private static final String INSTANCE_RPC_SASL_ENABLED = "instance.rpc.sasl.enabled";
-  private static final String SET_ZOOKEEPER_INSTANCE_METHOD_NAME = "setZooKeeperInstance";
-
-  // Constants for unwrapping the DelegationTokenStub into a DelegationTokenImpl
-  private static final String CONFIGURATOR_BASE_CLASS_NAME =
-      "org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase";
-  private static final String UNWRAP_AUTHENTICATION_TOKEN_METHOD_NAME = "unwrapAuthenticationToken";
-
   /**
    * Extract the appropriate Token for Accumulo from the provided {@code user} and add it to the
    * {@link JobConf}'s credentials.
@@ -89,18 +78,11 @@ public class HiveAccumuloHelper {
 
     // Accumulo token already in Configuration, but the Token isn't in the Job credentials like the
     // AccumuloInputFormat expects
-    Token<?> accumuloToken = null;
-    Collection<Token<? extends TokenIdentifier>> tokens = user.getTokens();
-    for (Token<?> token : tokens) {
-      if (ACCUMULO_SERVICE.equals(token.getKind())) {
-        accumuloToken = token;
-        break;
-      }
-    }
+    Token<?> accumuloToken = getAccumuloToken(user);
 
     // If we didn't find the Token, we can't proceed. Log the tokens for debugging.
     if (null == accumuloToken) {
-      log.error("Could not find accumulo token in user: " + tokens);
+      log.error("Could not find accumulo token in user: " + user.getTokens());
       throw new IOException("Could not find Accumulo Token in user's tokens");
     }
 
@@ -109,6 +91,17 @@ public class HiveAccumuloHelper {
     mergeTokenIntoJobConf(jobConf, accumuloToken);
   }
 
+  public Token<?> getAccumuloToken(UserGroupInformation user) {
+    checkNotNull(user, "Provided UGI was null");
+    Collection<Token<? extends TokenIdentifier>> tokens = user.getTokens();
+    for (Token<?> token : tokens) {
+      if (ACCUMULO_SERVICE.equals(token.getKind())) {
+        return token;
+      }
+    }
+    return null;
+  }
+
   /**
    * Merge the provided <code>Token</code> into the JobConf.
    *
@@ -128,7 +121,7 @@ public class HiveAccumuloHelper {
   }
 
   /**
-   * Obtain a DelegationToken from Accumulo in a backwards compatible manner.
+   * Obtain a DelegationToken from Accumulo.
    *
    * @param conn
    *          The Accumulo connector
@@ -138,49 +131,32 @@ public class HiveAccumuloHelper {
    */
   public AuthenticationToken getDelegationToken(Connector conn) throws IOException {
     try {
-      Class<?> clz = JavaUtils.loadClass(DELEGATION_TOKEN_CONFIG_CLASS_NAME);
-      // DelegationTokenConfig delegationTokenConfig = new DelegationTokenConfig();
-      Object delegationTokenConfig = clz.newInstance();
-
-      SecurityOperations secOps = conn.securityOperations();
-
-      Method getDelegationTokenMethod = secOps.getClass().getMethod(
-          GET_DELEGATION_TOKEN_METHOD_NAME, clz);
-
-      // secOps.getDelegationToken(delegationTokenConfig)
-      return (AuthenticationToken) getDelegationTokenMethod.invoke(secOps, delegationTokenConfig);
-    } catch (Exception e) {
-      throw new IOException("Failed to obtain DelegationToken from Accumulo", e);
+      DelegationTokenConfig config = new DelegationTokenConfig();
+      return conn.securityOperations().getDelegationToken(config);
+    } catch (AccumuloException | AccumuloSecurityException e) {
+      throw new IOException("Failed to obtain DelegationToken", e);
     }
   }
 
-  public Token<? extends TokenIdentifier> getHadoopToken(AuthenticationToken delegationToken)
+  public Token<? extends TokenIdentifier> getHadoopToken(AuthenticationToken token)
       throws IOException {
+    if (!(token instanceof DelegationTokenImpl)) {
+      throw new IOException("Expected a DelegationTokenImpl but found " +
+          (token != null ? token.getClass() : "null"));
+    }
+    DelegationTokenImpl dt = (DelegationTokenImpl) token;
     try {
-      // DelegationTokenImpl class
-      Class<?> delegationTokenClass = JavaUtils.loadClass(DELEGATION_TOKEN_IMPL_CLASS_NAME);
-      // Methods on DelegationToken
-      Method getIdentifierMethod = delegationTokenClass.getMethod(GET_IDENTIFIER_METHOD_NAME);
-      Method getPasswordMethod = delegationTokenClass.getMethod(GET_PASSWORD_METHOD_NAME);
-      Method getServiceNameMethod = delegationTokenClass.getMethod(GET_SERVICE_NAME_METHOD_NAME);
-
-      // Treat the TokenIdentifier implementation as the abstract class to avoid dependency issues
-      // AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier();
-      TokenIdentifier identifier = (TokenIdentifier) getIdentifierMethod.invoke(delegationToken);
-
-      // new Token<AuthenticationTokenIdentifier>(identifier.getBytes(),
-      //     delegationToken.getPassword(), identifier.getKind(), delegationToken.getServiceName());
-      return new Token<TokenIdentifier>(identifier.getBytes(), (byte[])
-          getPasswordMethod.invoke(delegationToken), identifier.getKind(),
-          (Text) getServiceNameMethod.invoke(delegationToken));
+      AuthenticationTokenIdentifier identifier = dt.getIdentifier();
+
+      return new Token<AuthenticationTokenIdentifier>(identifier.getBytes(),
+          dt.getPassword(), identifier.getKind(), dt.getServiceName());
     } catch (Exception e) {
       throw new IOException("Failed to create Hadoop token from Accumulo DelegationToken", e);
     }
   }
 
   /**
-   * Construct a <code>ClientConfiguration</code> instance in a backwards-compatible way. Allows us
-   * to support Accumulo 1.5
+   * Construct a <code>ClientConfiguration</code> instance.
    *
    * @param zookeepers
    *          ZooKeeper hosts
@@ -189,127 +165,199 @@ public class HiveAccumuloHelper {
    * @param useSasl
    *          Is SASL enabled
    * @return A ClientConfiguration instance
-   * @throws IOException
-   *           If the instance fails to be created
    */
-  public Object getClientConfiguration(String zookeepers, String instanceName, boolean useSasl)
-      throws IOException {
-    try {
-      // Construct a new instance of ClientConfiguration
-      Class<?> clientConfigClass = JavaUtils.loadClass(CLIENT_CONFIGURATION_CLASS_NAME);
-      Method loadDefaultMethod = clientConfigClass.getMethod(LOAD_DEFAULT_METHOD_NAME);
-      Object clientConfig = loadDefaultMethod.invoke(null);
-
-      // Set instance and zookeeper hosts
-      Method setPropertyMethod = clientConfigClass.getMethod(SET_PROPERTY_METHOD_NAME,
-          String.class, Object.class);
-      setPropertyMethod.invoke(clientConfig, INSTANCE_ZOOKEEPER_HOST, zookeepers);
-      setPropertyMethod.invoke(clientConfig, INSTANCE_NAME, instanceName);
-
-      if (useSasl) {
-        // Defaults to not using SASL, set true if SASL is being used
-        setPropertyMethod.invoke(clientConfig, INSTANCE_RPC_SASL_ENABLED, true);
+  public ClientConfiguration getClientConfiguration(String zookeepers, String instanceName, boolean useSasl) {
+    return ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeepers).withSasl(useSasl);
+  }
+
+  public void updateInputFormatConfWithAccumuloToken(JobConf jobConf, UserGroupInformation currentUser,
+      AccumuloConnectionParameters cnxnParams) throws IOException {
+    updateConfWithAccumuloToken(jobConf, currentUser, cnxnParams, true);
+  }
+
+  public void updateOutputFormatConfWithAccumuloToken(JobConf jobConf, UserGroupInformation currentUser,
+      AccumuloConnectionParameters cnxnParams) throws IOException {
+    updateConfWithAccumuloToken(jobConf, currentUser, cnxnParams, false);
+  }
+
+  void updateConfWithAccumuloToken(JobConf jobConf, UserGroupInformation currentUser,
+      AccumuloConnectionParameters cnxnParams, boolean isInputFormat) throws IOException {
+    if (getAccumuloToken(currentUser) != null) {
+      addTokenFromUserToJobConf(currentUser, jobConf);
+    } else {
+      try {
+        Connector connector = cnxnParams.getConnector();
+        // If we have Kerberos credentials, we should obtain the delegation token
+        AuthenticationToken token = getDelegationToken(connector);
+
+        // Send the DelegationToken down to the Configuration for Accumulo to use
+        if (isInputFormat) {
+          setInputFormatConnectorInfo(jobConf, cnxnParams.getAccumuloUserName(), token);
+        } else {
+          setOutputFormatConnectorInfo(jobConf, cnxnParams.getAccumuloUserName(), token);
+        }
+
+        // Convert the Accumulo token into a Hadoop token
+        Token<? extends TokenIdentifier> accumuloToken = getHadoopToken(token);
+
+        // Add the Hadoop token to the JobConf
+        mergeTokenIntoJobConf(jobConf, accumuloToken);
+
+        // Make sure the UGI contains the token too for good measure
+        if (!currentUser.addToken(accumuloToken)) {
+          throw new IOException("Failed to add Accumulo Token to UGI");
+        }
+
+        try {
+          addTokenFromUserToJobConf(currentUser, jobConf);
+        } catch (IOException e) {
+          throw new IOException("Current user did not contain necessary delegation Tokens " + currentUser, e);
+        }
+      } catch (AccumuloException | AccumuloSecurityException e) {
+        throw new IOException("Failed to acquire Accumulo DelegationToken", e);
       }
+    }
+  }
 
-      return clientConfig;
-    } catch (Exception e) {
-      String msg = "Failed to instantiate and invoke methods on ClientConfiguration";
-      log.error(msg, e);
-      throw new IOException(msg, e);
+  public boolean hasKerberosCredentials(UserGroupInformation ugi) {
+    // Allows mocking in testing.
+    return ugi.getAuthenticationMethod() == AuthenticationMethod.KERBEROS;
+  }
+
+  /**
+   * Calls {@link AccumuloInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)},
+   * suppressing exceptions due to setting the configuration multiple times.
+   */
+  public void setInputFormatConnectorInfo(JobConf conf, String username, AuthenticationToken token)
+      throws AccumuloSecurityException {
+    try {
+      AccumuloInputFormat.setConnectorInfo(conf, username, token);
+    } catch (IllegalStateException e) {
+      // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
+      log.debug("Ignoring exception setting Accumulo Connector instance for user " + username, e);
     }
   }
 
   /**
-   * Wrapper around <code>setZooKeeperInstance(Configuration, ClientConfiguration)</code> which only
-   * exists in 1.6.0 and newer. Support backwards compat.
-   *
-   * @param jobConf
-   *          The JobConf
-   * @param inputOrOutputFormatClass
-   *          The InputFormat or OutputFormat class
-   * @param zookeepers
-   *          ZooKeeper hosts
-   * @param instanceName
-   *          Accumulo instance name
-   * @param useSasl
-   *          Is SASL enabled
-   * @throws IOException
-   *           When invocation of the method fails
+   * Calls {@link AccumuloOutputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)}
+   * suppressing exceptions due to setting the configuration multiple times.
    */
-  public void setZooKeeperInstance(JobConf jobConf, Class<?> inputOrOutputFormatClass, String
-      zookeepers, String instanceName, boolean useSasl) throws IOException {
+  public void setOutputFormatConnectorInfo(JobConf conf, String username, AuthenticationToken token)
+      throws AccumuloSecurityException {
     try {
-      setZooKeeperInstanceWithReflection(jobConf, inputOrOutputFormatClass, zookeepers,
-          instanceName, useSasl);
-    } catch (InvocationTargetException e) {
-      Throwable cause = e.getCause();
-      if (null != cause && cause instanceof IllegalStateException) {
-        throw (IllegalStateException) cause;
-      }
-      throw new IOException("Failed to invoke setZooKeeperInstance method", e);
+      AccumuloOutputFormat.setConnectorInfo(conf, username, token);
     } catch (IllegalStateException e) {
-      // re-throw the ISE so the caller can work around the silly impl that throws this in the
-      // first place.
-      throw e;
-    } catch (Exception e) {
-      throw new IOException("Failed to invoke setZooKeeperInstance method", e);
+      // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
+      log.debug("Ignoring exception setting Accumulo Connector instance for user " + username, e);
     }
   }
 
   /**
-   * Wrap the setZooKeeperInstance reflected-call into its own method for testing
-   *
-   * @param jobConf
-   *          The JobConf
-   * @param inputOrOutputFormatClass
-   *          The InputFormat or OutputFormat class
-   * @param zookeepers
-   *          ZooKeeper hosts
-   * @param instanceName
-   *          Accumulo instance name
-   * @param useSasl
-   *          Is SASL enabled
-   * @throws IOException
-   *           When invocation of the method fails
+   * Calls {@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)},
+   * suppressing exceptions due to setting the configuration multiple times.
    */
-  void setZooKeeperInstanceWithReflection(JobConf jobConf, Class<?> inputOrOutputFormatClass, String
-      zookeepers, String instanceName, boolean useSasl) throws IOException, ClassNotFoundException,
-      NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException,
-      InvocationTargetException {
-    Class<?> clientConfigClass = JavaUtils.loadClass(CLIENT_CONFIGURATION_CLASS_NAME);
-
-    // get the ClientConfiguration
-    Object clientConfig = getClientConfiguration(zookeepers, instanceName, useSasl);
-
-    // AccumuloOutputFormat.setZooKeeperInstance(JobConf, ClientConfiguration) or
-    // AccumuloInputFormat.setZooKeeperInstance(JobConf, ClientConfiguration)
-    Method setZooKeeperMethod = inputOrOutputFormatClass.getMethod(
-        SET_ZOOKEEPER_INSTANCE_METHOD_NAME, JobConf.class, clientConfigClass);
-    setZooKeeperMethod.invoke(null, jobConf, clientConfig);
+  public void setInputFormatZooKeeperInstance(JobConf conf, String instanceName, String zookeepers,
+      boolean isSasl) throws IOException {
+    try {
+      ClientConfiguration clientConf = getClientConfiguration(zookeepers, instanceName, isSasl);
+      AccumuloInputFormat.setZooKeeperInstance(conf, clientConf);
+    } catch (IllegalStateException ise) {
+      // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
+      log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at "
+          + zookeepers, ise);
+    }
   }
-      
+
   /**
-   * Wrapper around <code>ConfiguratorBase.unwrapAuthenticationToken</code> which only exists in
-   * 1.7.0 and new. Uses reflection to not break compat.
-   *
-   * @param jobConf
-   *          JobConf object
-   * @param token
-   *          The DelegationTokenStub instance
-   * @return A DelegationTokenImpl created from the Token in the Job's credentials
-   * @throws IOException
-   *           If the token fails to be unwrapped
+   * Calls {@link AccumuloOutputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)},
+   * suppressing exceptions due to setting the configuration multiple times.
    */
-  public AuthenticationToken unwrapAuthenticationToken(JobConf jobConf, AuthenticationToken token)
-      throws IOException {
+  public void setOutputFormatZooKeeperInstance(JobConf conf, String instanceName, String zookeepers,
+      boolean isSasl) throws IOException {
     try {
-      Class<?> configuratorBaseClass = JavaUtils.loadClass(CONFIGURATOR_BASE_CLASS_NAME);
-      Method unwrapAuthenticationTokenMethod = configuratorBaseClass.getMethod(
-          UNWRAP_AUTHENTICATION_TOKEN_METHOD_NAME, JobConf.class, AuthenticationToken.class);
-      // ConfiguratorBase.unwrapAuthenticationToken(conf, token);
-      return (AuthenticationToken) unwrapAuthenticationTokenMethod.invoke(null, jobConf, token);
+      ClientConfiguration clientConf = getClientConfiguration(zookeepers, instanceName, isSasl);
+      AccumuloOutputFormat.setZooKeeperInstance(conf, clientConf);
+    } catch (IllegalStateException ise) {
+      // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
+      log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at "
+          + zookeepers, ise);
+    }
+  }
+
+  /**
+   * Calls {@link AccumuloInputFormat#setMockInstance(JobConf, String)}, suppressing exceptions due
+   * to setting the configuration multiple times.
+   */
+  public void setInputFormatMockInstance(JobConf conf, String instanceName) {
+    try {
+      AccumuloInputFormat.setMockInstance(conf, instanceName);
+    } catch (IllegalStateException e) {
+      // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
+      log.debug("Ignoring exception setting mock instance of " + instanceName, e);
+    }
+  }
+
+  /**
+   * Calls {@link AccumuloOutputFormat#setMockInstance(JobConf, String)}, suppressing exceptions
+   * due to setting the configuration multiple times.
+   */
+  public void setOutputFormatMockInstance(JobConf conf, String instanceName) {
+    try {
+      AccumuloOutputFormat.setMockInstance(conf, instanceName);
+    } catch (IllegalStateException e) {
+      // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
+      log.debug("Ignoring exception setting mock instance of " + instanceName, e);
+    }
+  }
+
+  /**
+   * Sets all jars required by Accumulo input/output tasks in the configuration to be dynamically
+   * loaded when the task is executed.
+   */
+  public void loadDependentJars(Configuration conf) {
+    @SuppressWarnings("deprecation")
+    List<Class<?>> classesToLoad = new ArrayList<>(Arrays.asList(Tracer.class, Fate.class, Connector.class, Main.class, ZooKeeper.class, AccumuloStorageHandler.class));
+    try {
+      classesToLoad.add(Class.forName("org.apache.htrace.Trace"));
     } catch (Exception e) {
-      throw new IOException("Failed to unwrap AuthenticationToken", e);
+      log.warn("Failed to load class for HTrace jar, trying to continue", e);
     }
+    try {
+      Utils.addDependencyJars(conf, classesToLoad);
+    } catch (IOException e) {
+      log.error("Could not add necessary Accumulo dependencies to classpath", e);
+    }
+  }
+
+  /**
+   * Obtains an Accumulo DelegationToken and sets it in the configuration for input and output jobs.
+   * The Accumulo token is converted into a Hadoop-style token and returned to the caller.
+   *
+   * @return A Hadoop-style token which contains the Accumulo DelegationToken
+   */
+  public Token<? extends TokenIdentifier> setConnectorInfoForInputAndOutput(AccumuloConnectionParameters params, Connector conn, Configuration conf) throws Exception {
+    // Obtain a delegation token from Accumulo
+    AuthenticationToken token = getDelegationToken(conn);
+
+    // Make sure the Accumulo token is set in the Configuration (only a stub of the Accumulo
+    // AuthenticationToken is serialized, not the entire token). configureJobConf may be
+    // called multiple times with the same JobConf which results in an error from Accumulo
+    // MapReduce API. Catch the error, log a debug message and just keep going
+    try {
+      InputConfigurator.setConnectorInfo(AccumuloInputFormat.class, conf,
+          params.getAccumuloUserName(), token);
+    } catch (IllegalStateException e) {
+      // The implementation balks when this method is invoked multiple times
+      log.debug("Ignoring IllegalStateException about re-setting connector information", e);
+    }
+    try {
+      OutputConfigurator.setConnectorInfo(AccumuloOutputFormat.class, conf,
+          params.getAccumuloUserName(), token);
+    } catch (IllegalStateException e) {
+      // The implementation balks when this method is invoked multiple times
+      log.debug("Ignoring IllegalStateException about re-setting connector information", e);
+    }
+
+    return getHadoopToken(token);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java
index 407ecbd..af9a6f0 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java
@@ -30,6 +30,7 @@ import java.text.MessageFormat;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.jar.JarFile;
@@ -57,7 +58,7 @@ public class Utils {
   private static final Logger log = LoggerFactory.getLogger(Utils.class);
 
   // Thanks, HBase
-  public static void addDependencyJars(Configuration conf, Class<?>... classes) throws IOException {
+  public static void addDependencyJars(Configuration conf, List<Class<?>> classes) throws IOException {
     FileSystem localFs = FileSystem.getLocal(conf);
     Set<String> jars = new HashSet<String>();
     // Add jars that are already in the tmpjars variable

http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java
index 083678f..af64eac 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hive.accumulo.mr;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -39,6 +37,7 @@ import org.apache.accumulo.core.client.mapred.RangeInputSplit;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
@@ -73,9 +72,8 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Level;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,22 +107,41 @@ public class HiveAccumuloTableInputFormat implements
     Path[] tablePaths = FileInputFormat.getInputPaths(context);
 
     try {
-      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-      final Connector connector;
+      Connector connector = null;
 
       // Need to get a Connector so we look up the user's authorizations if not otherwise specified
-      if (accumuloParams.useSasl() && !ugi.hasKerberosCredentials()) {
+      if (accumuloParams.useSasl()) {
+        log.info("Current user: " + UserGroupInformation.getCurrentUser());
         // In a YARN/Tez job, don't have the Kerberos credentials anymore, use the delegation token
         AuthenticationToken token = ConfiguratorBase.getAuthenticationToken(
             AccumuloInputFormat.class, jobConf);
-        // Convert the stub from the configuration back into a normal Token
-        // More reflection to support 1.6
-        token = helper.unwrapAuthenticationToken(jobConf, token);
-        connector = instance.getConnector(accumuloParams.getAccumuloUserName(), token);
+        if (null != token && !jobConf.getCredentials().getAllTokens().isEmpty()) {
+          // Convert the stub from the configuration back into a normal Token
+          log.info("Found authentication token in Configuration: " + token);
+          log.info("Job credential tokens: " + jobConf.getCredentials().getAllTokens());
+          AuthenticationToken unwrappedToken = ConfiguratorBase.unwrapAuthenticationToken(jobConf, token);
+          log.info("Converted authentication token from Configuration into: " + unwrappedToken);
+          // It's possible that the Job doesn't have the token in its credentials. In this case, unwrapAuthenticationToken
+          // will return the original token unchanged (which we know is insufficient)
+          if (unwrappedToken != token) {
+            log.info("Creating Accumulo Connector with unwrapped delegation token");
+            connector = instance.getConnector(accumuloParams.getAccumuloUserName(), unwrappedToken);
+          } else {
+            log.info("Job credentials did not contain delegation token, fetching new token");
+          }
+        }
+
+        if (connector == null) {
+          log.info("Obtaining Accumulo Connector using KerberosToken");
+          // Construct a KerberosToken -- relies on ProxyUser configuration. Will be the client making
+          // the request on top of the HS2's user. Accumulo will require proper proxy-user auth configs.
+          connector = instance.getConnector(accumuloParams.getAccumuloUserName(), new KerberosToken(accumuloParams.getAccumuloUserName()));
+        }
       } else {
         // Still in the local JVM, use the username+password or Kerberos credentials
         connector = accumuloParams.getConnector(instance);
       }
+
       final List<ColumnMapping> columnMappings = columnMapper.getColumnMappings();
       final List<IteratorSetting> iterators = predicateHandler.getIterators(jobConf, columnMapper);
       final Collection<Range> ranges = predicateHandler.getRanges(jobConf, columnMapper);
@@ -153,6 +170,7 @@ public class HiveAccumuloTableInputFormat implements
       HiveAccumuloSplit[] hiveSplits = new HiveAccumuloSplit[splits.length];
       for (int i = 0; i < splits.length; i++) {
         RangeInputSplit ris = (RangeInputSplit) splits[i];
+        ris.setLogLevel(Level.DEBUG);
         hiveSplits[i] = new HiveAccumuloSplit(ris, tablePaths[0]);
       }
 
@@ -172,12 +190,6 @@ public class HiveAccumuloTableInputFormat implements
   /**
    * Setup accumulo input format from conf properties. Delegates to final RecordReader from mapred
    * package.
-   *
-   * @param inputSplit
-   * @param jobConf
-   * @param reporter
-   * @return RecordReader
-   * @throws IOException
    */
   @Override
   public RecordReader<Text,AccumuloHiveRow> getRecordReader(InputSplit inputSplit,
@@ -190,6 +202,8 @@ public class HiveAccumuloTableInputFormat implements
     }
 
     try {
+      final AccumuloConnectionParameters accumuloParams = new AccumuloConnectionParameters(
+          jobConf);
       final List<IteratorSetting> iterators = predicateHandler.getIterators(jobConf, columnMapper);
 
       HiveAccumuloSplit hiveSplit = (HiveAccumuloSplit) inputSplit;
@@ -213,11 +227,14 @@ public class HiveAccumuloTableInputFormat implements
 
       // ACCUMULO-3015 Like the above, RangeInputSplit should have the table name
       // but we want it to, so just re-set it if it's null.
-      if (null == getTableName(rangeSplit)) {
-        final AccumuloConnectionParameters accumuloParams = new AccumuloConnectionParameters(
-            jobConf);
-        log.debug("Re-setting table name on InputSplit due to Accumulo bug.");
-        setTableName(rangeSplit, accumuloParams.getAccumuloTableName());
+      if (null == rangeSplit.getTableName()) {
+        rangeSplit.setTableName(accumuloParams.getAccumuloTableName());
+      }
+
+      // ACCUMULO-4670 RangeInputSplit doesn't preserve useSasl on the ClientConfiguration/ZooKeeperInstance
+      // We have to manually re-set it in the JobConf to make sure it gets picked up.
+      if (accumuloParams.useSasl()) {
+        helper.setInputFormatZooKeeperInstance(jobConf, accumuloParams.getAccumuloInstanceName(), accumuloParams.getZooKeepers(), accumuloParams.useSasl());
       }
 
       final RecordReader<Text,PeekingIterator<Map.Entry<Key,Value>>> recordReader = accumuloInputFormat
@@ -268,9 +285,6 @@ public class HiveAccumuloTableInputFormat implements
    *          Any iterators to be configured server-side
    * @param ranges
    *          Accumulo ranges on for the query
-   * @throws AccumuloSecurityException
-   * @throws AccumuloException
-   * @throws SerDeException
    */
   protected void configure(JobConf conf, Instance instance, Connector connector,
       AccumuloConnectionParameters accumuloParams, ColumnMapper columnMapper,
@@ -279,44 +293,17 @@ public class HiveAccumuloTableInputFormat implements
 
     // Handle implementation of Instance and invoke appropriate InputFormat method
     if (instance instanceof MockInstance) {
-      setMockInstance(conf, instance.getInstanceName());
+      getHelper().setInputFormatMockInstance(conf, instance.getInstanceName());
     } else {
-      setZooKeeperInstance(conf, instance.getInstanceName(), instance.getZooKeepers(),
+      getHelper().setInputFormatZooKeeperInstance(conf, instance.getInstanceName(), instance.getZooKeepers(),
           accumuloParams.useSasl());
     }
 
     // Set the username/passwd for the Accumulo connection
     if (accumuloParams.useSasl()) {
-      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
-      // If we have Kerberos credentials, we should obtain the delegation token
-      if (ugi.hasKerberosCredentials()) {
-        Connector conn = accumuloParams.getConnector();
-        AuthenticationToken token = helper.getDelegationToken(conn);
-
-        // Send the DelegationToken down to the Configuration for Accumulo to use
-        setConnectorInfo(conf, accumuloParams.getAccumuloUserName(), token);
-
-        // Convert the Accumulo token in a Hadoop token
-        Token<? extends TokenIdentifier> accumuloToken = helper.getHadoopToken(token);
-
-        log.info("Adding Hadoop Token for Accumulo to Job's Credentials");
-
-        // Add the Hadoop token to the JobConf
-        helper.mergeTokenIntoJobConf(conf, accumuloToken);
-
-        if (!ugi.addToken(accumuloToken)) {
-          throw new IOException("Failed to add Accumulo Token to UGI");
-        }
-      }
-
-      try {
-        helper.addTokenFromUserToJobConf(ugi, conf);
-      } catch (IOException e) {
-        throw new IOException("Current user did not contain necessary delegation Tokens " + ugi, e);
-      }
+      getHelper().updateInputFormatConfWithAccumuloToken(conf, UserGroupInformation.getCurrentUser(), accumuloParams);
     } else {
-      setConnectorInfo(conf, accumuloParams.getAccumuloUserName(),
+      getHelper().setInputFormatConnectorInfo(conf, accumuloParams.getAccumuloUserName(),
           new PasswordToken(accumuloParams.getAccumuloPassword()));
     }
 
@@ -355,45 +342,6 @@ public class HiveAccumuloTableInputFormat implements
   // Wrap the static AccumuloInputFormat methods with methods that we can
   // verify were correctly called via Mockito
 
-  protected void setMockInstance(JobConf conf, String instanceName) {
-    try {
-      AccumuloInputFormat.setMockInstance(conf, instanceName);
-    } catch (IllegalStateException e) {
-      // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
-      log.debug("Ignoring exception setting mock instance of " + instanceName, e);
-    }
-  }
-
-  @SuppressWarnings("deprecation")
-  protected void setZooKeeperInstance(JobConf conf, String instanceName, String zkHosts,
-      boolean isSasl) throws IOException {
-    // To support builds against 1.5, we can't use the new 1.6 setZooKeeperInstance which
-    // takes a ClientConfiguration class that only exists in 1.6
-    try {
-      if (isSasl) {
-        // Reflection to support Accumulo 1.5. Remove when Accumulo 1.5 support is dropped
-        // 1.6 works with the deprecated 1.5 method, but must use reflection for 1.7-only SASL support
-        helper.setZooKeeperInstance(conf, AccumuloInputFormat.class, zkHosts, instanceName, isSasl);
-      } else {
-        AccumuloInputFormat.setZooKeeperInstance(conf, instanceName, zkHosts);
-      }
-    } catch (IllegalStateException ise) {
-      // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
-      log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at "
-          + zkHosts, ise);
-    }
-  }
-
-  protected void setConnectorInfo(JobConf conf, String user, AuthenticationToken token)
-      throws AccumuloSecurityException {
-    try {
-      AccumuloInputFormat.setConnectorInfo(conf, user, token);
-    } catch (IllegalStateException e) {
-      // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
-      log.debug("Ignoring exception setting Accumulo Connector instance for user " + user, e);
-    }
-  }
-
   protected void setInputTableName(JobConf conf, String tableName) {
     AccumuloInputFormat.setInputTableName(conf, tableName);
   }
@@ -454,109 +402,7 @@ public class HiveAccumuloTableInputFormat implements
     return pairs;
   }
 
-  /**
-   * Reflection to work around Accumulo 1.5 and 1.6 incompatibilities. Throws an {@link IOException}
-   * for any reflection related exceptions
-   *
-   * @param split
-   *          A RangeInputSplit
-   * @return The name of the table from the split
-   * @throws IOException
-   */
-  protected String getTableName(RangeInputSplit split) throws IOException {
-    // ACCUMULO-3017 shenanigans with method names changing without deprecation
-    Method getTableName = null;
-    try {
-      getTableName = RangeInputSplit.class.getMethod("getTableName");
-    } catch (SecurityException e) {
-      log.debug("Could not get getTableName method from RangeInputSplit", e);
-    } catch (NoSuchMethodException e) {
-      log.debug("Could not get getTableName method from RangeInputSplit", e);
-    }
-
-    if (null != getTableName) {
-      try {
-        return (String) getTableName.invoke(split);
-      } catch (IllegalArgumentException e) {
-        log.debug("Could not invoke getTableName method from RangeInputSplit", e);
-      } catch (IllegalAccessException e) {
-        log.debug("Could not invoke getTableName method from RangeInputSplit", e);
-      } catch (InvocationTargetException e) {
-        log.debug("Could not invoke getTableName method from RangeInputSplit", e);
-      }
-    }
-
-    Method getTable;
-    try {
-      getTable = RangeInputSplit.class.getMethod("getTable");
-    } catch (SecurityException e) {
-      throw new IOException("Could not get table name from RangeInputSplit", e);
-    } catch (NoSuchMethodException e) {
-      throw new IOException("Could not get table name from RangeInputSplit", e);
-    }
-
-    try {
-      return (String) getTable.invoke(split);
-    } catch (IllegalArgumentException e) {
-      throw new IOException("Could not get table name from RangeInputSplit", e);
-    } catch (IllegalAccessException e) {
-      throw new IOException("Could not get table name from RangeInputSplit", e);
-    } catch (InvocationTargetException e) {
-      throw new IOException("Could not get table name from RangeInputSplit", e);
-    }
-  }
-
-  /**
-   * Sets the table name on a RangeInputSplit, accounting for change in method name. Any reflection
-   * related exception is wrapped in an {@link IOException}
-   *
-   * @param split
-   *          The RangeInputSplit to operate on
-   * @param tableName
-   *          The name of the table to set
-   * @throws IOException
-   */
-  protected void setTableName(RangeInputSplit split, String tableName) throws IOException {
-    // ACCUMULO-3017 shenanigans with method names changing without deprecation
-    Method setTableName = null;
-    try {
-      setTableName = RangeInputSplit.class.getMethod("setTableName", String.class);
-    } catch (SecurityException e) {
-      log.debug("Could not get getTableName method from RangeInputSplit", e);
-    } catch (NoSuchMethodException e) {
-      log.debug("Could not get getTableName method from RangeInputSplit", e);
-    }
-
-    if (null != setTableName) {
-      try {
-        setTableName.invoke(split, tableName);
-        return;
-      } catch (IllegalArgumentException e) {
-        log.debug("Could not invoke getTableName method from RangeInputSplit", e);
-      } catch (IllegalAccessException e) {
-        log.debug("Could not invoke getTableName method from RangeInputSplit", e);
-      } catch (InvocationTargetException e) {
-        log.debug("Could not invoke getTableName method from RangeInputSplit", e);
-      }
-    }
-
-    Method setTable;
-    try {
-      setTable = RangeInputSplit.class.getMethod("setTable", String.class);
-    } catch (SecurityException e) {
-      throw new IOException("Could not set table name from RangeInputSplit", e);
-    } catch (NoSuchMethodException e) {
-      throw new IOException("Could not set table name from RangeInputSplit", e);
-    }
-
-    try {
-      setTable.invoke(split, tableName);
-    } catch (IllegalArgumentException e) {
-      throw new IOException("Could not set table name from RangeInputSplit", e);
-    } catch (IllegalAccessException e) {
-      throw new IOException("Could not set table name from RangeInputSplit", e);
-    } catch (InvocationTargetException e) {
-      throw new IOException("Could not set table name from RangeInputSplit", e);
-    }
+  HiveAccumuloHelper getHelper() {
+    return helper;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
index bfa764a..0414c35 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
@@ -20,11 +20,8 @@ package org.apache.hadoop.hive.accumulo.mr;
 
 import java.io.IOException;
 
-import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,8 +35,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Progressable;
 
 import com.google.common.base.Preconditions;
@@ -78,46 +73,19 @@ public class HiveAccumuloTableOutputFormat extends AccumuloIndexedOutputFormat {
     // Set the necessary Accumulo information
     try {
       if (cnxnParams.useMockInstance()) {
-        setMockInstanceWithErrorChecking(job, cnxnParams.getAccumuloInstanceName());
+        getHelper().setOutputFormatMockInstance(job, cnxnParams.getAccumuloInstanceName());
       } else {
         // Accumulo instance name with ZK quorum
-        setZooKeeperInstanceWithErrorChecking(job, cnxnParams.getAccumuloInstanceName(),
+        getHelper().setOutputFormatZooKeeperInstance(job, cnxnParams.getAccumuloInstanceName(),
             cnxnParams.getZooKeepers(), cnxnParams.useSasl());
       }
 
       // Extract the delegation Token from the UGI and add it to the job
       // The AccumuloOutputFormat will look for it there.
       if (cnxnParams.useSasl()) {
-        UserGroupInformation ugi = getCurrentUser();
-        if (!hasKerberosCredentials(ugi)) {
-          getHelper().addTokenFromUserToJobConf(ugi, job);
-        } else {
-          // Still in the local JVM, can use Kerberos credentials
-          try {
-            Connector connector = cnxnParams.getConnector();
-            AuthenticationToken token = getHelper().getDelegationToken(connector);
-
-            // Send the DelegationToken down to the Configuration for Accumulo to use
-            setConnectorInfoWithErrorChecking(job, cnxnParams.getAccumuloUserName(), token);
-
-            // Convert the Accumulo token in a Hadoop token
-            Token<? extends TokenIdentifier> accumuloToken = getHelper().getHadoopToken(token);
-
-            log.info("Adding Hadoop Token for Accumulo to Job's Credentials");
-
-            // Add the Hadoop token to the JobConf
-            getHelper().mergeTokenIntoJobConf(job, accumuloToken);
-
-            // Make sure the UGI contains the token too for good measure
-            if (!ugi.addToken(accumuloToken)) {
-              throw new IOException("Failed to add Accumulo Token to UGI");
-            }
-          } catch (AccumuloException | AccumuloSecurityException e) {
-            throw new IOException("Failed to acquire Accumulo DelegationToken", e);
-          }
-        }
+        getHelper().updateOutputFormatConfWithAccumuloToken(job, getCurrentUser(), cnxnParams);
       } else {
-        setConnectorInfoWithErrorChecking(job, cnxnParams.getAccumuloUserName(),
+        getHelper().setOutputFormatConnectorInfo(job, cnxnParams.getAccumuloUserName(),
             new PasswordToken(cnxnParams.getAccumuloPassword()));
       }
 
@@ -141,45 +109,6 @@ public class HiveAccumuloTableOutputFormat extends AccumuloIndexedOutputFormat {
 
   // Non-static methods to wrap the static AccumuloOutputFormat methods to enable testing
 
-  protected void setConnectorInfoWithErrorChecking(JobConf conf, String username,
-                                     AuthenticationToken token) throws AccumuloSecurityException {
-    try {
-      AccumuloIndexedOutputFormat.setConnectorInfo(conf, username, token);
-    } catch (IllegalStateException e) {
-      // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
-      log.debug("Ignoring exception setting Accumulo Connector instance for user " + username, e);
-    }
-  }
-
-  @SuppressWarnings("deprecation")
-  protected void setZooKeeperInstanceWithErrorChecking(JobConf conf, String instanceName,
-                                           String zookeepers, boolean isSasl) throws IOException {
-    try {
-      if (isSasl) {
-        // Reflection to support Accumulo 1.5. Remove when Accumulo 1.5 support is dropped
-        // 1.6 works with the deprecated 1.5 method, but must use reflection for 1.7-only
-        // SASL support
-        getHelper().setZooKeeperInstance(conf, AccumuloOutputFormat.class, zookeepers, instanceName,
-            isSasl);
-      } else {
-        AccumuloIndexedOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers);
-      }
-    } catch (IllegalStateException ise) {
-      // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
-      log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at "
-          + zookeepers, ise);
-    }
-  }
-
-  protected void setMockInstanceWithErrorChecking(JobConf conf, String instanceName) {
-    try {
-      AccumuloIndexedOutputFormat.setMockInstance(conf, instanceName);
-    } catch (IllegalStateException e) {
-      // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
-      log.debug("Ignoring exception setting mock instance of " + instanceName, e);
-    }
-  }
-
   protected void setDefaultAccumuloTableName(JobConf conf, String tableName) {
     AccumuloIndexedOutputFormat.setDefaultTableName(conf, tableName);
   }
@@ -206,11 +135,6 @@ public class HiveAccumuloTableOutputFormat extends AccumuloIndexedOutputFormat {
     return new AccumuloConnectionParameters(conf);
   }
 
-  boolean hasKerberosCredentials(UserGroupInformation ugi) {
-    // Allows mocking in testing.
-    return ugi.hasKerberosCredentials();
-  }
-
   UserGroupInformation getCurrentUser() throws IOException {
     // Allows mocking in testing.
     return UserGroupInformation.getCurrentUser();

http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/CompositeAccumuloRowIdFactory.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/CompositeAccumuloRowIdFactory.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/CompositeAccumuloRowIdFactory.java
index 02d9736..67e9250 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/CompositeAccumuloRowIdFactory.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/CompositeAccumuloRowIdFactory.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.accumulo.serde;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.util.Collections;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
@@ -57,7 +58,7 @@ public class CompositeAccumuloRowIdFactory<T extends AccumuloCompositeRowId> ext
   public void addDependencyJars(Configuration jobConf) throws IOException {
     // Make sure the jar containing the custom CompositeRowId is included
     // in the mapreduce job's classpath (libjars)
-    Utils.addDependencyJars(jobConf, keyClass);
+    Utils.addDependencyJars(jobConf, Collections.<Class<?>> singletonList(keyClass));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/DefaultAccumuloRowIdFactory.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/DefaultAccumuloRowIdFactory.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/DefaultAccumuloRowIdFactory.java
index ea04d1a..6d96d9b 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/DefaultAccumuloRowIdFactory.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/DefaultAccumuloRowIdFactory.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.accumulo.serde;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
@@ -61,7 +62,7 @@ public class DefaultAccumuloRowIdFactory implements AccumuloRowIdFactory {
 
   @Override
   public void addDependencyJars(Configuration conf) throws IOException {
-    Utils.addDependencyJars(conf, getClass());
+    Utils.addDependencyJars(conf, Collections.<Class<?>> singletonList(getClass()));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
index 8d195ee..58bf4a6 100644
--- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
@@ -23,6 +23,7 @@ import java.util.Properties;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
 import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -43,14 +44,17 @@ import org.mockito.Mockito;
  */
 public class TestAccumuloStorageHandler {
 
-  protected AccumuloStorageHandler storageHandler;
+  private AccumuloStorageHandler storageHandler;
+  private Configuration conf;
 
   @Rule
   public TestName test = new TestName();
 
   @Before
   public void setup() {
+    conf = new Configuration();
     storageHandler = new AccumuloStorageHandler();
+    storageHandler.setConf(conf);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java
index 406768a..af561e0 100644
--- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java
@@ -23,12 +23,16 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collection;
 
+import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.log4j.Logger;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -90,51 +94,163 @@ public class TestHiveAccumuloHelper {
     assertEquals(service, credTokens.iterator().next().getService());
   }
 
-  @Test(expected = IllegalStateException.class)
-  public void testISEIsPropagated() throws Exception {
+  @Test
+  public void testInputFormatWithKerberosToken() throws Exception {
+    final JobConf jobConf = new JobConf();
     final HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
+    final AuthenticationToken authToken = Mockito.mock(AuthenticationToken.class);
+    final Token hadoopToken = Mockito.mock(Token.class);
+    final AccumuloConnectionParameters cnxnParams = Mockito.mock(AccumuloConnectionParameters.class);
+    final Connector connector = Mockito.mock(Connector.class);
+
+    final String user = "bob";
+    final String instanceName = "accumulo";
+    final String zookeepers = "host1:2181,host2:2181,host3:2181";
+    UserGroupInformation ugi = UserGroupInformation.createUserForTesting(user, new String[0]);
 
-    final JobConf jobConf = Mockito.mock(JobConf.class);
-    final Class<?> inputOrOutputFormatClass = AccumuloInputFormat.class;
-    final String zookeepers = "localhost:2181";
-    final String instanceName = "accumulo_instance";
-    final boolean useSasl = false;
+    // Call the real methods for these
+    Mockito.doCallRealMethod().when(helper).updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+    Mockito.doCallRealMethod().when(helper).updateInputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+    Mockito.doCallRealMethod().when(helper).updateConfWithAccumuloToken(jobConf, ugi, cnxnParams, true);
 
-    // Call the real "public" method
-    Mockito.doCallRealMethod().when(helper).setZooKeeperInstance(jobConf, inputOrOutputFormatClass,
-        zookeepers, instanceName, useSasl);
+    // Return our mocked objects
+    Mockito.when(cnxnParams.getConnector()).thenReturn(connector);
+    Mockito.when(helper.getDelegationToken(connector)).thenReturn(authToken);
+    Mockito.when(helper.getHadoopToken(authToken)).thenReturn(hadoopToken);
 
-    // Mock the private one to throw the ISE
-    Mockito.doThrow(new IllegalStateException()).when(helper).
-        setZooKeeperInstanceWithReflection(jobConf, inputOrOutputFormatClass, zookeepers,
-            instanceName, useSasl);
+    // Stub AccumuloConnectionParameters actions
+    Mockito.when(cnxnParams.useSasl()).thenReturn(true);
+    Mockito.when(cnxnParams.getAccumuloUserName()).thenReturn(user);
+    Mockito.when(cnxnParams.getAccumuloInstanceName()).thenReturn(instanceName);
+    Mockito.when(cnxnParams.getZooKeepers()).thenReturn(zookeepers);
 
-    // Should throw an IllegalStateException
-    helper.setZooKeeperInstance(jobConf, inputOrOutputFormatClass, zookeepers, instanceName,
-        useSasl);
+    // Test the InputFormat execution path
+    //
+    Mockito.when(helper.hasKerberosCredentials(ugi)).thenReturn(true);
+    // Invoke the InputFormat entrypoint
+    helper.updateInputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+    Mockito.verify(helper).setInputFormatConnectorInfo(jobConf, user, authToken);
+    Mockito.verify(helper).mergeTokenIntoJobConf(jobConf, hadoopToken);
+    Mockito.verify(helper).addTokenFromUserToJobConf(ugi, jobConf);
+
+    // Make sure the token made it into the UGI
+    Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
+    Assert.assertEquals(1, tokens.size());
+    Assert.assertEquals(hadoopToken, tokens.iterator().next());
   }
 
-  @Test(expected = IllegalStateException.class)
-  public void testISEIsPropagatedWithReflection() throws Exception {
+  @Test
+  public void testInputFormatWithoutKerberosToken() throws Exception {
+    final JobConf jobConf = new JobConf();
+    final HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
+    final AuthenticationToken authToken = Mockito.mock(AuthenticationToken.class);
+    final Token hadoopToken = Mockito.mock(Token.class);
+    final AccumuloConnectionParameters cnxnParams = Mockito.mock(AccumuloConnectionParameters.class);
+    final Connector connector = Mockito.mock(Connector.class);
+
+    final String user = "bob";
+    final String instanceName = "accumulo";
+    final String zookeepers = "host1:2181,host2:2181,host3:2181";
+    UserGroupInformation ugi = UserGroupInformation.createUserForTesting(user, new String[0]);
+
+    // Call the real methods for these
+    Mockito.doCallRealMethod().when(helper).updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+    Mockito.doCallRealMethod().when(helper).updateInputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+    Mockito.doCallRealMethod().when(helper).updateConfWithAccumuloToken(jobConf, ugi, cnxnParams, true);
+
+    // Return our mocked objects
+    Mockito.when(cnxnParams.getConnector()).thenReturn(connector);
+    Mockito.when(helper.getDelegationToken(connector)).thenReturn(authToken);
+    Mockito.when(helper.getHadoopToken(authToken)).thenReturn(hadoopToken);
+
+    // Stub AccumuloConnectionParameters actions
+    Mockito.when(cnxnParams.useSasl()).thenReturn(true);
+    Mockito.when(cnxnParams.getAccumuloUserName()).thenReturn(user);
+    Mockito.when(cnxnParams.getAccumuloInstanceName()).thenReturn(instanceName);
+    Mockito.when(cnxnParams.getZooKeepers()).thenReturn(zookeepers);
+
+    // Verify that when we have no kerberos credentials, we pull the serialized Token
+    Mockito.when(helper.hasKerberosCredentials(ugi)).thenReturn(false);
+    helper.updateInputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+    Mockito.verify(helper).addTokenFromUserToJobConf(ugi, jobConf);
+  }
+
+  @Test
+  public void testOutputFormatSaslConfigurationWithoutKerberosToken() throws Exception {
+    final JobConf jobConf = new JobConf();
     final HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
+    final AuthenticationToken authToken = Mockito.mock(AuthenticationToken.class);
+    final Token hadoopToken = Mockito.mock(Token.class);
+    final AccumuloConnectionParameters cnxnParams = Mockito.mock(AccumuloConnectionParameters.class);
+    final Connector connector = Mockito.mock(Connector.class);
+
+    final String user = "bob";
+    final String instanceName = "accumulo";
+    final String zookeepers = "host1:2181,host2:2181,host3:2181";
+    UserGroupInformation ugi = UserGroupInformation.createUserForTesting(user, new String[0]);
+
+    // Call the real methods for these
+    Mockito.doCallRealMethod().when(helper).updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+    Mockito.doCallRealMethod().when(helper).updateConfWithAccumuloToken(jobConf, ugi, cnxnParams, false);
+
+    // Return our mocked objects
+    Mockito.when(cnxnParams.getConnector()).thenReturn(connector);
+    Mockito.when(helper.getDelegationToken(connector)).thenReturn(authToken);
+    Mockito.when(helper.getHadoopToken(authToken)).thenReturn(hadoopToken);
+
+    // Stub AccumuloConnectionParameters actions
+    Mockito.when(cnxnParams.useSasl()).thenReturn(true);
+    Mockito.when(cnxnParams.getAccumuloUserName()).thenReturn(user);
+    Mockito.when(cnxnParams.getAccumuloInstanceName()).thenReturn(instanceName);
+    Mockito.when(cnxnParams.getZooKeepers()).thenReturn(zookeepers);
+
+    // Verify that when we have no kerberos credentials, we pull the serialized Token
+    Mockito.when(helper.hasKerberosCredentials(ugi)).thenReturn(false);
+    helper.updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+    Mockito.verify(helper).addTokenFromUserToJobConf(ugi, jobConf);
+  }
+
+  @Test
+  public void testOutputFormatSaslConfigurationWithKerberosToken() throws Exception {
+    final JobConf jobConf = new JobConf();
+    final HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
+    final AuthenticationToken authToken = Mockito.mock(AuthenticationToken.class);
+    final Token hadoopToken = Mockito.mock(Token.class);
+    final AccumuloConnectionParameters cnxnParams = Mockito.mock(AccumuloConnectionParameters.class);
+    final Connector connector = Mockito.mock(Connector.class);
+
+    final String user = "bob";
+    final String instanceName = "accumulo";
+    final String zookeepers = "host1:2181,host2:2181,host3:2181";
+    UserGroupInformation ugi = UserGroupInformation.createUserForTesting(user, new String[0]);
+
+    // Call the real methods for these
+    Mockito.doCallRealMethod().when(helper).updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+    Mockito.doCallRealMethod().when(helper).updateConfWithAccumuloToken(jobConf, ugi, cnxnParams, false);
+
+    // Return our mocked objects
+    Mockito.when(cnxnParams.getConnector()).thenReturn(connector);
+    Mockito.when(helper.getDelegationToken(connector)).thenReturn(authToken);
+    Mockito.when(helper.getHadoopToken(authToken)).thenReturn(hadoopToken);
 
-    final JobConf jobConf = Mockito.mock(JobConf.class);
-    final Class<?> inputOrOutputFormatClass = AccumuloInputFormat.class;
-    final String zookeepers = "localhost:2181";
-    final String instanceName = "accumulo_instance";
-    final boolean useSasl = false;
+    // Stub AccumuloConnectionParameters actions
+    Mockito.when(cnxnParams.useSasl()).thenReturn(true);
+    Mockito.when(cnxnParams.getAccumuloUserName()).thenReturn(user);
+    Mockito.when(cnxnParams.getAccumuloInstanceName()).thenReturn(instanceName);
+    Mockito.when(cnxnParams.getZooKeepers()).thenReturn(zookeepers);
 
-    // Call the real "public" method
-    Mockito.doCallRealMethod().when(helper).setZooKeeperInstance(jobConf, inputOrOutputFormatClass,
-        zookeepers, instanceName, useSasl);
+    // We have kerberos credentials
 
-    // Mock the private one to throw the IAE
-    Mockito.doThrow(new InvocationTargetException(new IllegalStateException())).when(helper).
-        setZooKeeperInstanceWithReflection(jobConf, inputOrOutputFormatClass, zookeepers,
-            instanceName, useSasl);
+    Mockito.when(helper.hasKerberosCredentials(ugi)).thenReturn(true);
+    // Invoke the OutputFormat entrypoint
+    helper.updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+    Mockito.verify(helper).setOutputFormatConnectorInfo(jobConf, user, authToken);
+    Mockito.verify(helper).mergeTokenIntoJobConf(jobConf, hadoopToken);
+    Mockito.verify(helper).addTokenFromUserToJobConf(ugi, jobConf);
 
-    // Should throw an IllegalStateException
-    helper.setZooKeeperInstance(jobConf, inputOrOutputFormatClass, zookeepers, instanceName,
-        useSasl);
+    // Make sure the token made it into the UGI
+    Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
+    Assert.assertEquals(1, tokens.size());
+    Assert.assertEquals(hadoopToken, tokens.iterator().next());
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java
index ee5aecf..56beb8f 100644
--- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters;
 import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
 import org.apache.hadoop.hive.accumulo.AccumuloHiveRow;
+import org.apache.hadoop.hive.accumulo.HiveAccumuloHelper;
 import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
 import org.apache.hadoop.hive.accumulo.columns.ColumnMapper;
 import org.apache.hadoop.hive.accumulo.columns.ColumnMapping;
@@ -472,6 +473,10 @@ public class TestHiveAccumuloTableInputFormat {
     Set<Range> ranges = Collections.singleton(new Range());
 
     HiveAccumuloTableInputFormat mockInputFormat = Mockito.mock(HiveAccumuloTableInputFormat.class);
+    HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
+
+    // Stub out a mocked Helper instance
+    Mockito.when(mockInputFormat.getHelper()).thenReturn(helper);
 
     // Call out to the real configure method
     Mockito.doCallRealMethod().when(mockInputFormat)
@@ -485,8 +490,8 @@ public class TestHiveAccumuloTableInputFormat {
         ranges);
 
     // Verify that the correct methods are invoked on AccumuloInputFormat
-    Mockito.verify(mockInputFormat).setMockInstance(conf, mockInstance.getInstanceName());
-    Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS));
+    Mockito.verify(helper).setInputFormatMockInstance(conf, mockInstance.getInstanceName());
+    Mockito.verify(helper).setInputFormatConnectorInfo(conf, USER, new PasswordToken(PASS));
     Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE);
     Mockito.verify(mockInputFormat).setScanAuthorizations(conf,
         con.securityOperations().getUserAuthorizations(USER));
@@ -509,10 +514,13 @@ public class TestHiveAccumuloTableInputFormat {
 
     ZooKeeperInstance zkInstance = Mockito.mock(ZooKeeperInstance.class);
     HiveAccumuloTableInputFormat mockInputFormat = Mockito.mock(HiveAccumuloTableInputFormat.class);
+    HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
 
     // Stub out the ZKI mock
     Mockito.when(zkInstance.getInstanceName()).thenReturn(instanceName);
     Mockito.when(zkInstance.getZooKeepers()).thenReturn(zookeepers);
+    // Stub out a mocked Helper instance
+    Mockito.when(mockInputFormat.getHelper()).thenReturn(helper);
 
     // Call out to the real configure method
     Mockito.doCallRealMethod().when(mockInputFormat)
@@ -526,8 +534,8 @@ public class TestHiveAccumuloTableInputFormat {
         ranges);
 
     // Verify that the correct methods are invoked on AccumuloInputFormat
-    Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false);
-    Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS));
+    Mockito.verify(helper).setInputFormatZooKeeperInstance(conf, instanceName, zookeepers, false);
+    Mockito.verify(helper).setInputFormatConnectorInfo(conf, USER, new PasswordToken(PASS));
     Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE);
     Mockito.verify(mockInputFormat).setScanAuthorizations(conf,
         con.securityOperations().getUserAuthorizations(USER));
@@ -551,10 +559,13 @@ public class TestHiveAccumuloTableInputFormat {
 
     ZooKeeperInstance zkInstance = Mockito.mock(ZooKeeperInstance.class);
     HiveAccumuloTableInputFormat mockInputFormat = Mockito.mock(HiveAccumuloTableInputFormat.class);
+    HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
 
     // Stub out the ZKI mock
     Mockito.when(zkInstance.getInstanceName()).thenReturn(instanceName);
     Mockito.when(zkInstance.getZooKeepers()).thenReturn(zookeepers);
+    // Stub out a mocked Helper instance
+    Mockito.when(mockInputFormat.getHelper()).thenReturn(helper);
 
     // Call out to the real configure method
     Mockito.doCallRealMethod().when(mockInputFormat)
@@ -568,8 +579,8 @@ public class TestHiveAccumuloTableInputFormat {
         ranges);
 
     // Verify that the correct methods are invoked on AccumuloInputFormat
-    Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false);
-    Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS));
+    Mockito.verify(helper).setInputFormatZooKeeperInstance(conf, instanceName, zookeepers, false);
+    Mockito.verify(helper).setInputFormatConnectorInfo(conf, USER, new PasswordToken(PASS));
     Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE);
     Mockito.verify(mockInputFormat).setScanAuthorizations(conf, new Authorizations("foo,bar"));
     Mockito.verify(mockInputFormat).addIterators(conf, iterators);
@@ -605,10 +616,13 @@ public class TestHiveAccumuloTableInputFormat {
 
     ZooKeeperInstance zkInstance = Mockito.mock(ZooKeeperInstance.class);
     HiveAccumuloTableInputFormat mockInputFormat = Mockito.mock(HiveAccumuloTableInputFormat.class);
+    HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
 
     // Stub out the ZKI mock
     Mockito.when(zkInstance.getInstanceName()).thenReturn(instanceName);
     Mockito.when(zkInstance.getZooKeepers()).thenReturn(zookeepers);
+    // Stub out a mocked Helper instance
+    Mockito.when(mockInputFormat.getHelper()).thenReturn(helper);
 
     // Call out to the real configure method
     Mockito.doCallRealMethod().when(mockInputFormat)
@@ -622,8 +636,8 @@ public class TestHiveAccumuloTableInputFormat {
         ranges);
 
     // Verify that the correct methods are invoked on AccumuloInputFormat
-    Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false);
-    Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS));
+    Mockito.verify(helper).setInputFormatZooKeeperInstance(conf, instanceName, zookeepers, false);
+    Mockito.verify(helper).setInputFormatConnectorInfo(conf, USER, new PasswordToken(PASS));
     Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE);
     Mockito.verify(mockInputFormat).setScanAuthorizations(conf,
         con.securityOperations().getUserAuthorizations(USER));
@@ -659,12 +673,15 @@ public class TestHiveAccumuloTableInputFormat {
 
     ZooKeeperInstance zkInstance = Mockito.mock(ZooKeeperInstance.class);
     HiveAccumuloTableInputFormat mockInputFormat = Mockito.mock(HiveAccumuloTableInputFormat.class);
+    HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
 
     // Stub out the ZKI mock
     Mockito.when(zkInstance.getInstanceName()).thenReturn(instanceName);
     Mockito.when(zkInstance.getZooKeepers()).thenReturn(zookeepers);
     Mockito.when(mockInputFormat.getPairCollection(columnMapper.getColumnMappings())).thenReturn(
         cfCqPairs);
+    // Stub out a mocked Helper instance
+    Mockito.when(mockInputFormat.getHelper()).thenReturn(helper);
 
     // Call out to the real configure method
     Mockito.doCallRealMethod().when(mockInputFormat)
@@ -678,8 +695,8 @@ public class TestHiveAccumuloTableInputFormat {
         ranges);
 
     // Verify that the correct methods are invoked on AccumuloInputFormat
-    Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false);
-    Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS));
+    Mockito.verify(helper).setInputFormatZooKeeperInstance(conf, instanceName, zookeepers, false);
+    Mockito.verify(helper).setInputFormatConnectorInfo(conf, USER, new PasswordToken(PASS));
     Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE);
     Mockito.verify(mockInputFormat).setScanAuthorizations(conf,
         con.securityOperations().getUserAuthorizations(USER));