You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2017/07/17 03:18:13 UTC
[1/2] hive git commit: HIVE-16973 : Fetching of Delegation tokens
(Kerberos) for AccumuloStorageHandler fails in HS2 (Josh Elser via Sushanth
Sowmyan)
Repository: hive
Updated Branches:
refs/heads/master 93dd75d01 -> 173d98160
http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java
index 5fdab28..1dd2b8c 100644
--- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java
@@ -109,14 +109,17 @@ public class TestHiveAccumuloTableOutputFormat {
@Test
public void testBasicConfiguration() throws IOException, AccumuloSecurityException {
HiveAccumuloTableOutputFormat outputFormat = Mockito.mock(HiveAccumuloTableOutputFormat.class);
+ HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
+
+ Mockito.when(outputFormat.getHelper()).thenReturn(helper);
Mockito.doCallRealMethod().when(outputFormat).configureAccumuloOutputFormat(conf);
Mockito.doCallRealMethod().when(outputFormat).getConnectionParams(conf);
outputFormat.configureAccumuloOutputFormat(conf);
- Mockito.verify(outputFormat).setConnectorInfoWithErrorChecking(conf, user, new PasswordToken(password));
- Mockito.verify(outputFormat).setZooKeeperInstanceWithErrorChecking(conf, instanceName, zookeepers, false);
+ Mockito.verify(helper).setOutputFormatConnectorInfo(conf, user, new PasswordToken(password));
+ Mockito.verify(helper).setOutputFormatZooKeeperInstance(conf, instanceName, zookeepers, false);
Mockito.verify(outputFormat).setDefaultAccumuloTableName(conf, outputTable);
}
@@ -160,38 +163,32 @@ public class TestHiveAccumuloTableOutputFormat {
Mockito.when(cnxnParams.getZooKeepers()).thenReturn(zookeepers);
// Stub OutputFormat actions
- Mockito.when(outputFormat.hasKerberosCredentials(user1)).thenReturn(true);
+ Mockito.when(helper.hasKerberosCredentials(user1)).thenReturn(true);
// Invoke the method
outputFormat.configureAccumuloOutputFormat(conf);
- // The AccumuloInputFormat methods
- Mockito.verify(outputFormat).setZooKeeperInstanceWithErrorChecking(conf, instanceName, zookeepers, true);
- Mockito.verify(outputFormat).setConnectorInfoWithErrorChecking(conf, user, authToken);
+ // The AccumuloOutputFormat methods
+ Mockito.verify(helper).setOutputFormatZooKeeperInstance(conf, instanceName, zookeepers, true);
+ Mockito.verify(helper).updateOutputFormatConfWithAccumuloToken(conf, user1, cnxnParams);
Mockito.verify(outputFormat).setDefaultAccumuloTableName(conf, outputTable);
-
- // Other methods we expect
- Mockito.verify(helper).mergeTokenIntoJobConf(conf, hadoopToken);
-
- // Make sure the token made it into the UGI
- Collection<Token<? extends TokenIdentifier>> tokens = user1.getTokens();
- Assert.assertEquals(1, tokens.size());
- Assert.assertEquals(hadoopToken, tokens.iterator().next());
}
@Test
public void testMockInstance() throws IOException, AccumuloSecurityException {
HiveAccumuloTableOutputFormat outputFormat = Mockito.mock(HiveAccumuloTableOutputFormat.class);
+ HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
conf.setBoolean(AccumuloConnectionParameters.USE_MOCK_INSTANCE, true);
conf.unset(AccumuloConnectionParameters.ZOOKEEPERS);
+ Mockito.when(outputFormat.getHelper()).thenReturn(helper);
Mockito.doCallRealMethod().when(outputFormat).configureAccumuloOutputFormat(conf);
Mockito.doCallRealMethod().when(outputFormat).getConnectionParams(conf);
outputFormat.configureAccumuloOutputFormat(conf);
- Mockito.verify(outputFormat).setConnectorInfoWithErrorChecking(conf, user, new PasswordToken(password));
- Mockito.verify(outputFormat).setMockInstanceWithErrorChecking(conf, instanceName);
+ Mockito.verify(helper).setOutputFormatConnectorInfo(conf, user, new PasswordToken(password));
+ Mockito.verify(helper).setOutputFormatMockInstance(conf, instanceName);
Mockito.verify(outputFormat).setDefaultAccumuloTableName(conf, outputTable);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/itests/hive-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index ba9d7b9..bf600c2 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -399,6 +399,12 @@
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.codehaus.plexus</groupId>
+ <artifactId>plexus-utils</artifactId>
+ <version>${plexus.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/itests/qtest-accumulo/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest-accumulo/pom.xml b/itests/qtest-accumulo/pom.xml
index b7ce283..40d0a74 100644
--- a/itests/qtest-accumulo/pom.xml
+++ b/itests/qtest-accumulo/pom.xml
@@ -38,7 +38,8 @@
<!-- Profile activation clause for accumulo-tests will flip skip.accumulo.tests to false
as long as -DskipAccumuloTests is not specified -->
<skip.accumulo.tests>true</skip.accumulo.tests>
- <accumulo-thrift.version>0.9.0</accumulo-thrift.version>
+ <!-- Must correspond with the Accumulo version specified in the pom -->
+ <accumulo-thrift.version>0.9.1</accumulo-thrift.version>
<test.dfs.mkdir>-mkdir -p</test.dfs.mkdir>
</properties>
http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b6b7b5b..99ba218 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,7 +109,7 @@
<maven.build-helper.plugin.version>1.8</maven.build-helper.plugin.version>
<!-- Library Dependency Versions -->
- <accumulo.version>1.6.0</accumulo.version>
+ <accumulo.version>1.7.3</accumulo.version>
<activemq.version>5.5.0</activemq.version>
<ant.version>1.9.1</ant.version>
<antlr.version>3.5.2</antlr.version>
@@ -185,6 +185,7 @@
<netty.version>4.0.29.Final</netty.version>
<parquet.version>1.9.0</parquet.version>
<pig.version>0.16.0</pig.version>
+ <plexus.version>1.5.6</plexus.version>
<protobuf.version>2.5.0</protobuf.version>
<stax.version>1.0.1</stax.version>
<slf4j.version>1.7.10</slf4j.version>
[2/2] hive git commit: HIVE-16973 : Fetching of Delegation tokens
(Kerberos) for AccumuloStorageHandler fails in HS2 (Josh Elser via Sushanth
Sowmyan)
Posted by kh...@apache.org.
HIVE-16973 : Fetching of Delegation tokens (Kerberos) for AccumuloStorageHandler fails in HS2 (Josh Elser via Sushanth Sowmyan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/173d9816
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/173d9816
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/173d9816
Branch: refs/heads/master
Commit: 173d9816086e4c43677ae6dc3b1bf85203f894a1
Parents: 93dd75d
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Sun Jul 16 14:42:15 2017 -0700
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Sun Jul 16 20:18:09 2017 -0700
----------------------------------------------------------------------
.../accumulo/AccumuloConnectionParameters.java | 57 +--
.../hive/accumulo/AccumuloStorageHandler.java | 77 ++--
.../hive/accumulo/HiveAccumuloHelper.java | 396 +++++++++++--------
.../org/apache/hadoop/hive/accumulo/Utils.java | 3 +-
.../mr/HiveAccumuloTableInputFormat.java | 244 +++---------
.../mr/HiveAccumuloTableOutputFormat.java | 84 +---
.../serde/CompositeAccumuloRowIdFactory.java | 3 +-
.../serde/DefaultAccumuloRowIdFactory.java | 3 +-
.../accumulo/TestAccumuloStorageHandler.java | 6 +-
.../hive/accumulo/TestHiveAccumuloHelper.java | 184 +++++++--
.../mr/TestHiveAccumuloTableInputFormat.java | 37 +-
.../mr/TestHiveAccumuloTableOutputFormat.java | 29 +-
itests/hive-unit/pom.xml | 6 +
itests/qtest-accumulo/pom.xml | 3 +-
pom.xml | 3 +-
15 files changed, 530 insertions(+), 605 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java
index f34e820..58fd89f 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java
@@ -17,19 +17,19 @@
package org.apache.hadoop.hive.accumulo;
import java.io.File;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
+import java.io.IOException;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.JavaUtils;
import com.google.common.base.Preconditions;
@@ -37,8 +37,6 @@ import com.google.common.base.Preconditions;
*
*/
public class AccumuloConnectionParameters {
- private static final String KERBEROS_TOKEN_CLASS = "org.apache.accumulo.core.client.security.tokens.KerberosToken";
-
public static final String USER_NAME = "accumulo.user.name";
public static final String USER_PASS = "accumulo.user.pass";
public static final String ZOOKEEPERS = "accumulo.zookeepers";
@@ -124,8 +122,9 @@ public class AccumuloConnectionParameters {
if (null == zookeepers) {
throw new IllegalArgumentException("ZooKeeper quorum string must be provided in hiveconf using " + ZOOKEEPERS);
}
+ ClientConfiguration clientConf = ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeepers).withSasl(useSasl());
- return new ZooKeeperInstance(instanceName, zookeepers);
+ return new ZooKeeperInstance(clientConf);
}
public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
@@ -142,7 +141,7 @@ public class AccumuloConnectionParameters {
}
if (useSasl()) {
- return inst.getConnector(username, getKerberosToken());
+ return inst.getConnector(username, getKerberosToken(username));
} else {
// Not using SASL/Kerberos -- use the password
String password = getAccumuloPassword();
@@ -175,17 +174,10 @@ public class AccumuloConnectionParameters {
* Instantiate a KerberosToken in a backwards compatible manner.
* @param username Kerberos principal
*/
- AuthenticationToken getKerberosToken(String username) {
- // Get the Class
- Class<? extends AuthenticationToken> krbTokenClz = getKerberosTokenClass();
-
+ KerberosToken getKerberosToken(String username) {
try {
- // Invoke the `new KerberosToken(String)` constructor
- // Expects that the user is already logged-in
- Constructor<? extends AuthenticationToken> constructor = krbTokenClz.getConstructor(String.class);
- return constructor.newInstance(username);
- } catch (NoSuchMethodException | SecurityException | InstantiationException |
- IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
+ return new KerberosToken(username);
+ } catch (IOException e) {
throw new IllegalArgumentException("Failed to instantiate KerberosToken.", e);
}
}
@@ -195,36 +187,11 @@ public class AccumuloConnectionParameters {
* @param username Kerberos principal
* @param keytab Keytab on local filesystem
*/
- AuthenticationToken getKerberosToken(String username, String keytab) {
- Class<? extends AuthenticationToken> krbTokenClz = getKerberosTokenClass();
-
- File keytabFile = new File(keytab);
- if (!keytabFile.isFile() || !keytabFile.canRead()) {
- throw new IllegalArgumentException("Keytab must be a readable file: " + keytab);
- }
-
+ KerberosToken getKerberosToken(String username, String keytab) {
try {
- // Invoke the `new KerberosToken(String, File, boolean)` constructor
- // Tries to log in as the provided user with the given keytab, overriding an already logged-in user if present
- Constructor<? extends AuthenticationToken> constructor = krbTokenClz.getConstructor(String.class, File.class, boolean.class);
- return constructor.newInstance(username, keytabFile, true);
- } catch (NoSuchMethodException | SecurityException | InstantiationException |
- IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
+ return new KerberosToken(username, new File(keytab), true);
+ } catch (IOException e) {
throw new IllegalArgumentException("Failed to instantiate KerberosToken.", e);
}
}
-
- /**
- * Attempt to instantiate the KerberosToken class
- */
- Class<? extends AuthenticationToken> getKerberosTokenClass() {
- try {
- // Instantiate the class
- Class<?> clz = JavaUtils.loadClass(KERBEROS_TOKEN_CLASS);
- // Cast it to an AuthenticationToken since Connector will need that
- return clz.asSubclass(AuthenticationToken.class);
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException("Could not load KerberosToken class. >=Accumulo 1.7.0 required", e);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java
index 62524e8..5391a99 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java
@@ -24,14 +24,6 @@ import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
-import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.fate.Fate;
-import org.apache.accumulo.start.Main;
-import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableInputFormat;
import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableOutputFormat;
@@ -60,7 +52,6 @@ import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
-import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -177,7 +168,6 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
connectionParams = new AccumuloConnectionParameters(conf);
}
- @SuppressWarnings("deprecation")
@Override
public Class<? extends AbstractSerDe> getSerDeClass() {
return AccumuloSerDe.class;
@@ -228,6 +218,37 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
}
LOG.info("Computed input job properties of " + jobProperties);
+
+ Configuration conf = getConf();
+ helper.loadDependentJars(conf);
+
+ // When Kerberos is enabled, we have to add the Accumulo delegation token to the
+ // Job so that it gets passed down to the YARN/Tez task.
+ if (connectionParams.useSasl()) {
+ try {
+ // Open an accumulo connection
+ Connector conn = connectionParams.getConnector();
+
+ // Convert the Accumulo token into a Hadoop token
+ Token<? extends TokenIdentifier> accumuloToken = helper.setConnectorInfoForInputAndOutput(connectionParams, conn, conf);
+
+ // Probably don't have a JobConf here, but we can still try...
+ if (conf instanceof JobConf) {
+ // Convert the Accumulo token into a Hadoop token
+ LOG.debug("Adding Hadoop Token for Accumulo to Job's Credentials: " + accumuloToken);
+
+ // Add the Hadoop token to the JobConf
+ JobConf jobConf = (JobConf) conf;
+ jobConf.getCredentials().addToken(accumuloToken.getService(), accumuloToken);
+ LOG.info("All job tokens: " + jobConf.getCredentials().getAllTokens());
+ } else {
+ LOG.info("Don't have a JobConf, so we cannot persist Tokens. Have to do it later.");
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to obtain DelegationToken for "
+ + connectionParams.getAccumuloUserName(), e);
+ }
+ }
}
@Override
@@ -413,7 +434,6 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
// do nothing
}
- @SuppressWarnings("deprecation")
@Override
public DecomposedPredicate decomposePredicate(JobConf conf, Deserializer deserializer,
ExprNodeDesc desc) {
@@ -431,15 +451,9 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
}
}
- @SuppressWarnings("deprecation")
@Override
public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
- try {
- Utils.addDependencyJars(jobConf, Tracer.class, Fate.class, Connector.class, Main.class,
- ZooKeeper.class, AccumuloStorageHandler.class);
- } catch (IOException e) {
- LOG.error("Could not add necessary Accumulo dependencies to classpath", e);
- }
+ helper.loadDependentJars(jobConf);
Properties tblProperties = tableDesc.getProperties();
AccumuloSerDeParameters serDeParams = null;
@@ -462,36 +476,17 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv
// Job so that it gets passed down to the YARN/Tez task.
if (connectionParams.useSasl()) {
try {
- // Obtain a delegation token from Accumulo
+ // Open an accumulo connection
Connector conn = connectionParams.getConnector();
- AuthenticationToken token = helper.getDelegationToken(conn);
-
- // Make sure the Accumulo token is set in the Configuration (only a stub of the Accumulo
- // AuthentiationToken is serialized, not the entire token). configureJobConf may be
- // called multiple times with the same JobConf which results in an error from Accumulo
- // MapReduce API. Catch the error, log a debug message and just keep going
- try {
- InputConfigurator.setConnectorInfo(AccumuloInputFormat.class, jobConf,
- connectionParams.getAccumuloUserName(), token);
- } catch (IllegalStateException e) {
- // The implementation balks when this method is invoked multiple times
- LOG.debug("Ignoring IllegalArgumentException about re-setting connector information");
- }
- try {
- OutputConfigurator.setConnectorInfo(AccumuloOutputFormat.class, jobConf,
- connectionParams.getAccumuloUserName(), token);
- } catch (IllegalStateException e) {
- // The implementation balks when this method is invoked multiple times
- LOG.debug("Ignoring IllegalArgumentException about re-setting connector information");
- }
// Convert the Accumulo token into a Hadoop token
- Token<? extends TokenIdentifier> accumuloToken = helper.getHadoopToken(token);
+ Token<? extends TokenIdentifier> accumuloToken = helper.setConnectorInfoForInputAndOutput(connectionParams, conn, jobConf);
- LOG.info("Adding Hadoop Token for Accumulo to Job's Credentials");
+ LOG.debug("Adding Hadoop Token for Accumulo to Job's Credentials");
// Add the Hadoop token to the JobConf
helper.mergeTokenIntoJobConf(jobConf, accumuloToken);
+ LOG.debug("All job tokens: " + jobConf.getCredentials().getAllTokens());
} catch (Exception e) {
throw new RuntimeException("Failed to obtain DelegationToken for "
+ connectionParams.getAccumuloUserName(), e);
http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java
index 71b8b77..9fccb49 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java
@@ -19,20 +19,35 @@ package org.apache.hadoop.hive.accumulo;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.admin.SecurityOperations;
+import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
+import org.apache.accumulo.core.client.impl.DelegationTokenImpl;
+import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
+import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator;
+import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.accumulo.fate.Fate;
+import org.apache.accumulo.start.Main;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,35 +56,9 @@ import org.slf4j.LoggerFactory;
*/
public class HiveAccumuloHelper {
private static final Logger log = LoggerFactory.getLogger(HiveAccumuloHelper.class);
- // Constant from Accumulo's DelegationTokenImpl
+ // Constant from Accumulo's AuthenticationTokenIdentifier
public static final Text ACCUMULO_SERVICE = new Text("ACCUMULO_AUTH_TOKEN");
- // Constants for DelegationToken reflection to continue to support 1.6
- private static final String DELEGATION_TOKEN_CONFIG_CLASS_NAME =
- "org.apache.accumulo.core.client.admin.DelegationTokenConfig";
- private static final String DELEGATION_TOKEN_IMPL_CLASS_NAME =
- "org.apache.accumulo.core.client.impl.DelegationTokenImpl";
- private static final String GET_DELEGATION_TOKEN_METHOD_NAME = "getDelegationToken";
- private static final String GET_IDENTIFIER_METHOD_NAME = "getIdentifier";
- private static final String GET_PASSWORD_METHOD_NAME = "getPassword";
- private static final String GET_SERVICE_NAME_METHOD_NAME = "getServiceName";
-
- // Constants for ClientConfiguration and setZooKeeperInstance reflection
- // to continue to support 1.5
- private static final String CLIENT_CONFIGURATION_CLASS_NAME =
- "org.apache.accumulo.core.client.ClientConfiguration";
- private static final String LOAD_DEFAULT_METHOD_NAME = "loadDefault";
- private static final String SET_PROPERTY_METHOD_NAME = "setProperty";
- private static final String INSTANCE_ZOOKEEPER_HOST = "instance.zookeeper.host";
- private static final String INSTANCE_NAME = "instance.name";
- private static final String INSTANCE_RPC_SASL_ENABLED = "instance.rpc.sasl.enabled";
- private static final String SET_ZOOKEEPER_INSTANCE_METHOD_NAME = "setZooKeeperInstance";
-
- // Constants for unwrapping the DelegationTokenStub into a DelegationTokenImpl
- private static final String CONFIGURATOR_BASE_CLASS_NAME =
- "org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase";
- private static final String UNWRAP_AUTHENTICATION_TOKEN_METHOD_NAME = "unwrapAuthenticationToken";
-
/**
* Extract the appropriate Token for Accumulo from the provided {@code user} and add it to the
* {@link JobConf}'s credentials.
@@ -89,18 +78,11 @@ public class HiveAccumuloHelper {
// Accumulo token already in Configuration, but the Token isn't in the Job credentials like the
// AccumuloInputFormat expects
- Token<?> accumuloToken = null;
- Collection<Token<? extends TokenIdentifier>> tokens = user.getTokens();
- for (Token<?> token : tokens) {
- if (ACCUMULO_SERVICE.equals(token.getKind())) {
- accumuloToken = token;
- break;
- }
- }
+ Token<?> accumuloToken = getAccumuloToken(user);
// If we didn't find the Token, we can't proceed. Log the tokens for debugging.
if (null == accumuloToken) {
- log.error("Could not find accumulo token in user: " + tokens);
+ log.error("Could not find accumulo token in user: " + user.getTokens());
throw new IOException("Could not find Accumulo Token in user's tokens");
}
@@ -109,6 +91,17 @@ public class HiveAccumuloHelper {
mergeTokenIntoJobConf(jobConf, accumuloToken);
}
+ public Token<?> getAccumuloToken(UserGroupInformation user) {
+ checkNotNull(user, "Provided UGI was null");
+ Collection<Token<? extends TokenIdentifier>> tokens = user.getTokens();
+ for (Token<?> token : tokens) {
+ if (ACCUMULO_SERVICE.equals(token.getKind())) {
+ return token;
+ }
+ }
+ return null;
+ }
+
/**
* Merge the provided <code>Token</code> into the JobConf.
*
@@ -128,7 +121,7 @@ public class HiveAccumuloHelper {
}
/**
- * Obtain a DelegationToken from Accumulo in a backwards compatible manner.
+ * Obtain a DelegationToken from Accumulo.
*
* @param conn
* The Accumulo connector
@@ -138,49 +131,32 @@ public class HiveAccumuloHelper {
*/
public AuthenticationToken getDelegationToken(Connector conn) throws IOException {
try {
- Class<?> clz = JavaUtils.loadClass(DELEGATION_TOKEN_CONFIG_CLASS_NAME);
- // DelegationTokenConfig delegationTokenConfig = new DelegationTokenConfig();
- Object delegationTokenConfig = clz.newInstance();
-
- SecurityOperations secOps = conn.securityOperations();
-
- Method getDelegationTokenMethod = secOps.getClass().getMethod(
- GET_DELEGATION_TOKEN_METHOD_NAME, clz);
-
- // secOps.getDelegationToken(delegationTokenConfig)
- return (AuthenticationToken) getDelegationTokenMethod.invoke(secOps, delegationTokenConfig);
- } catch (Exception e) {
- throw new IOException("Failed to obtain DelegationToken from Accumulo", e);
+ DelegationTokenConfig config = new DelegationTokenConfig();
+ return conn.securityOperations().getDelegationToken(config);
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new IOException("Failed to obtain DelegationToken", e);
}
}
- public Token<? extends TokenIdentifier> getHadoopToken(AuthenticationToken delegationToken)
+ public Token<? extends TokenIdentifier> getHadoopToken(AuthenticationToken token)
throws IOException {
+ if (!(token instanceof DelegationTokenImpl)) {
+ throw new IOException("Expected a DelegationTokenImpl but found " +
+ (token != null ? token.getClass() : "null"));
+ }
+ DelegationTokenImpl dt = (DelegationTokenImpl) token;
try {
- // DelegationTokenImpl class
- Class<?> delegationTokenClass = JavaUtils.loadClass(DELEGATION_TOKEN_IMPL_CLASS_NAME);
- // Methods on DelegationToken
- Method getIdentifierMethod = delegationTokenClass.getMethod(GET_IDENTIFIER_METHOD_NAME);
- Method getPasswordMethod = delegationTokenClass.getMethod(GET_PASSWORD_METHOD_NAME);
- Method getServiceNameMethod = delegationTokenClass.getMethod(GET_SERVICE_NAME_METHOD_NAME);
-
- // Treat the TokenIdentifier implementation as the abstract class to avoid dependency issues
- // AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier();
- TokenIdentifier identifier = (TokenIdentifier) getIdentifierMethod.invoke(delegationToken);
-
- // new Token<AuthenticationTokenIdentifier>(identifier.getBytes(),
- // delegationToken.getPassword(), identifier.getKind(), delegationToken.getServiceName());
- return new Token<TokenIdentifier>(identifier.getBytes(), (byte[])
- getPasswordMethod.invoke(delegationToken), identifier.getKind(),
- (Text) getServiceNameMethod.invoke(delegationToken));
+ AuthenticationTokenIdentifier identifier = dt.getIdentifier();
+
+ return new Token<AuthenticationTokenIdentifier>(identifier.getBytes(),
+ dt.getPassword(), identifier.getKind(), dt.getServiceName());
} catch (Exception e) {
throw new IOException("Failed to create Hadoop token from Accumulo DelegationToken", e);
}
}
/**
- * Construct a <code>ClientConfiguration</code> instance in a backwards-compatible way. Allows us
- * to support Accumulo 1.5
+ * Construct a <code>ClientConfiguration</code> instance.
*
* @param zookeepers
* ZooKeeper hosts
@@ -189,127 +165,199 @@ public class HiveAccumuloHelper {
* @param useSasl
* Is SASL enabled
* @return A ClientConfiguration instance
- * @throws IOException
- * If the instance fails to be created
*/
- public Object getClientConfiguration(String zookeepers, String instanceName, boolean useSasl)
- throws IOException {
- try {
- // Construct a new instance of ClientConfiguration
- Class<?> clientConfigClass = JavaUtils.loadClass(CLIENT_CONFIGURATION_CLASS_NAME);
- Method loadDefaultMethod = clientConfigClass.getMethod(LOAD_DEFAULT_METHOD_NAME);
- Object clientConfig = loadDefaultMethod.invoke(null);
-
- // Set instance and zookeeper hosts
- Method setPropertyMethod = clientConfigClass.getMethod(SET_PROPERTY_METHOD_NAME,
- String.class, Object.class);
- setPropertyMethod.invoke(clientConfig, INSTANCE_ZOOKEEPER_HOST, zookeepers);
- setPropertyMethod.invoke(clientConfig, INSTANCE_NAME, instanceName);
-
- if (useSasl) {
- // Defaults to not using SASL, set true if SASL is being used
- setPropertyMethod.invoke(clientConfig, INSTANCE_RPC_SASL_ENABLED, true);
+ public ClientConfiguration getClientConfiguration(String zookeepers, String instanceName, boolean useSasl) {
+ return ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeepers).withSasl(useSasl);
+ }
+
+ public void updateInputFormatConfWithAccumuloToken(JobConf jobConf, UserGroupInformation currentUser,
+ AccumuloConnectionParameters cnxnParams) throws IOException {
+ updateConfWithAccumuloToken(jobConf, currentUser, cnxnParams, true);
+ }
+
+ public void updateOutputFormatConfWithAccumuloToken(JobConf jobConf, UserGroupInformation currentUser,
+ AccumuloConnectionParameters cnxnParams) throws IOException {
+ updateConfWithAccumuloToken(jobConf, currentUser, cnxnParams, false);
+ }
+
+ void updateConfWithAccumuloToken(JobConf jobConf, UserGroupInformation currentUser,
+ AccumuloConnectionParameters cnxnParams, boolean isInputFormat) throws IOException {
+ if (getAccumuloToken(currentUser) != null) {
+ addTokenFromUserToJobConf(currentUser, jobConf);
+ } else {
+ try {
+ Connector connector = cnxnParams.getConnector();
+ // If we have Kerberos credentials, we should obtain the delegation token
+ AuthenticationToken token = getDelegationToken(connector);
+
+ // Send the DelegationToken down to the Configuration for Accumulo to use
+ if (isInputFormat) {
+ setInputFormatConnectorInfo(jobConf, cnxnParams.getAccumuloUserName(), token);
+ } else {
+ setOutputFormatConnectorInfo(jobConf, cnxnParams.getAccumuloUserName(), token);
+ }
+
+ // Convert the Accumulo token in a Hadoop token
+ Token<? extends TokenIdentifier> accumuloToken = getHadoopToken(token);
+
+ // Add the Hadoop token to the JobConf
+ mergeTokenIntoJobConf(jobConf, accumuloToken);
+
+ // Make sure the UGI contains the token too for good measure
+ if (!currentUser.addToken(accumuloToken)) {
+ throw new IOException("Failed to add Accumulo Token to UGI");
+ }
+
+ try {
+ addTokenFromUserToJobConf(currentUser, jobConf);
+ } catch (IOException e) {
+ throw new IOException("Current user did not contain necessary delegation Tokens " + currentUser, e);
+ }
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new IOException("Failed to acquire Accumulo DelegationToken", e);
}
+ }
+ }
- return clientConfig;
- } catch (Exception e) {
- String msg = "Failed to instantiate and invoke methods on ClientConfiguration";
- log.error(msg, e);
- throw new IOException(msg, e);
+ public boolean hasKerberosCredentials(UserGroupInformation ugi) {
+ // Allows mocking in testing.
+ return ugi.getAuthenticationMethod() == AuthenticationMethod.KERBEROS;
+ }
+
+ /**
+ * Calls {@link AccumuloInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)},
+ * suppressing exceptions due to setting the configuration multiple times.
+ */
+ public void setInputFormatConnectorInfo(JobConf conf, String username, AuthenticationToken token)
+ throws AccumuloSecurityException {
+ try {
+ AccumuloInputFormat.setConnectorInfo(conf, username, token);
+ } catch (IllegalStateException e) {
+ // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
+ log.debug("Ignoring exception setting Accumulo Connector instance for user " + username, e);
}
}
/**
- * Wrapper around <code>setZooKeeperInstance(Configuration, ClientConfiguration)</code> which only
- * exists in 1.6.0 and newer. Support backwards compat.
- *
- * @param jobConf
- * The JobConf
- * @param inputOrOutputFormatClass
- * The InputFormat or OutputFormat class
- * @param zookeepers
- * ZooKeeper hosts
- * @param instanceName
- * Accumulo instance name
- * @param useSasl
- * Is SASL enabled
- * @throws IOException
- * When invocation of the method fails
+ * Calls {@link AccumuloOutputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)}
+ * suppressing exceptions due to setting the configuration multiple times.
*/
- public void setZooKeeperInstance(JobConf jobConf, Class<?> inputOrOutputFormatClass, String
- zookeepers, String instanceName, boolean useSasl) throws IOException {
+ public void setOutputFormatConnectorInfo(JobConf conf, String username, AuthenticationToken token)
+ throws AccumuloSecurityException {
try {
- setZooKeeperInstanceWithReflection(jobConf, inputOrOutputFormatClass, zookeepers,
- instanceName, useSasl);
- } catch (InvocationTargetException e) {
- Throwable cause = e.getCause();
- if (null != cause && cause instanceof IllegalStateException) {
- throw (IllegalStateException) cause;
- }
- throw new IOException("Failed to invoke setZooKeeperInstance method", e);
+ AccumuloOutputFormat.setConnectorInfo(conf, username, token);
} catch (IllegalStateException e) {
- // re-throw the ISE so the caller can work around the silly impl that throws this in the
- // first place.
- throw e;
- } catch (Exception e) {
- throw new IOException("Failed to invoke setZooKeeperInstance method", e);
+ // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
+ log.debug("Ignoring exception setting Accumulo Connector instance for user " + username, e);
}
}
/**
- * Wrap the setZooKeeperInstance reflected-call into its own method for testing
- *
- * @param jobConf
- * The JobConf
- * @param inputOrOutputFormatClass
- * The InputFormat or OutputFormat class
- * @param zookeepers
- * ZooKeeper hosts
- * @param instanceName
- * Accumulo instance name
- * @param useSasl
- * Is SASL enabled
- * @throws IOException
- * When invocation of the method fails
+ * Calls {@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)},
+ * suppressing exceptions due to setting the configuration multiple times.
*/
- void setZooKeeperInstanceWithReflection(JobConf jobConf, Class<?> inputOrOutputFormatClass, String
- zookeepers, String instanceName, boolean useSasl) throws IOException, ClassNotFoundException,
- NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException,
- InvocationTargetException {
- Class<?> clientConfigClass = JavaUtils.loadClass(CLIENT_CONFIGURATION_CLASS_NAME);
-
- // get the ClientConfiguration
- Object clientConfig = getClientConfiguration(zookeepers, instanceName, useSasl);
-
- // AccumuloOutputFormat.setZooKeeperInstance(JobConf, ClientConfiguration) or
- // AccumuloInputFormat.setZooKeeperInstance(JobConf, ClientConfiguration)
- Method setZooKeeperMethod = inputOrOutputFormatClass.getMethod(
- SET_ZOOKEEPER_INSTANCE_METHOD_NAME, JobConf.class, clientConfigClass);
- setZooKeeperMethod.invoke(null, jobConf, clientConfig);
+ public void setInputFormatZooKeeperInstance(JobConf conf, String instanceName, String zookeepers,
+ boolean isSasl) throws IOException {
+ try {
+ ClientConfiguration clientConf = getClientConfiguration(zookeepers, instanceName, isSasl);
+ AccumuloInputFormat.setZooKeeperInstance(conf, clientConf);
+ } catch (IllegalStateException ise) {
+ // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
+ log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at "
+ + zookeepers, ise);
+ }
}
-
+
/**
- * Wrapper around <code>ConfiguratorBase.unwrapAuthenticationToken</code> which only exists in
- * 1.7.0 and new. Uses reflection to not break compat.
- *
- * @param jobConf
- * JobConf object
- * @param token
- * The DelegationTokenStub instance
- * @return A DelegationTokenImpl created from the Token in the Job's credentials
- * @throws IOException
- * If the token fails to be unwrapped
+ * Calls {@link AccumuloOutputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)},
+ * suppressing exceptions due to setting the configuration multiple times.
*/
- public AuthenticationToken unwrapAuthenticationToken(JobConf jobConf, AuthenticationToken token)
- throws IOException {
+ public void setOutputFormatZooKeeperInstance(JobConf conf, String instanceName, String zookeepers,
+ boolean isSasl) throws IOException {
try {
- Class<?> configuratorBaseClass = JavaUtils.loadClass(CONFIGURATOR_BASE_CLASS_NAME);
- Method unwrapAuthenticationTokenMethod = configuratorBaseClass.getMethod(
- UNWRAP_AUTHENTICATION_TOKEN_METHOD_NAME, JobConf.class, AuthenticationToken.class);
- // ConfiguratorBase.unwrapAuthenticationToken(conf, token);
- return (AuthenticationToken) unwrapAuthenticationTokenMethod.invoke(null, jobConf, token);
+ ClientConfiguration clientConf = getClientConfiguration(zookeepers, instanceName, isSasl);
+ AccumuloOutputFormat.setZooKeeperInstance(conf, clientConf);
+ } catch (IllegalStateException ise) {
+ // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
+ log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at "
+ + zookeepers, ise);
+ }
+ }
+
+ /**
+ * Calls {@link AccumuloInputFormat#setMockInstance(JobConf, String)}, suppressing exceptions due
+ * to setting the configuration multiple times.
+ */
+ public void setInputFormatMockInstance(JobConf conf, String instanceName) {
+ try {
+ AccumuloInputFormat.setMockInstance(conf, instanceName);
+ } catch (IllegalStateException e) {
+ // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
+ log.debug("Ignoring exception setting mock instance of " + instanceName, e);
+ }
+ }
+
+ /**
+ * Calls {@link AccumuloOutputFormat#setMockInstance(JobConf, String)}, suppressing exceptions
+ * due to setting the configuration multiple times.
+ */
+ public void setOutputFormatMockInstance(JobConf conf, String instanceName) {
+ try {
+ AccumuloOutputFormat.setMockInstance(conf, instanceName);
+ } catch (IllegalStateException e) {
+ // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
+ log.debug("Ignoring exception setting mock instance of " + instanceName, e);
+ }
+ }
+
+ /**
+ * Sets all jars required by Accumulo input/output tasks in the configuration to be dynamically
+ * loaded when the task is executed.
+ */
+ public void loadDependentJars(Configuration conf) {
+ @SuppressWarnings("deprecation")
+ List<Class<?>> classesToLoad = new ArrayList<>(Arrays.asList(Tracer.class, Fate.class, Connector.class, Main.class, ZooKeeper.class, AccumuloStorageHandler.class));
+ try {
+ classesToLoad.add(Class.forName("org.apache.htrace.Trace"));
} catch (Exception e) {
- throw new IOException("Failed to unwrap AuthenticationToken", e);
+ log.warn("Failed to load class for HTrace jar, trying to continue", e);
}
+ try {
+ Utils.addDependencyJars(conf, classesToLoad);
+ } catch (IOException e) {
+ log.error("Could not add necessary Accumulo dependencies to classpath", e);
+ }
+ }
+
+ /**
+ * Obtains an Accumulo DelegationToken and sets it in the configuration for input and output jobs.
+ * The Accumulo token is converted into a Hadoop-style token and returned to the caller.
+ *
+ * @return A Hadoop-style token which contains the Accumulo DelegationToken
+ */
+ public Token<? extends TokenIdentifier> setConnectorInfoForInputAndOutput(AccumuloConnectionParameters params, Connector conn, Configuration conf) throws Exception {
+ // Obtain a delegation token from Accumulo
+ AuthenticationToken token = getDelegationToken(conn);
+
+ // Make sure the Accumulo token is set in the Configuration (only a stub of the Accumulo
+ // AuthenticationToken is serialized, not the entire token). configureJobConf may be
+ // called multiple times with the same JobConf which results in an error from Accumulo
+ // MapReduce API. Catch the error, log a debug message and just keep going
+ try {
+ InputConfigurator.setConnectorInfo(AccumuloInputFormat.class, conf,
+ params.getAccumuloUserName(), token);
+ } catch (IllegalStateException e) {
+ // The implementation balks when this method is invoked multiple times
+ log.debug("Ignoring IllegalArgumentException about re-setting connector information");
+ }
+ try {
+ OutputConfigurator.setConnectorInfo(AccumuloOutputFormat.class, conf,
+ params.getAccumuloUserName(), token);
+ } catch (IllegalStateException e) {
+ // The implementation balks when this method is invoked multiple times
+ log.debug("Ignoring IllegalArgumentException about re-setting connector information");
+ }
+
+ return getHadoopToken(token);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java
index 407ecbd..af9a6f0 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java
@@ -30,6 +30,7 @@ import java.text.MessageFormat;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.jar.JarFile;
@@ -57,7 +58,7 @@ public class Utils {
private static final Logger log = LoggerFactory.getLogger(Utils.class);
// Thanks, HBase
- public static void addDependencyJars(Configuration conf, Class<?>... classes) throws IOException {
+ public static void addDependencyJars(Configuration conf, List<Class<?>> classes) throws IOException {
FileSystem localFs = FileSystem.getLocal(conf);
Set<String> jars = new HashSet<String>();
// Add jars that are already in the tmpjars variable
http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java
index 083678f..af64eac 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hive.accumulo.mr;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -39,6 +37,7 @@ import org.apache.accumulo.core.client.mapred.RangeInputSplit;
import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
@@ -73,9 +72,8 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,22 +107,41 @@ public class HiveAccumuloTableInputFormat implements
Path[] tablePaths = FileInputFormat.getInputPaths(context);
try {
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- final Connector connector;
+ Connector connector = null;
// Need to get a Connector so we look up the user's authorizations if not otherwise specified
- if (accumuloParams.useSasl() && !ugi.hasKerberosCredentials()) {
+ if (accumuloParams.useSasl()) {
+ log.info("Current user: " + UserGroupInformation.getCurrentUser());
// In a YARN/Tez job, don't have the Kerberos credentials anymore, use the delegation token
AuthenticationToken token = ConfiguratorBase.getAuthenticationToken(
AccumuloInputFormat.class, jobConf);
- // Convert the stub from the configuration back into a normal Token
- // More reflection to support 1.6
- token = helper.unwrapAuthenticationToken(jobConf, token);
- connector = instance.getConnector(accumuloParams.getAccumuloUserName(), token);
+ if (null != token && !jobConf.getCredentials().getAllTokens().isEmpty()) {
+ // Convert the stub from the configuration back into a normal Token
+ log.info("Found authentication token in Configuration: " + token);
+ log.info("Job credential tokens: " + jobConf.getCredentials().getAllTokens());
+ AuthenticationToken unwrappedToken = ConfiguratorBase.unwrapAuthenticationToken(jobConf, token);
+ log.info("Converted authentication token from Configuration into: " + unwrappedToken);
+ // It's possible that the Job doesn't have the token in its credentials. In this case, unwrapAuthenticationToken
+ // will return back the original token (which we know is insufficient)
+ if (unwrappedToken != token) {
+ log.info("Creating Accumulo Connector with unwrapped delegation token");
+ connector = instance.getConnector(accumuloParams.getAccumuloUserName(), unwrappedToken);
+ } else {
+ log.info("Job credentials did not contain delegation token, fetching new token");
+ }
+ }
+
+ if (connector == null) {
+ log.info("Obtaining Accumulo Connector using KerberosToken");
+ // Construct a KerberosToken -- relies on ProxyUser configuration. Will be the client making
+ // the request on top of the HS2's user. Accumulo will require proper proxy-user auth configs.
+ connector = instance.getConnector(accumuloParams.getAccumuloUserName(), new KerberosToken(accumuloParams.getAccumuloUserName()));
+ }
} else {
// Still in the local JVM, use the username+password or Kerberos credentials
connector = accumuloParams.getConnector(instance);
}
+
final List<ColumnMapping> columnMappings = columnMapper.getColumnMappings();
final List<IteratorSetting> iterators = predicateHandler.getIterators(jobConf, columnMapper);
final Collection<Range> ranges = predicateHandler.getRanges(jobConf, columnMapper);
@@ -153,6 +170,7 @@ public class HiveAccumuloTableInputFormat implements
HiveAccumuloSplit[] hiveSplits = new HiveAccumuloSplit[splits.length];
for (int i = 0; i < splits.length; i++) {
RangeInputSplit ris = (RangeInputSplit) splits[i];
+ ris.setLogLevel(Level.DEBUG);
hiveSplits[i] = new HiveAccumuloSplit(ris, tablePaths[0]);
}
@@ -172,12 +190,6 @@ public class HiveAccumuloTableInputFormat implements
/**
* Setup accumulo input format from conf properties. Delegates to final RecordReader from mapred
* package.
- *
- * @param inputSplit
- * @param jobConf
- * @param reporter
- * @return RecordReader
- * @throws IOException
*/
@Override
public RecordReader<Text,AccumuloHiveRow> getRecordReader(InputSplit inputSplit,
@@ -190,6 +202,8 @@ public class HiveAccumuloTableInputFormat implements
}
try {
+ final AccumuloConnectionParameters accumuloParams = new AccumuloConnectionParameters(
+ jobConf);
final List<IteratorSetting> iterators = predicateHandler.getIterators(jobConf, columnMapper);
HiveAccumuloSplit hiveSplit = (HiveAccumuloSplit) inputSplit;
@@ -213,11 +227,14 @@ public class HiveAccumuloTableInputFormat implements
// ACCUMULO-3015 Like the above, RangeInputSplit should have the table name
// but we want it to, so just re-set it if it's null.
- if (null == getTableName(rangeSplit)) {
- final AccumuloConnectionParameters accumuloParams = new AccumuloConnectionParameters(
- jobConf);
- log.debug("Re-setting table name on InputSplit due to Accumulo bug.");
- setTableName(rangeSplit, accumuloParams.getAccumuloTableName());
+ if (null == rangeSplit.getTableName()) {
+ rangeSplit.setTableName(accumuloParams.getAccumuloTableName());
+ }
+
+ // ACCUMULO-4670 RangeInputSplit doesn't preserve useSasl on the ClientConfiguration/ZooKeeperInstance
+ // We have to manually re-set it in the JobConf to make sure it gets picked up.
+ if (accumuloParams.useSasl()) {
+ helper.setInputFormatZooKeeperInstance(jobConf, accumuloParams.getAccumuloInstanceName(), accumuloParams.getZooKeepers(), accumuloParams.useSasl());
}
final RecordReader<Text,PeekingIterator<Map.Entry<Key,Value>>> recordReader = accumuloInputFormat
@@ -268,9 +285,6 @@ public class HiveAccumuloTableInputFormat implements
* Any iterators to be configured server-side
* @param ranges
* Accumulo ranges on for the query
- * @throws AccumuloSecurityException
- * @throws AccumuloException
- * @throws SerDeException
*/
protected void configure(JobConf conf, Instance instance, Connector connector,
AccumuloConnectionParameters accumuloParams, ColumnMapper columnMapper,
@@ -279,44 +293,17 @@ public class HiveAccumuloTableInputFormat implements
// Handle implementation of Instance and invoke appropriate InputFormat method
if (instance instanceof MockInstance) {
- setMockInstance(conf, instance.getInstanceName());
+ getHelper().setInputFormatMockInstance(conf, instance.getInstanceName());
} else {
- setZooKeeperInstance(conf, instance.getInstanceName(), instance.getZooKeepers(),
+ getHelper().setInputFormatZooKeeperInstance(conf, instance.getInstanceName(), instance.getZooKeepers(),
accumuloParams.useSasl());
}
// Set the username/passwd for the Accumulo connection
if (accumuloParams.useSasl()) {
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
- // If we have Kerberos credentials, we should obtain the delegation token
- if (ugi.hasKerberosCredentials()) {
- Connector conn = accumuloParams.getConnector();
- AuthenticationToken token = helper.getDelegationToken(conn);
-
- // Send the DelegationToken down to the Configuration for Accumulo to use
- setConnectorInfo(conf, accumuloParams.getAccumuloUserName(), token);
-
- // Convert the Accumulo token in a Hadoop token
- Token<? extends TokenIdentifier> accumuloToken = helper.getHadoopToken(token);
-
- log.info("Adding Hadoop Token for Accumulo to Job's Credentials");
-
- // Add the Hadoop token to the JobConf
- helper.mergeTokenIntoJobConf(conf, accumuloToken);
-
- if (!ugi.addToken(accumuloToken)) {
- throw new IOException("Failed to add Accumulo Token to UGI");
- }
- }
-
- try {
- helper.addTokenFromUserToJobConf(ugi, conf);
- } catch (IOException e) {
- throw new IOException("Current user did not contain necessary delegation Tokens " + ugi, e);
- }
+ getHelper().updateInputFormatConfWithAccumuloToken(conf, UserGroupInformation.getCurrentUser(), accumuloParams);
} else {
- setConnectorInfo(conf, accumuloParams.getAccumuloUserName(),
+ getHelper().setInputFormatConnectorInfo(conf, accumuloParams.getAccumuloUserName(),
new PasswordToken(accumuloParams.getAccumuloPassword()));
}
@@ -355,45 +342,6 @@ public class HiveAccumuloTableInputFormat implements
// Wrap the static AccumuloInputFormat methods with methods that we can
// verify were correctly called via Mockito
- protected void setMockInstance(JobConf conf, String instanceName) {
- try {
- AccumuloInputFormat.setMockInstance(conf, instanceName);
- } catch (IllegalStateException e) {
- // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
- log.debug("Ignoring exception setting mock instance of " + instanceName, e);
- }
- }
-
- @SuppressWarnings("deprecation")
- protected void setZooKeeperInstance(JobConf conf, String instanceName, String zkHosts,
- boolean isSasl) throws IOException {
- // To support builds against 1.5, we can't use the new 1.6 setZooKeeperInstance which
- // takes a ClientConfiguration class that only exists in 1.6
- try {
- if (isSasl) {
- // Reflection to support Accumulo 1.5. Remove when Accumulo 1.5 support is dropped
- // 1.6 works with the deprecated 1.5 method, but must use reflection for 1.7-only SASL support
- helper.setZooKeeperInstance(conf, AccumuloInputFormat.class, zkHosts, instanceName, isSasl);
- } else {
- AccumuloInputFormat.setZooKeeperInstance(conf, instanceName, zkHosts);
- }
- } catch (IllegalStateException ise) {
- // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
- log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at "
- + zkHosts, ise);
- }
- }
-
- protected void setConnectorInfo(JobConf conf, String user, AuthenticationToken token)
- throws AccumuloSecurityException {
- try {
- AccumuloInputFormat.setConnectorInfo(conf, user, token);
- } catch (IllegalStateException e) {
- // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
- log.debug("Ignoring exception setting Accumulo Connector instance for user " + user, e);
- }
- }
-
protected void setInputTableName(JobConf conf, String tableName) {
AccumuloInputFormat.setInputTableName(conf, tableName);
}
@@ -454,109 +402,7 @@ public class HiveAccumuloTableInputFormat implements
return pairs;
}
- /**
- * Reflection to work around Accumulo 1.5 and 1.6 incompatibilities. Throws an {@link IOException}
- * for any reflection related exceptions
- *
- * @param split
- * A RangeInputSplit
- * @return The name of the table from the split
- * @throws IOException
- */
- protected String getTableName(RangeInputSplit split) throws IOException {
- // ACCUMULO-3017 shenanigans with method names changing without deprecation
- Method getTableName = null;
- try {
- getTableName = RangeInputSplit.class.getMethod("getTableName");
- } catch (SecurityException e) {
- log.debug("Could not get getTableName method from RangeInputSplit", e);
- } catch (NoSuchMethodException e) {
- log.debug("Could not get getTableName method from RangeInputSplit", e);
- }
-
- if (null != getTableName) {
- try {
- return (String) getTableName.invoke(split);
- } catch (IllegalArgumentException e) {
- log.debug("Could not invoke getTableName method from RangeInputSplit", e);
- } catch (IllegalAccessException e) {
- log.debug("Could not invoke getTableName method from RangeInputSplit", e);
- } catch (InvocationTargetException e) {
- log.debug("Could not invoke getTableName method from RangeInputSplit", e);
- }
- }
-
- Method getTable;
- try {
- getTable = RangeInputSplit.class.getMethod("getTable");
- } catch (SecurityException e) {
- throw new IOException("Could not get table name from RangeInputSplit", e);
- } catch (NoSuchMethodException e) {
- throw new IOException("Could not get table name from RangeInputSplit", e);
- }
-
- try {
- return (String) getTable.invoke(split);
- } catch (IllegalArgumentException e) {
- throw new IOException("Could not get table name from RangeInputSplit", e);
- } catch (IllegalAccessException e) {
- throw new IOException("Could not get table name from RangeInputSplit", e);
- } catch (InvocationTargetException e) {
- throw new IOException("Could not get table name from RangeInputSplit", e);
- }
- }
-
- /**
- * Sets the table name on a RangeInputSplit, accounting for change in method name. Any reflection
- * related exception is wrapped in an {@link IOException}
- *
- * @param split
- * The RangeInputSplit to operate on
- * @param tableName
- * The name of the table to set
- * @throws IOException
- */
- protected void setTableName(RangeInputSplit split, String tableName) throws IOException {
- // ACCUMULO-3017 shenanigans with method names changing without deprecation
- Method setTableName = null;
- try {
- setTableName = RangeInputSplit.class.getMethod("setTableName", String.class);
- } catch (SecurityException e) {
- log.debug("Could not get getTableName method from RangeInputSplit", e);
- } catch (NoSuchMethodException e) {
- log.debug("Could not get getTableName method from RangeInputSplit", e);
- }
-
- if (null != setTableName) {
- try {
- setTableName.invoke(split, tableName);
- return;
- } catch (IllegalArgumentException e) {
- log.debug("Could not invoke getTableName method from RangeInputSplit", e);
- } catch (IllegalAccessException e) {
- log.debug("Could not invoke getTableName method from RangeInputSplit", e);
- } catch (InvocationTargetException e) {
- log.debug("Could not invoke getTableName method from RangeInputSplit", e);
- }
- }
-
- Method setTable;
- try {
- setTable = RangeInputSplit.class.getMethod("setTable", String.class);
- } catch (SecurityException e) {
- throw new IOException("Could not set table name from RangeInputSplit", e);
- } catch (NoSuchMethodException e) {
- throw new IOException("Could not set table name from RangeInputSplit", e);
- }
-
- try {
- setTable.invoke(split, tableName);
- } catch (IllegalArgumentException e) {
- throw new IOException("Could not set table name from RangeInputSplit", e);
- } catch (IllegalAccessException e) {
- throw new IOException("Could not set table name from RangeInputSplit", e);
- } catch (InvocationTargetException e) {
- throw new IOException("Could not set table name from RangeInputSplit", e);
- }
+ HiveAccumuloHelper getHelper() {
+ return helper;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
index bfa764a..0414c35 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
@@ -20,11 +20,8 @@ package org.apache.hadoop.hive.accumulo.mr;
import java.io.IOException;
-import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Mutation;
import org.apache.hadoop.fs.FileSystem;
@@ -38,8 +35,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Progressable;
import com.google.common.base.Preconditions;
@@ -78,46 +73,19 @@ public class HiveAccumuloTableOutputFormat extends AccumuloIndexedOutputFormat {
// Set the necessary Accumulo information
try {
if (cnxnParams.useMockInstance()) {
- setMockInstanceWithErrorChecking(job, cnxnParams.getAccumuloInstanceName());
+ getHelper().setOutputFormatMockInstance(job, cnxnParams.getAccumuloInstanceName());
} else {
// Accumulo instance name with ZK quorum
- setZooKeeperInstanceWithErrorChecking(job, cnxnParams.getAccumuloInstanceName(),
+ getHelper().setOutputFormatZooKeeperInstance(job, cnxnParams.getAccumuloInstanceName(),
cnxnParams.getZooKeepers(), cnxnParams.useSasl());
}
// Extract the delegation Token from the UGI and add it to the job
// The AccumuloOutputFormat will look for it there.
if (cnxnParams.useSasl()) {
- UserGroupInformation ugi = getCurrentUser();
- if (!hasKerberosCredentials(ugi)) {
- getHelper().addTokenFromUserToJobConf(ugi, job);
- } else {
- // Still in the local JVM, can use Kerberos credentials
- try {
- Connector connector = cnxnParams.getConnector();
- AuthenticationToken token = getHelper().getDelegationToken(connector);
-
- // Send the DelegationToken down to the Configuration for Accumulo to use
- setConnectorInfoWithErrorChecking(job, cnxnParams.getAccumuloUserName(), token);
-
- // Convert the Accumulo token in a Hadoop token
- Token<? extends TokenIdentifier> accumuloToken = getHelper().getHadoopToken(token);
-
- log.info("Adding Hadoop Token for Accumulo to Job's Credentials");
-
- // Add the Hadoop token to the JobConf
- getHelper().mergeTokenIntoJobConf(job, accumuloToken);
-
- // Make sure the UGI contains the token too for good measure
- if (!ugi.addToken(accumuloToken)) {
- throw new IOException("Failed to add Accumulo Token to UGI");
- }
- } catch (AccumuloException | AccumuloSecurityException e) {
- throw new IOException("Failed to acquire Accumulo DelegationToken", e);
- }
- }
+ getHelper().updateOutputFormatConfWithAccumuloToken(job, getCurrentUser(), cnxnParams);
} else {
- setConnectorInfoWithErrorChecking(job, cnxnParams.getAccumuloUserName(),
+ getHelper().setOutputFormatConnectorInfo(job, cnxnParams.getAccumuloUserName(),
new PasswordToken(cnxnParams.getAccumuloPassword()));
}
@@ -141,45 +109,6 @@ public class HiveAccumuloTableOutputFormat extends AccumuloIndexedOutputFormat {
// Non-static methods to wrap the static AccumuloOutputFormat methods to enable testing
- protected void setConnectorInfoWithErrorChecking(JobConf conf, String username,
- AuthenticationToken token) throws AccumuloSecurityException {
- try {
- AccumuloIndexedOutputFormat.setConnectorInfo(conf, username, token);
- } catch (IllegalStateException e) {
- // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
- log.debug("Ignoring exception setting Accumulo Connector instance for user " + username, e);
- }
- }
-
- @SuppressWarnings("deprecation")
- protected void setZooKeeperInstanceWithErrorChecking(JobConf conf, String instanceName,
- String zookeepers, boolean isSasl) throws IOException {
- try {
- if (isSasl) {
- // Reflection to support Accumulo 1.5. Remove when Accumulo 1.5 support is dropped
- // 1.6 works with the deprecated 1.5 method, but must use reflection for 1.7-only
- // SASL support
- getHelper().setZooKeeperInstance(conf, AccumuloOutputFormat.class, zookeepers, instanceName,
- isSasl);
- } else {
- AccumuloIndexedOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers);
- }
- } catch (IllegalStateException ise) {
- // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
- log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at "
- + zookeepers, ise);
- }
- }
-
- protected void setMockInstanceWithErrorChecking(JobConf conf, String instanceName) {
- try {
- AccumuloIndexedOutputFormat.setMockInstance(conf, instanceName);
- } catch (IllegalStateException e) {
- // AccumuloOutputFormat complains if you re-set an already set value. We just don't care.
- log.debug("Ignoring exception setting mock instance of " + instanceName, e);
- }
- }
-
protected void setDefaultAccumuloTableName(JobConf conf, String tableName) {
AccumuloIndexedOutputFormat.setDefaultTableName(conf, tableName);
}
@@ -206,11 +135,6 @@ public class HiveAccumuloTableOutputFormat extends AccumuloIndexedOutputFormat {
return new AccumuloConnectionParameters(conf);
}
- boolean hasKerberosCredentials(UserGroupInformation ugi) {
- // Allows mocking in testing.
- return ugi.hasKerberosCredentials();
- }
-
UserGroupInformation getCurrentUser() throws IOException {
// Allows mocking in testing.
return UserGroupInformation.getCurrentUser();
http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/CompositeAccumuloRowIdFactory.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/CompositeAccumuloRowIdFactory.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/CompositeAccumuloRowIdFactory.java
index 02d9736..67e9250 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/CompositeAccumuloRowIdFactory.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/CompositeAccumuloRowIdFactory.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.accumulo.serde;
import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.util.Collections;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
@@ -57,7 +58,7 @@ public class CompositeAccumuloRowIdFactory<T extends AccumuloCompositeRowId> ext
public void addDependencyJars(Configuration jobConf) throws IOException {
// Make sure the jar containing the custom CompositeRowId is included
// in the mapreduce job's classpath (libjars)
- Utils.addDependencyJars(jobConf, keyClass);
+ Utils.addDependencyJars(jobConf, Collections.<Class<?>> singletonList(keyClass));
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/DefaultAccumuloRowIdFactory.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/DefaultAccumuloRowIdFactory.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/DefaultAccumuloRowIdFactory.java
index ea04d1a..6d96d9b 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/DefaultAccumuloRowIdFactory.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/DefaultAccumuloRowIdFactory.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.accumulo.serde;
import java.io.IOException;
+import java.util.Collections;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
@@ -61,7 +62,7 @@ public class DefaultAccumuloRowIdFactory implements AccumuloRowIdFactory {
@Override
public void addDependencyJars(Configuration conf) throws IOException {
- Utils.addDependencyJars(conf, getClass());
+ Utils.addDependencyJars(conf, Collections.<Class<?>> singletonList(getClass()));
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
index 8d195ee..58bf4a6 100644
--- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
@@ -23,6 +23,7 @@ import java.util.Properties;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -43,14 +44,17 @@ import org.mockito.Mockito;
*/
public class TestAccumuloStorageHandler {
- protected AccumuloStorageHandler storageHandler;
+ private AccumuloStorageHandler storageHandler;
+ private Configuration conf;
@Rule
public TestName test = new TestName();
@Before
public void setup() {
+ conf = new Configuration();
storageHandler = new AccumuloStorageHandler();
+ storageHandler.setConf(conf);
}
@Test
http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java
index 406768a..af561e0 100644
--- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java
@@ -23,12 +23,16 @@ import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
+import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.log4j.Logger;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -90,51 +94,163 @@ public class TestHiveAccumuloHelper {
assertEquals(service, credTokens.iterator().next().getService());
}
- @Test(expected = IllegalStateException.class)
- public void testISEIsPropagated() throws Exception {
+ @Test
+ public void testInputFormatWithKerberosToken() throws Exception {
+ final JobConf jobConf = new JobConf();
final HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
+ final AuthenticationToken authToken = Mockito.mock(AuthenticationToken.class);
+ final Token hadoopToken = Mockito.mock(Token.class);
+ final AccumuloConnectionParameters cnxnParams = Mockito.mock(AccumuloConnectionParameters.class);
+ final Connector connector = Mockito.mock(Connector.class);
+
+ final String user = "bob";
+ final String instanceName = "accumulo";
+ final String zookeepers = "host1:2181,host2:2181,host3:2181";
+ UserGroupInformation ugi = UserGroupInformation.createUserForTesting(user, new String[0]);
- final JobConf jobConf = Mockito.mock(JobConf.class);
- final Class<?> inputOrOutputFormatClass = AccumuloInputFormat.class;
- final String zookeepers = "localhost:2181";
- final String instanceName = "accumulo_instance";
- final boolean useSasl = false;
+ // Call the real methods for these
+ Mockito.doCallRealMethod().when(helper).updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+ Mockito.doCallRealMethod().when(helper).updateInputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+ Mockito.doCallRealMethod().when(helper).updateConfWithAccumuloToken(jobConf, ugi, cnxnParams, true);
- // Call the real "public" method
- Mockito.doCallRealMethod().when(helper).setZooKeeperInstance(jobConf, inputOrOutputFormatClass,
- zookeepers, instanceName, useSasl);
+ // Return our mocked objects
+ Mockito.when(cnxnParams.getConnector()).thenReturn(connector);
+ Mockito.when(helper.getDelegationToken(connector)).thenReturn(authToken);
+ Mockito.when(helper.getHadoopToken(authToken)).thenReturn(hadoopToken);
- // Mock the private one to throw the ISE
- Mockito.doThrow(new IllegalStateException()).when(helper).
- setZooKeeperInstanceWithReflection(jobConf, inputOrOutputFormatClass, zookeepers,
- instanceName, useSasl);
+ // Stub AccumuloConnectionParameters actions
+ Mockito.when(cnxnParams.useSasl()).thenReturn(true);
+ Mockito.when(cnxnParams.getAccumuloUserName()).thenReturn(user);
+ Mockito.when(cnxnParams.getAccumuloInstanceName()).thenReturn(instanceName);
+ Mockito.when(cnxnParams.getZooKeepers()).thenReturn(zookeepers);
- // Should throw an IllegalStateException
- helper.setZooKeeperInstance(jobConf, inputOrOutputFormatClass, zookeepers, instanceName,
- useSasl);
+ // Test the InputFormat execution path
+ //
+ Mockito.when(helper.hasKerberosCredentials(ugi)).thenReturn(true);
+ // Invoke the InputFormat entrypoint
+ helper.updateInputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+ Mockito.verify(helper).setInputFormatConnectorInfo(jobConf, user, authToken);
+ Mockito.verify(helper).mergeTokenIntoJobConf(jobConf, hadoopToken);
+ Mockito.verify(helper).addTokenFromUserToJobConf(ugi, jobConf);
+
+ // Make sure the token made it into the UGI
+ Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
+ Assert.assertEquals(1, tokens.size());
+ Assert.assertEquals(hadoopToken, tokens.iterator().next());
}
- @Test(expected = IllegalStateException.class)
- public void testISEIsPropagatedWithReflection() throws Exception {
+ @Test
+ public void testInputFormatWithoutKerberosToken() throws Exception {
+ final JobConf jobConf = new JobConf();
+ final HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
+ final AuthenticationToken authToken = Mockito.mock(AuthenticationToken.class);
+ final Token hadoopToken = Mockito.mock(Token.class);
+ final AccumuloConnectionParameters cnxnParams = Mockito.mock(AccumuloConnectionParameters.class);
+ final Connector connector = Mockito.mock(Connector.class);
+
+ final String user = "bob";
+ final String instanceName = "accumulo";
+ final String zookeepers = "host1:2181,host2:2181,host3:2181";
+ UserGroupInformation ugi = UserGroupInformation.createUserForTesting(user, new String[0]);
+
+ // Call the real methods for these
+ Mockito.doCallRealMethod().when(helper).updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+ Mockito.doCallRealMethod().when(helper).updateInputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+ Mockito.doCallRealMethod().when(helper).updateConfWithAccumuloToken(jobConf, ugi, cnxnParams, true);
+
+ // Return our mocked objects
+ Mockito.when(cnxnParams.getConnector()).thenReturn(connector);
+ Mockito.when(helper.getDelegationToken(connector)).thenReturn(authToken);
+ Mockito.when(helper.getHadoopToken(authToken)).thenReturn(hadoopToken);
+
+ // Stub AccumuloConnectionParameters actions
+ Mockito.when(cnxnParams.useSasl()).thenReturn(true);
+ Mockito.when(cnxnParams.getAccumuloUserName()).thenReturn(user);
+ Mockito.when(cnxnParams.getAccumuloInstanceName()).thenReturn(instanceName);
+ Mockito.when(cnxnParams.getZooKeepers()).thenReturn(zookeepers);
+
+ // Verify that when we have no kerberos credentials, we pull the serialized Token
+ Mockito.when(helper.hasKerberosCredentials(ugi)).thenReturn(false);
+ helper.updateInputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+ Mockito.verify(helper).addTokenFromUserToJobConf(ugi, jobConf);
+ }
+
+ @Test
+ public void testOutputFormatSaslConfigurationWithoutKerberosToken() throws Exception {
+ final JobConf jobConf = new JobConf();
final HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
+ final AuthenticationToken authToken = Mockito.mock(AuthenticationToken.class);
+ final Token hadoopToken = Mockito.mock(Token.class);
+ final AccumuloConnectionParameters cnxnParams = Mockito.mock(AccumuloConnectionParameters.class);
+ final Connector connector = Mockito.mock(Connector.class);
+
+ final String user = "bob";
+ final String instanceName = "accumulo";
+ final String zookeepers = "host1:2181,host2:2181,host3:2181";
+ UserGroupInformation ugi = UserGroupInformation.createUserForTesting(user, new String[0]);
+
+ // Call the real methods for these
+ Mockito.doCallRealMethod().when(helper).updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+ Mockito.doCallRealMethod().when(helper).updateConfWithAccumuloToken(jobConf, ugi, cnxnParams, false);
+
+ // Return our mocked objects
+ Mockito.when(cnxnParams.getConnector()).thenReturn(connector);
+ Mockito.when(helper.getDelegationToken(connector)).thenReturn(authToken);
+ Mockito.when(helper.getHadoopToken(authToken)).thenReturn(hadoopToken);
+
+ // Stub AccumuloConnectionParameters actions
+ Mockito.when(cnxnParams.useSasl()).thenReturn(true);
+ Mockito.when(cnxnParams.getAccumuloUserName()).thenReturn(user);
+ Mockito.when(cnxnParams.getAccumuloInstanceName()).thenReturn(instanceName);
+ Mockito.when(cnxnParams.getZooKeepers()).thenReturn(zookeepers);
+
+ // Verify that when we have no kerberos credentials, we pull the serialized Token
+ Mockito.when(helper.hasKerberosCredentials(ugi)).thenReturn(false);
+ helper.updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+ Mockito.verify(helper).addTokenFromUserToJobConf(ugi, jobConf);
+ }
+
+ @Test
+ public void testOutputFormatSaslConfigurationWithKerberosToken() throws Exception {
+ final JobConf jobConf = new JobConf();
+ final HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
+ final AuthenticationToken authToken = Mockito.mock(AuthenticationToken.class);
+ final Token hadoopToken = Mockito.mock(Token.class);
+ final AccumuloConnectionParameters cnxnParams = Mockito.mock(AccumuloConnectionParameters.class);
+ final Connector connector = Mockito.mock(Connector.class);
+
+ final String user = "bob";
+ final String instanceName = "accumulo";
+ final String zookeepers = "host1:2181,host2:2181,host3:2181";
+ UserGroupInformation ugi = UserGroupInformation.createUserForTesting(user, new String[0]);
+
+ // Call the real methods for these
+ Mockito.doCallRealMethod().when(helper).updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+ Mockito.doCallRealMethod().when(helper).updateConfWithAccumuloToken(jobConf, ugi, cnxnParams, false);
+
+ // Return our mocked objects
+ Mockito.when(cnxnParams.getConnector()).thenReturn(connector);
+ Mockito.when(helper.getDelegationToken(connector)).thenReturn(authToken);
+ Mockito.when(helper.getHadoopToken(authToken)).thenReturn(hadoopToken);
- final JobConf jobConf = Mockito.mock(JobConf.class);
- final Class<?> inputOrOutputFormatClass = AccumuloInputFormat.class;
- final String zookeepers = "localhost:2181";
- final String instanceName = "accumulo_instance";
- final boolean useSasl = false;
+ // Stub AccumuloConnectionParameters actions
+ Mockito.when(cnxnParams.useSasl()).thenReturn(true);
+ Mockito.when(cnxnParams.getAccumuloUserName()).thenReturn(user);
+ Mockito.when(cnxnParams.getAccumuloInstanceName()).thenReturn(instanceName);
+ Mockito.when(cnxnParams.getZooKeepers()).thenReturn(zookeepers);
- // Call the real "public" method
- Mockito.doCallRealMethod().when(helper).setZooKeeperInstance(jobConf, inputOrOutputFormatClass,
- zookeepers, instanceName, useSasl);
+ // We have kerberos credentials
- // Mock the private one to throw the IAE
- Mockito.doThrow(new InvocationTargetException(new IllegalStateException())).when(helper).
- setZooKeeperInstanceWithReflection(jobConf, inputOrOutputFormatClass, zookeepers,
- instanceName, useSasl);
+ Mockito.when(helper.hasKerberosCredentials(ugi)).thenReturn(true);
+ // Invoke the OutputFormat entrypoint
+ helper.updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams);
+ Mockito.verify(helper).setOutputFormatConnectorInfo(jobConf, user, authToken);
+ Mockito.verify(helper).mergeTokenIntoJobConf(jobConf, hadoopToken);
+ Mockito.verify(helper).addTokenFromUserToJobConf(ugi, jobConf);
- // Should throw an IllegalStateException
- helper.setZooKeeperInstance(jobConf, inputOrOutputFormatClass, zookeepers, instanceName,
- useSasl);
+ // Make sure the token made it into the UGI
+ Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
+ Assert.assertEquals(1, tokens.size());
+ Assert.assertEquals(hadoopToken, tokens.iterator().next());
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/173d9816/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java
index ee5aecf..56beb8f 100644
--- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters;
import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
import org.apache.hadoop.hive.accumulo.AccumuloHiveRow;
+import org.apache.hadoop.hive.accumulo.HiveAccumuloHelper;
import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
import org.apache.hadoop.hive.accumulo.columns.ColumnMapper;
import org.apache.hadoop.hive.accumulo.columns.ColumnMapping;
@@ -472,6 +473,10 @@ public class TestHiveAccumuloTableInputFormat {
Set<Range> ranges = Collections.singleton(new Range());
HiveAccumuloTableInputFormat mockInputFormat = Mockito.mock(HiveAccumuloTableInputFormat.class);
+ HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
+
+ // Stub out a mocked Helper instance
+ Mockito.when(mockInputFormat.getHelper()).thenReturn(helper);
// Call out to the real configure method
Mockito.doCallRealMethod().when(mockInputFormat)
@@ -485,8 +490,8 @@ public class TestHiveAccumuloTableInputFormat {
ranges);
// Verify that the correct methods are invoked on AccumuloInputFormat
- Mockito.verify(mockInputFormat).setMockInstance(conf, mockInstance.getInstanceName());
- Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS));
+ Mockito.verify(helper).setInputFormatMockInstance(conf, mockInstance.getInstanceName());
+ Mockito.verify(helper).setInputFormatConnectorInfo(conf, USER, new PasswordToken(PASS));
Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE);
Mockito.verify(mockInputFormat).setScanAuthorizations(conf,
con.securityOperations().getUserAuthorizations(USER));
@@ -509,10 +514,13 @@ public class TestHiveAccumuloTableInputFormat {
ZooKeeperInstance zkInstance = Mockito.mock(ZooKeeperInstance.class);
HiveAccumuloTableInputFormat mockInputFormat = Mockito.mock(HiveAccumuloTableInputFormat.class);
+ HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
// Stub out the ZKI mock
Mockito.when(zkInstance.getInstanceName()).thenReturn(instanceName);
Mockito.when(zkInstance.getZooKeepers()).thenReturn(zookeepers);
+ // Stub out a mocked Helper instance
+ Mockito.when(mockInputFormat.getHelper()).thenReturn(helper);
// Call out to the real configure method
Mockito.doCallRealMethod().when(mockInputFormat)
@@ -526,8 +534,8 @@ public class TestHiveAccumuloTableInputFormat {
ranges);
// Verify that the correct methods are invoked on AccumuloInputFormat
- Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false);
- Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS));
+ Mockito.verify(helper).setInputFormatZooKeeperInstance(conf, instanceName, zookeepers, false);
+ Mockito.verify(helper).setInputFormatConnectorInfo(conf, USER, new PasswordToken(PASS));
Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE);
Mockito.verify(mockInputFormat).setScanAuthorizations(conf,
con.securityOperations().getUserAuthorizations(USER));
@@ -551,10 +559,13 @@ public class TestHiveAccumuloTableInputFormat {
ZooKeeperInstance zkInstance = Mockito.mock(ZooKeeperInstance.class);
HiveAccumuloTableInputFormat mockInputFormat = Mockito.mock(HiveAccumuloTableInputFormat.class);
+ HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
// Stub out the ZKI mock
Mockito.when(zkInstance.getInstanceName()).thenReturn(instanceName);
Mockito.when(zkInstance.getZooKeepers()).thenReturn(zookeepers);
+ // Stub out a mocked Helper instance
+ Mockito.when(mockInputFormat.getHelper()).thenReturn(helper);
// Call out to the real configure method
Mockito.doCallRealMethod().when(mockInputFormat)
@@ -568,8 +579,8 @@ public class TestHiveAccumuloTableInputFormat {
ranges);
// Verify that the correct methods are invoked on AccumuloInputFormat
- Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false);
- Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS));
+ Mockito.verify(helper).setInputFormatZooKeeperInstance(conf, instanceName, zookeepers, false);
+ Mockito.verify(helper).setInputFormatConnectorInfo(conf, USER, new PasswordToken(PASS));
Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE);
Mockito.verify(mockInputFormat).setScanAuthorizations(conf, new Authorizations("foo,bar"));
Mockito.verify(mockInputFormat).addIterators(conf, iterators);
@@ -605,10 +616,13 @@ public class TestHiveAccumuloTableInputFormat {
ZooKeeperInstance zkInstance = Mockito.mock(ZooKeeperInstance.class);
HiveAccumuloTableInputFormat mockInputFormat = Mockito.mock(HiveAccumuloTableInputFormat.class);
+ HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
// Stub out the ZKI mock
Mockito.when(zkInstance.getInstanceName()).thenReturn(instanceName);
Mockito.when(zkInstance.getZooKeepers()).thenReturn(zookeepers);
+ // Stub out a mocked Helper instance
+ Mockito.when(mockInputFormat.getHelper()).thenReturn(helper);
// Call out to the real configure method
Mockito.doCallRealMethod().when(mockInputFormat)
@@ -622,8 +636,8 @@ public class TestHiveAccumuloTableInputFormat {
ranges);
// Verify that the correct methods are invoked on AccumuloInputFormat
- Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false);
- Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS));
+ Mockito.verify(helper).setInputFormatZooKeeperInstance(conf, instanceName, zookeepers, false);
+ Mockito.verify(helper).setInputFormatConnectorInfo(conf, USER, new PasswordToken(PASS));
Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE);
Mockito.verify(mockInputFormat).setScanAuthorizations(conf,
con.securityOperations().getUserAuthorizations(USER));
@@ -659,12 +673,15 @@ public class TestHiveAccumuloTableInputFormat {
ZooKeeperInstance zkInstance = Mockito.mock(ZooKeeperInstance.class);
HiveAccumuloTableInputFormat mockInputFormat = Mockito.mock(HiveAccumuloTableInputFormat.class);
+ HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
// Stub out the ZKI mock
Mockito.when(zkInstance.getInstanceName()).thenReturn(instanceName);
Mockito.when(zkInstance.getZooKeepers()).thenReturn(zookeepers);
Mockito.when(mockInputFormat.getPairCollection(columnMapper.getColumnMappings())).thenReturn(
cfCqPairs);
+ // Stub out a mocked Helper instance
+ Mockito.when(mockInputFormat.getHelper()).thenReturn(helper);
// Call out to the real configure method
Mockito.doCallRealMethod().when(mockInputFormat)
@@ -678,8 +695,8 @@ public class TestHiveAccumuloTableInputFormat {
ranges);
// Verify that the correct methods are invoked on AccumuloInputFormat
- Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false);
- Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS));
+ Mockito.verify(helper).setInputFormatZooKeeperInstance(conf, instanceName, zookeepers, false);
+ Mockito.verify(helper).setInputFormatConnectorInfo(conf, USER, new PasswordToken(PASS));
Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE);
Mockito.verify(mockInputFormat).setScanAuthorizations(conf,
con.securityOperations().getUserAuthorizations(USER));