You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mw...@apache.org on 2018/02/13 18:48:46 UTC
[accumulo] branch master updated: ACCUMULO-4784 - New fluent API
for creating Connector (#361)
This is an automated email from the ASF dual-hosted git repository.
mwalch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 2a58165 ACCUMULO-4784 - New fluent API for creating Connector (#361)
2a58165 is described below
commit 2a58165c0cccb607b10b5162075a912983fb2a67
Author: Mike Walch <mw...@apache.org>
AuthorDate: Tue Feb 13 13:48:43 2018 -0500
ACCUMULO-4784 - New fluent API for creating Connector (#361)
* Replaces ClientConfiguration which is now deprecated
* New API can be passed a file with properties set. An example file 'accumulo-client.properties' is in the tarball distribution. This file should be used in place of client.conf
* Build auto-generates accumulo-client.properties for tarball and client-properties.md for documentation on website
* Fluent API can also generate ConnectionInfo that can be passed to MapReduce input & output libraries for Accumulo.
---
assemble/conf/client.conf | 20 --
assemble/pom.xml | 15 +
assemble/src/main/assemblies/component.xml | 8 +
core/pom.xml | 15 +
.../accumulo/core/client/BatchWriterConfig.java | 30 ++
.../accumulo/core/client/ClientConfiguration.java | 1 +
.../accumulo/core/client/ConnectionInfo.java | 54 ++++
.../org/apache/accumulo/core/client/Connector.java | 323 ++++++++++++++++++++-
.../accumulo/core/client/impl/ClientContext.java | 23 +-
.../core/client/impl/ConnectionInfoFactory.java | 120 ++++++++
.../core/client/impl/ConnectionInfoImpl.java | 63 ++++
.../accumulo/core/client/impl/ConnectorImpl.java | 257 +++++++++++++++-
.../core/client/mapred/AbstractInputFormat.java | 24 +-
.../core/client/mapred/AccumuloInputFormat.java | 8 +-
.../mapred/AccumuloMultiTableInputFormat.java | 3 +-
.../core/client/mapred/AccumuloOutputFormat.java | 29 +-
.../core/client/mapred/AccumuloRowInputFormat.java | 6 +-
.../core/client/mapreduce/AbstractInputFormat.java | 22 +-
.../core/client/mapreduce/AccumuloInputFormat.java | 7 +-
.../client/mapreduce/AccumuloOutputFormat.java | 26 +-
.../client/mapreduce/AccumuloRowInputFormat.java | 6 +-
.../accumulo/core/client/mock/MockConnector.java | 15 +
.../security/tokens/CredentialProviderToken.java | 20 +-
.../accumulo/core/conf/ClientConfigGenerate.java | 193 ++++++++++++
.../apache/accumulo/core/conf/ClientProperty.java | 133 +++++++++
.../accumulo/core/conf/ConfigurationDocGen.java | 3 +-
.../core/client/BatchWriterConfigTest.java | 19 ++
.../tokens/CredentialProviderTokenTest.java | 2 +
.../accumulo/harness/AccumuloClusterHarness.java | 6 +
.../test/functional/ConfigurableMacBase.java | 5 +
.../accumulo/test/functional/ConnectorIT.java | 63 ++++
.../test/mapred/AccumuloInputFormatIT.java | 6 +-
.../test/mapred/AccumuloOutputFormatIT.java | 16 +-
.../test/mapreduce/AccumuloInputFormatIT.java | 7 +-
.../test/mapreduce/AccumuloOutputFormatIT.java | 6 +-
35 files changed, 1465 insertions(+), 89 deletions(-)
diff --git a/assemble/conf/client.conf b/assemble/conf/client.conf
deleted file mode 100644
index 5256b13..0000000
--- a/assemble/conf/client.conf
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# instance.zookeeper.host=localhost:2181
-# instance.rpc.ssl.enabled=false
-
-# instance.rcp.sasl.enabled=false
-# rpc.sasl.qop=auth
diff --git a/assemble/pom.xml b/assemble/pom.xml
index 4167de3..9a9d94d 100644
--- a/assemble/pom.xml
+++ b/assemble/pom.xml
@@ -384,6 +384,21 @@
<executable>${basedir}/src/main/scripts/generate-versions-listing.sh</executable>
</configuration>
</execution>
+ <execution>
+ <id>client-props-file</id>
+ <goals>
+ <goal>java</goal>
+ </goals>
+ <phase>prepare-package</phase>
+ <configuration>
+ <mainClass>org.apache.accumulo.core.conf.ClientConfigGenerate</mainClass>
+ <classpathScope>test</classpathScope>
+ <arguments>
+ <argument>--generate-config</argument>
+ <argument>${project.build.directory}/accumulo-client.properties</argument>
+ </arguments>
+ </configuration>
+ </execution>
</executions>
</plugin>
<plugin>
diff --git a/assemble/src/main/assemblies/component.xml b/assemble/src/main/assemblies/component.xml
index 405b7b3..98e3b9c 100644
--- a/assemble/src/main/assemblies/component.xml
+++ b/assemble/src/main/assemblies/component.xml
@@ -117,6 +117,14 @@
<directoryMode>0755</directoryMode>
<fileMode>0644</fileMode>
</fileSet>
+ <fileSet>
+ <directory>target</directory>
+ <outputDirectory>conf</outputDirectory>
+ <fileMode>0644</fileMode>
+ <includes>
+ <include>accumulo-client.properties</include>
+ </includes>
+ </fileSet>
<!-- Lift generated thrift proxy code into its own directory -->
<fileSet>
<directory>../proxy/target</directory>
diff --git a/core/pom.xml b/core/pom.xml
index 16fa16f..fe89b6b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -291,6 +291,21 @@
</arguments>
</configuration>
</execution>
+ <execution>
+ <id>client-props-markdown</id>
+ <goals>
+ <goal>java</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <mainClass>org.apache.accumulo.core.conf.ClientConfigGenerate</mainClass>
+ <classpathScope>test</classpathScope>
+ <arguments>
+ <argument>--generate-markdown</argument>
+ <argument>${project.build.directory}/generated-docs/client-properties.md</argument>
+ </arguments>
+ </configuration>
+ </execution>
</executions>
</plugin>
</plugins>
diff --git a/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java b/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java
index 521e0ce..3da6459 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java
@@ -50,6 +50,7 @@ public class BatchWriterConfig implements Writable {
private Integer maxWriteThreads = null;
private Durability durability = Durability.DEFAULT;
+ private boolean isDurabilitySet = false;
/**
* Sets the maximum memory to batch before writing. The smaller this value, the more frequently the {@link BatchWriter} will write.<br>
@@ -190,6 +191,7 @@ public class BatchWriterConfig implements Writable {
*/
public BatchWriterConfig setDurability(Durability durability) {
this.durability = durability;
+ isDurabilitySet = true;
return this;
}
@@ -309,6 +311,34 @@ public class BatchWriterConfig implements Writable {
return false;
}
+ /**
+  * Picks the preferred of two optional settings.
+  *
+  * @param o1
+  *          preferred value, may be null
+  * @param o2
+  *          fallback value, may be null
+  * @return o1 unless it is null, otherwise o2
+  */
+ private static <T> T merge(T o1, T o2) {
+   return o1 != null ? o1 : o2;
+ }
+
+ /**
+  * Merge this BatchWriterConfig with another. If config is set in both, preference will be given to this config. Neither this config nor the other
+  * config is modified; a new BatchWriterConfig is returned.
+  *
+  * @param other
+  *          Another BatchWriterConfig
+  * @return Merged BatchWriterConfig
+  * @throws NullPointerException
+  *           if other is null
+  * @since 2.0.0
+  */
+ public BatchWriterConfig merge(BatchWriterConfig other) {
+   BatchWriterConfig result = new BatchWriterConfig();
+   result.maxMemory = merge(this.maxMemory, other.maxMemory);
+   result.maxLatency = merge(this.maxLatency, other.maxLatency);
+   result.timeout = merge(this.timeout, other.timeout);
+   result.maxWriteThreads = merge(this.maxWriteThreads, other.maxWriteThreads);
+   // Durability has a non-null default, so "was it set" is tracked by a separate flag. The flag is carried over to the result so that the
+   // merged config can itself be merged again without losing an explicitly chosen durability.
+   if (this.isDurabilitySet) {
+     result.durability = this.durability;
+     result.isDurabilitySet = true;
+   } else if (other.isDurabilitySet) {
+     result.durability = other.durability;
+     result.isDurabilitySet = true;
+   }
+   return result;
+ }
+
@Override
public int hashCode() {
HashCodeBuilder hcb = new HashCodeBuilder();
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java b/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java
index 6f0274c..a269229 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
* Contains a list of property keys recognized by the Accumulo client and convenience methods for setting them.
*
* @since 1.6.0
+ * @deprecated since 2.0.0, replaced by {@link Connector#builder()}
*/
public class ClientConfiguration {
private static final Logger log = LoggerFactory.getLogger(ClientConfiguration.class);
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConnectionInfo.java b/core/src/main/java/org/apache/accumulo/core/client/ConnectionInfo.java
new file mode 100644
index 0000000..9a43073
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConnectionInfo.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+
+/**
+ * Accumulo client connection information. Can be built using {@link Connector#builder()} by finishing the builder chain with {@code info()}.
+ *
+ * @since 2.0.0
+ */
+public interface ConnectionInfo {
+
+ /**
+ * Returns the name of the Accumulo instance to connect to.
+ *
+ * @return Accumulo instance name
+ */
+ String getInstanceName();
+
+ /**
+ * Returns the ZooKeeper connection information for the instance.
+ *
+ * @return ZooKeeper connection information for Accumulo instance
+ */
+ String getZookeepers();
+
+ /**
+ * Returns the principal the connection authenticates as.
+ *
+ * @return Accumulo principal/username
+ */
+ String getPrincipal();
+
+ /**
+ * Returns the token used to authenticate the principal.
+ *
+ * @return {@link AuthenticationToken} used for this connection
+ */
+ AuthenticationToken getAuthenticationToken();
+
+ /**
+ * Returns the client properties behind this connection.
+ *
+ * @return All Accumulo client properties set for this connection
+ */
+ Properties getProperties();
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/client/Connector.java b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
index e36cc82..9ac7d71 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
@@ -16,11 +16,15 @@
*/
package org.apache.accumulo.core.client;
+import java.util.Properties;
+
import org.apache.accumulo.core.client.admin.InstanceOperations;
import org.apache.accumulo.core.client.admin.NamespaceOperations;
import org.apache.accumulo.core.client.admin.ReplicationOperations;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.impl.ConnectorImpl;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.security.Authorizations;
/**
@@ -77,6 +81,7 @@ public abstract class Connector {
int maxWriteThreads) throws TableNotFoundException;
/**
+ * Factory method to create BatchDeleter
*
* @param tableName
* the name of the table to query and delete from
@@ -87,7 +92,8 @@ public abstract class Connector {
* @param numQueryThreads
* the number of concurrent threads to spawn for querying
* @param config
- * configuration used to create batch writer
+ * configuration used to create batch writer. This config takes precedence. Any unset values will be merged with config set when the Connector was
+ * created. If no config was set during Connector creation, BatchWriterConfig defaults will be used.
* @return BatchDeleter object for configuring and deleting
* @since 1.5.0
*/
@@ -96,6 +102,24 @@ public abstract class Connector {
throws TableNotFoundException;
/**
+ * Factory method to create BatchDeleter. This method uses BatchWriterConfig set when Connector was created. If none was set, BatchWriterConfig defaults will
+ * be used.
+ *
+ * @param tableName
+ * the name of the table to query and delete from
+ * @param authorizations
+ * A set of authorization labels that will be checked against the column visibility of each key in order to filter data. The authorizations passed in
+ * must be a subset of the accumulo user's set of authorizations. If the accumulo user has authorizations (A1, A2) and authorizations (A2, A3) are
+ * passed, then an exception will be thrown.
+ * @param numQueryThreads
+ * the number of concurrent threads to spawn for querying
+ * @return BatchDeleter object
+ * @throws TableNotFoundException
+ * if table not found
+ */
+ public abstract BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException;
+
+ /**
* Factory method to create a BatchWriter connected to Accumulo.
*
* @param tableName
@@ -121,7 +145,8 @@ public abstract class Connector {
* @param tableName
* the name of the table to insert data into
* @param config
- * configuration used to create batch writer
+ * configuration used to create batch writer. This config will take precedence. Any unset values will be merged with config set when the Connector was
+ * created. If no config was set during Connector creation, BatchWriterConfig defaults will be used.
* @return BatchWriter object for configuring and writing data to
* @since 1.5.0
*/
@@ -129,6 +154,19 @@ public abstract class Connector {
public abstract BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) throws TableNotFoundException;
/**
+ * Factory method to create a BatchWriter. This method uses BatchWriterConfig set when Connector was created. If none was set, BatchWriterConfig defaults will
+ * be used.
+ *
+ * @param tableName
+ * the name of the table to insert data into
+ * @return BatchWriter object
+ * @throws TableNotFoundException
+ * if table not found
+ * @since 2.0.0
+ */
+ public abstract BatchWriter createBatchWriter(String tableName) throws TableNotFoundException;
+
+ /**
* Factory method to create a Multi-Table BatchWriter connected to Accumulo. Multi-table batch writers can queue data for multiple tables, which is good for
* ingesting data into multiple tables from the same source
*
@@ -150,14 +188,23 @@ public abstract class Connector {
* multiple tables can be sent to a server in a single batch. Its an efficient way to ingest data into multiple tables from a single process.
*
* @param config
- * configuration used to create multi-table batch writer
+ * configuration used to create multi-table batch writer. This config will take precedence. Any unset values will be merged with config set when the
+ * Connector was created. If no config was set during Connector creation, BatchWriterConfig defaults will be used.
* @return MultiTableBatchWriter object for configuring and writing data to
* @since 1.5.0
*/
-
public abstract MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config);
/**
+ * Factory method to create a Multi-Table BatchWriter. This method uses BatchWriterConfig set when Connector was created. If none was set, BatchWriterConfig
+ * defaults will be used.
+ *
+ * @return MultiTableBatchWriter object
+ * @since 2.0.0
+ */
+ public abstract MultiTableBatchWriter createMultiTableBatchWriter();
+
+ /**
* Factory method to create a Scanner connected to Accumulo.
*
* @param tableName
@@ -237,4 +284,272 @@ public abstract class Connector {
* @since 1.7.0
*/
public abstract ReplicationOperations replicationOperations();
+
+ /**
+ * Builds ConnectionInfo after all options have been specified
+ *
+ * @since 2.0.0
+ */
+ public interface ConnInfoFactory {
+
+ /**
+ * Builds ConnectionInfo after all options have been specified
+ *
+ * @return ConnectionInfo
+ */
+ ConnectionInfo info();
+ }
+
+ /**
+ * Builds Connector
+ *
+ * @since 2.0.0
+ */
+ public interface ConnectorFactory extends ConnInfoFactory {
+
+ /**
+ * Builds Connector after all options have been specified
+ *
+ * @return Connector
+ */
+ Connector build() throws AccumuloException, AccumuloSecurityException;
+
+ }
+
+ /**
+ * Builder method for setting Accumulo instance and zookeepers
+ *
+ * @since 2.0.0
+ */
+ public interface InstanceArgs {
+ AuthenticationArgs forInstance(String instanceName, String zookeepers);
+ }
+
+ /**
+ * Builder methods for creating Connector using properties
+ *
+ * @since 2.0.0
+ */
+ public interface PropertyOptions extends InstanceArgs {
+
+ /**
+ * Build using properties file. An example properties file can be found at conf/accumulo-client.properties in the Accumulo tarball distribution.
+ *
+ * @param propertiesFile
+ * Path to properties file
+ * @return this builder
+ */
+ ConnectorFactory usingProperties(String propertiesFile);
+
+ /**
+ * Build using Java properties object. A list of available properties can be found in the documentation on the project website (http://accumulo.apache.org)
+ * under 'Development' -> 'Client Properties'
+ *
+ * @param properties
+ * Properties object
+ * @return this builder
+ */
+ ConnectorFactory usingProperties(Properties properties);
+ }
+
+ public interface ConnectionInfoOptions extends PropertyOptions {
+
+ /**
+ * Build using connection information
+ *
+ * @param connectionInfo
+ * ConnectionInfo object
+ * @return this builder
+ */
+ ConnectorFactory usingConnectionInfo(ConnectionInfo connectionInfo);
+ }
+
+ /**
+ * Build methods for authentication
+ *
+ * @since 2.0.0
+ */
+ public interface AuthenticationArgs {
+
+ /**
+ * Build using password-based credentials
+ *
+ * @param username
+ * User name
+ * @param password
+ * Password
+ * @return this builder
+ */
+ ConnectionOptions usingPassword(String username, CharSequence password);
+
+ /**
+ * Build using Kerberos credentials
+ *
+ * @param principal
+ * Principal
+ * @param keyTabFile
+ * Path to keytab file
+ * @return this builder
+ */
+ ConnectionOptions usingKerberos(String principal, String keyTabFile);
+
+ /**
+ * Build using credentials from a CredentialProvider
+ *
+ * @param username
+ * Accumulo user name
+ * @param name
+ * Alias to extract Accumulo user password from CredentialProvider
+ * @param providerUrls
+ * Comma-separated list of URLs defining CredentialProvider(s)
+ * @return this builder
+ */
+ ConnectionOptions usingProvider(String username, String name, String providerUrls);
+
+ /**
+ * Build using specified credentials
+ *
+ * @param principal
+ * Principal/username
+ * @param token
+ * Authentication token
+ * @return this builder
+ */
+ ConnectionOptions usingToken(String principal, AuthenticationToken token);
+ }
+
+ /**
+ * Build methods for SSL/TLS
+ *
+ * @since 2.0.0
+ */
+ public interface SslOptions extends ConnectorFactory {
+
+ /**
+ * Build with SSL trust store
+ *
+ * @param path
+ * Path to trust store
+ * @return this builder
+ */
+ SslOptions withTruststore(String path);
+
+ /**
+ * Build with SSL trust store
+ *
+ * @param path
+ * Path to trust store
+ * @param password
+ * Password used to encrypt trust store
+ * @param type
+ * Trust store type
+ * @return this builder
+ */
+ SslOptions withTruststore(String path, String password, String type);
+
+ /**
+ * Build with SSL key store
+ *
+ * @param path
+ * Path to SSL key store
+ * @return this builder
+ */
+ SslOptions withKeystore(String path);
+
+ /**
+ * Build with SSL key store
+ *
+ * @param path
+ * Path to keystore
+ * @param password
+ * Password used to encrypt key store
+ * @param type
+ * Key store type
+ * @return this builder
+ */
+ SslOptions withKeystore(String path, String password, String type);
+
+ /**
+ * Use JSSE system properties to configure SSL
+ *
+ * @return this builder
+ */
+ SslOptions useJsse();
+ }
+
+ /**
+ * Build methods for SASL
+ *
+ * @since 2.0.0
+ */
+ public interface SaslOptions extends ConnectorFactory {
+
+ /**
+ * Build with Kerberos Server Primary
+ *
+ * @param kerberosServerPrimary
+ * Kerberos server primary
+ * @return this builder
+ */
+ SaslOptions withPrimary(String kerberosServerPrimary);
+
+ /**
+ * Build with SASL quality of protection
+ *
+ * @param qualityOfProtection
+ * Quality of protection
+ * @return this builder
+ */
+ SaslOptions withQop(String qualityOfProtection);
+ }
+
+ /**
+ * Build methods for connection options
+ *
+ * @since 2.0.0
+ */
+ public interface ConnectionOptions extends ConnectorFactory {
+
+ /**
+ * Build using Zookeeper timeout
+ *
+ * @param timeout
+ * Zookeeper timeout
+ * @return this builder
+ */
+ ConnectionOptions withZkTimeout(int timeout);
+
+ /**
+ * Build with SSL/TLS options
+ *
+ * @return this builder
+ */
+ SslOptions withSsl();
+
+ /**
+ * Build with SASL options
+ *
+ * @return this builder
+ */
+ SaslOptions withSasl();
+
+ /**
+ * Build with BatchWriterConfig defaults for BatchWriter, MultiTableBatchWriter & BatchDeleter
+ *
+ * @param batchWriterConfig
+ * BatchWriterConfig
+ * @return this builder
+ */
+ ConnectionOptions withBatchWriterConfig(BatchWriterConfig batchWriterConfig);
+ }
+
+ /**
+ * Creates a new fluent builder for a Connector. Each call returns a fresh builder instance.
+ *
+ * @return a new builder
+ * @since 2.0.0
+ */
+ public static PropertyOptions builder() {
+ return new ConnectorImpl.ConnectorBuilderImpl();
+ }
}
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
index b510c45..356fa02 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
@@ -26,8 +26,8 @@ import java.util.function.Predicate;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
@@ -54,15 +54,18 @@ public class ClientContext {
protected final Instance inst;
private Credentials creds;
private ClientConfiguration clientConf;
+ private BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
private final AccumuloConfiguration rpcConf;
protected Connector conn;
- /**
- * Instantiate a client context
- */
public ClientContext(Instance instance, Credentials credentials, ClientConfiguration clientConf) {
+ this(instance, credentials, clientConf, new BatchWriterConfig());
+ }
+
+ public ClientContext(Instance instance, Credentials credentials, ClientConfiguration clientConf, BatchWriterConfig batchWriterConfig) {
this(instance, credentials, convertClientConfig(requireNonNull(clientConf, "clientConf is null")));
this.clientConf = clientConf;
+ this.batchWriterConfig = batchWriterConfig;
}
/**
@@ -153,6 +156,10 @@ public class ClientContext {
return conn;
}
+ public BatchWriterConfig getBatchWriterConfig() {
+ return batchWriterConfig;
+ }
+
/**
* Serialize the credentials just before initiating the RPC call
*/
@@ -200,9 +207,9 @@ public class ClientContext {
else {
// Reconstitute the server kerberos property from the client config
if (Property.GENERAL_KERBEROS_PRINCIPAL == property) {
- if (config.containsKey(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey())) {
+ if (config.containsKey(ClientConfiguration.ClientProperty.KERBEROS_SERVER_PRIMARY.getKey())) {
// Avoid providing a realm since we don't know what it is...
- return config.getString(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey()) + "/_HOST@" + SaslConnectionParams.getDefaultRealm();
+ return config.getString(ClientConfiguration.ClientProperty.KERBEROS_SERVER_PRIMARY.getKey()) + "/_HOST@" + SaslConnectionParams.getDefaultRealm();
}
}
return defaults.get(property);
@@ -222,8 +229,8 @@ public class ClientContext {
// Two client props that don't exist on the server config. Client doesn't need to know about the Kerberos instance from the principle, but servers do
// Automatically reconstruct the server property when converting a client config.
- if (props.containsKey(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey())) {
- final String serverPrimary = props.remove(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey());
+ if (props.containsKey(ClientConfiguration.ClientProperty.KERBEROS_SERVER_PRIMARY.getKey())) {
+ final String serverPrimary = props.remove(ClientConfiguration.ClientProperty.KERBEROS_SERVER_PRIMARY.getKey());
if (filter.test(Property.GENERAL_KERBEROS_PRINCIPAL.getKey())) {
// Use the _HOST expansion. It should be unnecessary in "client land".
props.put(Property.GENERAL_KERBEROS_PRINCIPAL.getKey(), serverPrimary + "/_HOST@" + SaslConnectionParams.getDefaultRealm());
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoFactory.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoFactory.java
new file mode 100644
index 0000000..1aafc06
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoFactory.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ConnectionInfo;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Durability;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.conf.ClientProperty;
+
+/**
+ * Creates internal client objects from a {@link ConnectionInfo}: {@link Connector}, {@link ClientContext}, {@link Instance}, {@link Credentials},
+ * {@link BatchWriterConfig} and {@link ClientConfiguration}.
+ */
+public class ConnectionInfoFactory {
+
+  /**
+   * Reads a client property from the connection's properties as a String.
+   *
+   * @param info
+   *          connection information supplying the properties
+   * @param property
+   *          property to read
+   * @return whatever {@code property.getValue(...)} yields for the connection's properties
+   */
+  public static String getString(ConnectionInfo info, ClientProperty property) {
+    return property.getValue(info.getProperties());
+  }
+
+  /**
+   * Reads a client property from the connection's properties as a Long.
+   *
+   * @param info
+   *          connection information supplying the properties
+   * @param property
+   *          property to read
+   * @return whatever {@code property.getLong(...)} yields for the connection's properties; callers in this class treat null as "not set"
+   */
+  public static Long getLong(ConnectionInfo info, ClientProperty property) {
+    return property.getLong(info.getProperties());
+  }
+
+  /**
+   * Creates a Connector on top of a ClientContext built from the connection information.
+   *
+   * @param info
+   *          connection information
+   * @return new Connector
+   */
+  public static Connector getConnector(ConnectionInfo info) throws AccumuloSecurityException, AccumuloException {
+    return new ConnectorImpl(getClientContext(info));
+  }
+
+  /**
+   * Creates a ClientContext from the instance, credentials, client configuration and batch writer configuration derived from the connection
+   * information.
+   *
+   * @param info
+   *          connection information
+   * @return new ClientContext
+   */
+  public static ClientContext getClientContext(ConnectionInfo info) {
+    return new ClientContext(getInstance(info), getCredentials(info), getClientConfiguration(info), getBatchWriterConfig(info));
+  }
+
+  /**
+   * Creates a ZooKeeperInstance from the ClientConfiguration derived from the connection information.
+   *
+   * @param info
+   *          connection information
+   * @return new Instance
+   */
+  public static Instance getInstance(ConnectionInfo info) {
+    return new ZooKeeperInstance(getClientConfiguration(info));
+  }
+
+  /**
+   * Creates Credentials from the principal and authentication token of the connection information.
+   *
+   * @param info
+   *          connection information
+   * @return new Credentials
+   */
+  public static Credentials getCredentials(ConnectionInfo info) {
+    return new Credentials(info.getPrincipal(), info.getAuthenticationToken());
+  }
+
+  /**
+   * Creates a BatchWriterConfig from the batch writer client properties. Max memory is read in bytes; max latency and timeout are read in seconds.
+   * A setting is only applied when its property yields a value, so everything else keeps the BatchWriterConfig default.
+   *
+   * @param info
+   *          connection information
+   * @return new BatchWriterConfig
+   */
+  public static BatchWriterConfig getBatchWriterConfig(ConnectionInfo info) {
+    BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
+    Long maxMemory = getLong(info, ClientProperty.BATCH_WRITER_MAX_MEMORY_BYTES);
+    if (maxMemory != null) {
+      batchWriterConfig.setMaxMemory(maxMemory);
+    }
+    Long maxLatency = getLong(info, ClientProperty.BATCH_WRITER_MAX_LATENCY_SEC);
+    if (maxLatency != null) {
+      batchWriterConfig.setMaxLatency(maxLatency, TimeUnit.SECONDS);
+    }
+    Long timeout = getLong(info, ClientProperty.BATCH_WRITER_MAX_TIMEOUT_SEC);
+    if (timeout != null) {
+      batchWriterConfig.setTimeout(timeout, TimeUnit.SECONDS);
+    }
+    String durability = getString(info, ClientProperty.BATCH_WRITER_DURABILITY);
+    // Treat a missing value like an empty one instead of failing on isEmpty()
+    if (durability != null && !durability.isEmpty()) {
+      batchWriterConfig.setDurability(Durability.valueOf(durability.toUpperCase()));
+    }
+    return batchWriterConfig;
+  }
+
+  /**
+   * Translates the connection's client properties into a ClientConfiguration. Keys that have a counterpart in
+   * {@link ClientConfiguration.ClientProperty} are stored under that counterpart; all other keys are copied unchanged.
+   *
+   * @param info
+   *          connection information
+   * @return new ClientConfiguration
+   */
+  public static ClientConfiguration getClientConfiguration(ConnectionInfo info) {
+    ClientConfiguration config = ClientConfiguration.create();
+    for (Object keyObj : info.getProperties().keySet()) {
+      String key = (String) keyObj;
+      String val = info.getProperties().getProperty(key);
+      if (key.equals(ClientProperty.INSTANCE_ZOOKEEPERS.getKey())) {
+        config.setProperty(ClientConfiguration.ClientProperty.INSTANCE_ZK_HOST, val);
+      } else if (key.equals(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT_SEC.getKey())) {
+        config.setProperty(ClientConfiguration.ClientProperty.INSTANCE_ZK_TIMEOUT, val);
+      } else if (key.equals(ClientProperty.SSL_ENABLED.getKey())) {
+        config.setProperty(ClientConfiguration.ClientProperty.INSTANCE_RPC_SSL_ENABLED, val);
+      } else if (key.equals(ClientProperty.SSL_KEYSTORE_PATH.getKey())) {
+        config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_KEYSTORE_PATH, val);
+        // A configured key store also switches on SSL client authentication
+        config.setProperty(ClientConfiguration.ClientProperty.INSTANCE_RPC_SSL_CLIENT_AUTH, "true");
+      } else if (key.equals(ClientProperty.SSL_KEYSTORE_TYPE.getKey())) {
+        config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_KEYSTORE_TYPE, val);
+      } else if (key.equals(ClientProperty.SSL_KEYSTORE_PASSWORD.getKey())) {
+        config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_KEYSTORE_PASSWORD, val);
+      } else if (key.equals(ClientProperty.SSL_TRUSTSTORE_PATH.getKey())) {
+        config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_TRUSTSTORE_PATH, val);
+      } else if (key.equals(ClientProperty.SSL_TRUSTSTORE_TYPE.getKey())) {
+        // Trust store type goes to the type property, not the path property
+        config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_TRUSTSTORE_TYPE, val);
+      } else if (key.equals(ClientProperty.SSL_TRUSTSTORE_PASSWORD.getKey())) {
+        // Trust store password goes to the password property, not the path property
+        config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_TRUSTSTORE_PASSWORD, val);
+      } else if (key.equals(ClientProperty.SSL_USE_JSSE.getKey())) {
+        config.setProperty(ClientConfiguration.ClientProperty.RPC_USE_JSSE, val);
+      } else if (key.equals(ClientProperty.SASL_ENABLED.getKey())) {
+        // SASL enablement must not be written to the SSL flag
+        config.setProperty(ClientConfiguration.ClientProperty.INSTANCE_RPC_SASL_ENABLED, val);
+      } else if (key.equals(ClientProperty.SASL_QOP.getKey())) {
+        config.setProperty(ClientConfiguration.ClientProperty.RPC_SASL_QOP, val);
+      } else if (key.equals(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY.getKey())) {
+        config.setProperty(ClientConfiguration.ClientProperty.KERBEROS_SERVER_PRIMARY, val);
+      } else {
+        config.setProperty(key, val);
+      }
+    }
+    return config;
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoImpl.java
new file mode 100644
index 0000000..916625c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoImpl.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.ConnectionInfo;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.ClientProperty;
+
+/**
+ * {@link ConnectionInfo} implementation backed by a {@link Properties} object and an {@link AuthenticationToken}.
+ *
+ * <p>
+ * Both constructor arguments are held by reference; no defensive copy is made, so {@link #getProperties()} hands back the same object that was passed in.
+ */
+public class ConnectionInfoImpl implements ConnectionInfo {
+
+ // Assigned once in the constructor and never reassigned, hence final.
+ private final Properties properties;
+ private final AuthenticationToken token;
+
+ /**
+ * @param properties
+ * client properties to look values up in
+ * @param token
+ * authentication token returned by {@link #getAuthenticationToken()}
+ */
+ ConnectionInfoImpl(Properties properties, AuthenticationToken token) {
+ this.properties = properties;
+ this.token = token;
+ }
+
+ /** Returns the value of {@link ClientProperty#INSTANCE_NAME} looked up in the backing properties. */
+ @Override
+ public String getInstanceName() {
+ return getString(ClientProperty.INSTANCE_NAME);
+ }
+
+ /** Returns the value of {@link ClientProperty#INSTANCE_ZOOKEEPERS} looked up in the backing properties. */
+ @Override
+ public String getZookeepers() {
+ return getString(ClientProperty.INSTANCE_ZOOKEEPERS);
+ }
+
+ /** Returns the value of {@link ClientProperty#AUTH_USERNAME} looked up in the backing properties. */
+ @Override
+ public String getPrincipal() {
+ return getString(ClientProperty.AUTH_USERNAME);
+ }
+
+ /** Returns the backing properties object itself (not a copy). */
+ @Override
+ public Properties getProperties() {
+ return properties;
+ }
+
+ /** Returns the token supplied at construction. */
+ @Override
+ public AuthenticationToken getAuthenticationToken() {
+ return token;
+ }
+
+ // Single lookup path shared by the String-valued getters above.
+ private String getString(ClientProperty property) {
+ return property.getValue(properties);
+ }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
index f49e4dc..03c719c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
@@ -18,6 +18,12 @@ package org.apache.accumulo.core.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloException;
@@ -28,6 +34,7 @@ import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.ConnectionInfo;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
@@ -41,6 +48,11 @@ import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.impl.thrift.ClientService;
import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.CredentialProviderToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.trace.Tracer;
@@ -54,7 +66,7 @@ public class ConnectorImpl extends Connector {
private InstanceOperations instanceops = null;
private ReplicationOperations replicationops = null;
- public ConnectorImpl(final ClientContext context) throws AccumuloException, AccumuloSecurityException {
+ public ConnectorImpl(final ClientContext context) throws AccumuloSecurityException, AccumuloException {
checkArgument(context != null, "Context is null");
checkArgument(context.getCredentials() != null, "Credentials are null");
checkArgument(context.getCredentials().getToken() != null, "Authentication token is null");
@@ -113,7 +125,12 @@ public class ConnectorImpl extends Connector {
throws TableNotFoundException {
checkArgument(tableName != null, "tableName is null");
checkArgument(authorizations != null, "authorizations is null");
- return new TabletServerBatchDeleter(context, getTableId(tableName), authorizations, numQueryThreads, config);
+ return new TabletServerBatchDeleter(context, getTableId(tableName), authorizations, numQueryThreads, config.merge(context.getBatchWriterConfig()));
+ }
+
+ @Override
+ public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException {
+ // No writer settings were supplied: delegate to the four-argument overload with a freshly constructed BatchWriterConfig.
+ final BatchWriterConfig defaultConfig = new BatchWriterConfig();
+ return createBatchDeleter(tableName, authorizations, numQueryThreads, defaultConfig);
}
@Deprecated
@@ -127,7 +144,16 @@ public class ConnectorImpl extends Connector {
@Override
public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) throws TableNotFoundException {
checkArgument(tableName != null, "tableName is null");
- return new BatchWriterImpl(context, getTableId(tableName), config);
+ // we used to allow null inputs for bw config
+ if (config == null) {
+ config = new BatchWriterConfig();
+ }
+ return new BatchWriterImpl(context, getTableId(tableName), config.merge(context.getBatchWriterConfig()));
+ }
+
+ @Override
+ public BatchWriter createBatchWriter(String tableName) throws TableNotFoundException {
+ // No writer settings were supplied: delegate to the two-argument overload with a freshly constructed BatchWriterConfig.
+ final BatchWriterConfig defaultConfig = new BatchWriterConfig();
+ return createBatchWriter(tableName, defaultConfig);
}
@Deprecated
@@ -139,7 +165,12 @@ public class ConnectorImpl extends Connector {
@Override
public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) {
- return new MultiTableBatchWriterImpl(context, config);
+ return new MultiTableBatchWriterImpl(context, config.merge(context.getBatchWriterConfig()));
+ }
+
+ @Override
+ public MultiTableBatchWriter createMultiTableBatchWriter() {
+ // No writer settings were supplied: delegate to the one-argument overload with a freshly constructed BatchWriterConfig.
+ final BatchWriterConfig defaultConfig = new BatchWriterConfig();
+ return createMultiTableBatchWriter(defaultConfig);
}
@Override
@@ -193,4 +224,222 @@ public class ConnectorImpl extends Connector {
return replicationops;
}
+
+ /**
+ * Fluent builder that collects client properties and an authentication token and turns them into a {@link Connector} or a {@link ConnectionInfo}.
+ *
+ * <p>
+ * Option methods record their setting in the backing {@link Properties}, keyed by {@link ClientProperty#getKey()}, and return this builder so calls can be
+ * chained. Properties handed in through {@link #usingProperties(Properties)} or {@link #usingConnectionInfo(ConnectionInfo)} are adopted by reference, not
+ * copied.
+ */
+ public static class ConnectorBuilderImpl implements InstanceArgs, PropertyOptions, ConnectionInfoOptions, AuthenticationArgs, ConnectionOptions, SslOptions,
+ SaslOptions, ConnectorFactory {
+
+ private Properties properties = new Properties();
+ private AuthenticationToken token = null;
+
+ /** Creates a Connector through ConnectionInfoFactory from the properties and token collected so far. */
+ @Override
+ public Connector build() throws AccumuloException, AccumuloSecurityException {
+ return ConnectionInfoFactory.getConnector(new ConnectionInfoImpl(properties, token));
+ }
+
+ /** Wraps the properties and token collected so far in a new ConnectionInfoImpl. */
+ @Override
+ public ConnectionInfo info() {
+ return new ConnectionInfoImpl(properties, token);
+ }
+
+ /** Records the instance name and ZooKeeper connection string. */
+ @Override
+ public AuthenticationArgs forInstance(String instanceName, String zookeepers) {
+ setProperty(ClientProperty.INSTANCE_NAME, instanceName);
+ setProperty(ClientProperty.INSTANCE_ZOOKEEPERS, zookeepers);
+ return this;
+ }
+
+ /** Records the truststore path only; password and type are left untouched. */
+ @Override
+ public SslOptions withTruststore(String path) {
+ setProperty(ClientProperty.SSL_TRUSTSTORE_PATH, path);
+ return this;
+ }
+
+ /** Records the truststore path, password and type. */
+ @Override
+ public SslOptions withTruststore(String path, String password, String type) {
+ setProperty(ClientProperty.SSL_TRUSTSTORE_PATH, path);
+ setProperty(ClientProperty.SSL_TRUSTSTORE_PASSWORD, password);
+ setProperty(ClientProperty.SSL_TRUSTSTORE_TYPE, type);
+ return this;
+ }
+
+ /** Records the keystore path only; password and type are left untouched. */
+ @Override
+ public SslOptions withKeystore(String path) {
+ setProperty(ClientProperty.SSL_KEYSTORE_PATH, path);
+ return this;
+ }
+
+ /** Records the keystore path, password and type. */
+ @Override
+ public SslOptions withKeystore(String path, String password, String type) {
+ setProperty(ClientProperty.SSL_KEYSTORE_PATH, path);
+ setProperty(ClientProperty.SSL_KEYSTORE_PASSWORD, password);
+ setProperty(ClientProperty.SSL_KEYSTORE_TYPE, type);
+ return this;
+ }
+
+ /** Sets the SSL_USE_JSSE property to "true". */
+ @Override
+ public SslOptions useJsse() {
+ setProperty(ClientProperty.SSL_USE_JSSE, "true");
+ return this;
+ }
+
+ /** Stores the given timeout, as a decimal string, under the INSTANCE_ZOOKEEPERS_TIMEOUT_SEC property. */
+ @Override
+ public ConnectionOptions withZkTimeout(int timeout) {
+ setProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT_SEC, Integer.toString(timeout));
+ return this;
+ }
+
+ /** Sets the SSL_ENABLED property to "true". */
+ @Override
+ public SslOptions withSsl() {
+ setProperty(ClientProperty.SSL_ENABLED, "true");
+ return this;
+ }
+
+ /** Sets the SASL_ENABLED property to "true". */
+ @Override
+ public SaslOptions withSasl() {
+ setProperty(ClientProperty.SASL_ENABLED, "true");
+ return this;
+ }
+
+ /** Copies memory, latency (seconds), timeout (seconds), write threads and durability from the given config into the batch writer properties. */
+ @Override
+ public ConnectionOptions withBatchWriterConfig(BatchWriterConfig batchWriterConfig) {
+ setProperty(ClientProperty.BATCH_WRITER_MAX_MEMORY_BYTES, batchWriterConfig.getMaxMemory());
+ setProperty(ClientProperty.BATCH_WRITER_MAX_LATENCY_SEC, batchWriterConfig.getMaxLatency(TimeUnit.SECONDS));
+ setProperty(ClientProperty.BATCH_WRITER_MAX_TIMEOUT_SEC, batchWriterConfig.getTimeout(TimeUnit.SECONDS));
+ setProperty(ClientProperty.BATCH_WRITER_MAX_WRITE_THREADS, batchWriterConfig.getMaxWriteThreads());
+ setProperty(ClientProperty.BATCH_WRITER_DURABILITY, batchWriterConfig.getDurability().toString());
+ return this;
+ }
+
+ /** Records the Kerberos server primary. */
+ @Override
+ public SaslOptions withPrimary(String kerberosServerPrimary) {
+ setProperty(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY, kerberosServerPrimary);
+ return this;
+ }
+
+ /** Records the SASL quality of protection. */
+ @Override
+ public SaslOptions withQop(String qualityOfProtection) {
+ setProperty(ClientProperty.SASL_QOP, qualityOfProtection);
+ return this;
+ }
+
+ /**
+ * Loads properties from the file at the given path and hands them to {@link #usingProperties(Properties)}.
+ *
+ * @throws IllegalArgumentException
+ * wrapping the IOException if the file cannot be opened or read
+ */
+ @Override
+ public ConnectorFactory usingProperties(String configFile) {
+ Properties properties = new Properties();
+ try (InputStream is = new FileInputStream(configFile)) {
+ properties.load(is);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ return usingProperties(properties);
+ }
+
+ /**
+ * Adopts the given properties and creates the authentication token selected by the AUTH_METHOD property (compared case-insensitively).
+ *
+ * <p>
+ * The properties object is used directly, not copied. For password authentication the password entry is removed from it once the token has been created.
+ *
+ * @throws IllegalArgumentException
+ * if the auth method is not password, kerberos or provider, or if creating the token fails with an IOException
+ * @throws NullPointerException
+ * if a value required by the password or kerberos method is null
+ */
+ @Override
+ public ConnectorFactory usingProperties(Properties properties) {
+ this.properties = properties;
+ String authMethod = ClientProperty.AUTH_METHOD.getValue(properties).toLowerCase();
+ switch (authMethod) {
+ case "password":
+ String password = ClientProperty.AUTH_PASSWORD.getValue(properties);
+ Objects.requireNonNull(password, "password is null");
+ this.token = new PasswordToken(password);
+ // Entries are stored under the property's String key (see setProperty), so the removal must use that key as well.
+ this.properties.remove(ClientProperty.AUTH_PASSWORD.getKey());
+ break;
+ case "kerberos":
+ String principal = ClientProperty.AUTH_USERNAME.getValue(properties);
+ String keytabPath = ClientProperty.AUTH_KERBEROS_KEYTAB_PATH.getValue(properties);
+ Objects.requireNonNull(principal, "principal is null");
+ Objects.requireNonNull(keytabPath, "keytab path is null");
+ try {
+ this.token = new KerberosToken(principal, new File(keytabPath));
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ // Without this break the case falls through to default and always throws.
+ break;
+ case "provider":
+ String name = ClientProperty.AUTH_PROVIDER_NAME.getValue(properties);
+ String providerUrls = ClientProperty.AUTH_PROVIDER_URLS.getValue(properties);
+ try {
+ this.token = new CredentialProviderToken(name, providerUrls);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ // Without this break the case falls through to default and always throws.
+ break;
+ default:
+ throw new IllegalArgumentException("An authentication method (password, kerberos, etc) must be set");
+ }
+ return this;
+ }
+
+ /** Selects password authentication: records method and username as properties and keeps the password only inside the token. */
+ @Override
+ public ConnectionOptions usingPassword(String username, CharSequence password) {
+ setProperty(ClientProperty.AUTH_METHOD, "password");
+ setProperty(ClientProperty.AUTH_USERNAME, username);
+ this.token = new PasswordToken(password);
+ return this;
+ }
+
+ /**
+ * Selects Kerberos authentication: records method, principal and keytab path and creates a KerberosToken from them.
+ *
+ * @throws IllegalArgumentException
+ * wrapping the IOException if the token cannot be created
+ */
+ @Override
+ public ConnectionOptions usingKerberos(String principal, String keyTabFile) {
+ setProperty(ClientProperty.AUTH_METHOD, "kerberos");
+ setProperty(ClientProperty.AUTH_USERNAME, principal);
+ setProperty(ClientProperty.AUTH_KERBEROS_KEYTAB_PATH, keyTabFile);
+ try {
+ this.token = new KerberosToken(principal, new File(keyTabFile));
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ return this;
+ }
+
+ /**
+ * Selects credential-provider authentication: records method, username, provider name and URLs and creates a CredentialProviderToken.
+ *
+ * @throws IllegalArgumentException
+ * wrapping the IOException if the token cannot be created
+ */
+ @Override
+ public ConnectionOptions usingProvider(String username, String name, String providerUrls) {
+ setProperty(ClientProperty.AUTH_METHOD, "provider");
+ setProperty(ClientProperty.AUTH_USERNAME, username);
+ setProperty(ClientProperty.AUTH_PROVIDER_NAME, name);
+ setProperty(ClientProperty.AUTH_PROVIDER_URLS, providerUrls);
+ try {
+ this.token = new CredentialProviderToken(name, providerUrls);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ return this;
+ }
+
+ /**
+ * Uses a caller-supplied token and derives the AUTH_METHOD property from its concrete type; token types other than the three handled below are recorded
+ * as "unknown".
+ */
+ @Override
+ public ConnectionOptions usingToken(String principal, AuthenticationToken token) {
+ this.token = token;
+ setProperty(ClientProperty.AUTH_USERNAME, principal);
+ if (token instanceof CredentialProviderToken) {
+ setProperty(ClientProperty.AUTH_METHOD, "provider");
+ CredentialProviderToken cpt = (CredentialProviderToken) token;
+ setProperty(ClientProperty.AUTH_PROVIDER_NAME, cpt.getName());
+ setProperty(ClientProperty.AUTH_PROVIDER_URLS, cpt.getCredentialProviders());
+ } else if (token instanceof PasswordToken) {
+ setProperty(ClientProperty.AUTH_METHOD, "password");
+ } else if (token instanceof KerberosToken) {
+ setProperty(ClientProperty.AUTH_METHOD, "kerberos");
+ setProperty(ClientProperty.AUTH_KERBEROS_KEYTAB_PATH, ((KerberosToken) token).getKeytab().getAbsolutePath());
+ } else {
+ setProperty(ClientProperty.AUTH_METHOD, "unknown");
+ }
+ return this;
+ }
+
+ /** Adopts the properties object (by reference) and the token of an existing ConnectionInfo. */
+ @Override
+ public ConnectorFactory usingConnectionInfo(ConnectionInfo connectionInfo) {
+ this.properties = connectionInfo.getProperties();
+ this.token = connectionInfo.getAuthenticationToken();
+ return this;
+ }
+
+ /** Stores the value under the property's String key. */
+ public void setProperty(ClientProperty property, String value) {
+ properties.setProperty(property.getKey(), value);
+ }
+
+ /** Stores the decimal string form of the value under the property's String key. */
+ public void setProperty(ClientProperty property, Long value) {
+ setProperty(property, Long.toString(value));
+ }
+
+ /** Stores the decimal string form of the value under the property's String key. */
+ public void setProperty(ClientProperty property, Integer value) {
+ setProperty(property, Integer.toString(value));
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
index 28a3355..5583f9f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
@@ -35,6 +35,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.ClientSideIteratorScanner;
+import org.apache.accumulo.core.client.ConnectionInfo;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IsolatedScanner;
@@ -48,6 +49,7 @@ import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier;
import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.ConnectionInfoFactory;
import org.apache.accumulo.core.client.impl.Credentials;
import org.apache.accumulo.core.client.impl.DelegationTokenImpl;
import org.apache.accumulo.core.client.impl.OfflineScanner;
@@ -116,6 +118,20 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
}
/**
+ * Sets connection information needed to communicate with Accumulo for this job
+ *
+ * @param job
+ * Hadoop job instance to be configured
+ * @param info
+ * Connection information for Accumulo
+ * @since 2.0.0
+ */
+ public static void setConnectionInfo(JobConf job, ConnectionInfo info) throws AccumuloSecurityException {
+ // Set the connector information (principal and authentication token) taken from the connection info.
+ // NOTE(review): setConnectorInfo has a KerberosToken branch that attempts to fetch a DelegationToken; confirm it does not depend on the instance
+ // settings that are only stored by the setZooKeeperInstance call below.
+ setConnectorInfo(job, info.getPrincipal(), info.getAuthenticationToken());
+ // Translate the connection info into a ClientConfiguration and set the instance settings from it.
+ setZooKeeperInstance(job, ConnectionInfoFactory.getClientConfiguration(info));
+ }
+
+ /**
* Sets the connector information needed to communicate with Accumulo in this job.
*
* <p>
@@ -131,7 +147,9 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
* @param token
* the user's password
* @since 1.5.0
+ * @deprecated since 2.0.0, use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead
*/
+ @Deprecated
public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException {
if (token instanceof KerberosToken) {
log.info("Received KerberosToken, attempting to fetch DelegationToken");
@@ -172,7 +190,9 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
* @param tokenFile
* the path to the token file
* @since 1.6.0
+ * @deprecated since 2.0.0, use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead
*/
+ @Deprecated
public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException {
InputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile);
}
@@ -228,7 +248,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
* @param zooKeepers
* a comma-separated list of zookeeper servers
* @since 1.5.0
- * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(JobConf, ClientConfiguration)} instead.
+ * @deprecated since 1.6.0; Use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead.
*/
@Deprecated
public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) {
@@ -243,7 +263,9 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
* @param clientConfig
* client configuration containing connection options
* @since 1.6.0
+ * @deprecated since 2.0.0; Use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead.
*/
+ @Deprecated
public static void setZooKeeperInstance(JobConf job, ClientConfiguration clientConfig) {
InputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
index 5f00ec3..affb535 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
@@ -19,8 +19,7 @@ package org.apache.accumulo.core.client.mapred;
import java.io.IOException;
import java.util.Map.Entry;
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.ConnectionInfo;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
@@ -39,10 +38,9 @@ import org.apache.log4j.Level;
* The user must specify the following via static configurator methods:
*
* <ul>
- * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)}
- * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, String)}
+ * <li>{@link AccumuloInputFormat#setConnectionInfo(JobConf, ConnectionInfo)}
+ * <li>{@link AccumuloInputFormat#setInputTableName(JobConf, String)}
* <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, Authorizations)}
- * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)}
* </ul>
*
* Other static methods are optional.
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java
index 3a2e3fa..3c0b4b6 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.Map;
import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ConnectionInfo;
import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
import org.apache.accumulo.core.data.Key;
@@ -37,7 +38,7 @@ import org.apache.hadoop.mapred.Reporter;
* The user must specify the following via static configurator methods:
*
* <ul>
- * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, org.apache.accumulo.core.client.security.tokens.AuthenticationToken)}
+ * <li>{@link AccumuloInputFormat#setConnectionInfo(JobConf, ConnectionInfo)}
* <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, String)}
* <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, org.apache.accumulo.core.security.Authorizations)}
* <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)}
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
index 6f07ce7..426a4d6 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ConnectionInfo;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
@@ -37,6 +38,7 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier;
+import org.apache.accumulo.core.client.impl.ConnectionInfoFactory;
import org.apache.accumulo.core.client.impl.DelegationTokenImpl;
import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator;
@@ -68,9 +70,7 @@ import org.apache.log4j.Logger;
* The user must specify the following via static configurator methods:
*
* <ul>
- * <li>{@link AccumuloOutputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)}
- * <li>{@link AccumuloOutputFormat#setConnectorInfo(JobConf, String, String)}
- * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)}
+ * <li>{@link AccumuloOutputFormat#setConnectionInfo(JobConf, ConnectionInfo)}
* </ul>
*
* Other static methods are optional.
@@ -81,6 +81,20 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
protected static final Logger log = Logger.getLogger(CLASS);
/**
+ * Set the connection information needed to communicate with Accumulo in this job.
+ *
+ * @param job
+ * Hadoop job to be configured
+ * @param info
+ * Accumulo connection information
+ * @since 2.0.0
+ */
+ public static void setConnectionInfo(JobConf job, ConnectionInfo info) throws AccumuloSecurityException {
+ // Set the connector information (principal and authentication token) taken from the connection info.
+ // NOTE(review): setConnectorInfo has a KerberosToken branch that attempts to fetch a DelegationToken; confirm it does not depend on the instance
+ // settings that are only stored by the setZooKeeperInstance call below.
+ setConnectorInfo(job, info.getPrincipal(), info.getAuthenticationToken());
+ // Translate the connection info into a ClientConfiguration and set the instance settings from it.
+ setZooKeeperInstance(job, ConnectionInfoFactory.getClientConfiguration(info));
+ }
+
+ /**
* Sets the connector information needed to communicate with Accumulo in this job.
*
* <p>
@@ -96,7 +110,9 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
* @param token
* the user's password
* @since 1.5.0
+ * @deprecated since 2.0.0, use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead.
*/
+ @Deprecated
public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException {
if (token instanceof KerberosToken) {
log.info("Received KerberosToken, attempting to fetch DelegationToken");
@@ -137,7 +153,9 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
* @param tokenFile
* the path to the password file
* @since 1.6.0
+ * @deprecated since 2.0.0, use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead
*/
+ @Deprecated
public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException {
OutputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile);
}
@@ -215,9 +233,8 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
* @param zooKeepers
* a comma-separated list of zookeeper servers
* @since 1.5.0
- * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(JobConf, ClientConfiguration)} instead.
+ * @deprecated since 1.6.0; Use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead.
*/
-
@Deprecated
public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) {
setZooKeeperInstance(job, ClientConfiguration.create().withInstance(instanceName).withZkHosts(zooKeepers));
@@ -232,7 +249,9 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
* @param clientConfig
* client configuration for specifying connection timeouts, SSL connection options, etc.
* @since 1.6.0
+ * @deprecated since 2.0.0; Use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead.
*/
+ @Deprecated
public static void setZooKeeperInstance(JobConf job, ClientConfiguration clientConfig) {
OutputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java
index 5049ef7..62b949a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java
@@ -19,9 +19,8 @@ package org.apache.accumulo.core.client.mapred;
import java.io.IOException;
import java.util.Map.Entry;
-import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ConnectionInfo;
import org.apache.accumulo.core.client.RowIterator;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
@@ -40,10 +39,9 @@ import org.apache.hadoop.mapred.Reporter;
* The user must specify the following via static configurator methods:
*
* <ul>
- * <li>{@link AccumuloRowInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)}
+ * <li>{@link AccumuloRowInputFormat#setConnectionInfo(JobConf, ConnectionInfo)}
* <li>{@link AccumuloRowInputFormat#setInputTableName(JobConf, String)}
* <li>{@link AccumuloRowInputFormat#setScanAuthorizations(JobConf, Authorizations)}
- * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)}
* </ul>
*
* Other static methods are optional.
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
index fb36282..0ca7c2d 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
@@ -35,6 +35,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.ClientSideIteratorScanner;
+import org.apache.accumulo.core.client.ConnectionInfo;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IsolatedScanner;
@@ -48,6 +49,7 @@ import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier;
import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.ConnectionInfoFactory;
import org.apache.accumulo.core.client.impl.Credentials;
import org.apache.accumulo.core.client.impl.DelegationTokenImpl;
import org.apache.accumulo.core.client.impl.OfflineScanner;
@@ -119,6 +121,20 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
}
/**
+ * Sets connection information needed to communicate with Accumulo for this job
+ *
+ * @param job
+ * Hadoop job instance to be configured
+ * @param info
+ * Connection information for Accumulo
+ * @since 2.0.0
+ */
+ public static void setConnectionInfo(Job job, ConnectionInfo info) throws AccumuloSecurityException {
+ // Set the connector information (principal and authentication token) taken from the connection info.
+ // NOTE(review): setConnectorInfo has a KerberosToken branch that attempts to fetch a DelegationToken; confirm it does not depend on the instance
+ // settings that are only stored by the setZooKeeperInstance call below.
+ setConnectorInfo(job, info.getPrincipal(), info.getAuthenticationToken());
+ // Translate the connection info into a ClientConfiguration and set the instance settings from it.
+ setZooKeeperInstance(job, ConnectionInfoFactory.getClientConfiguration(info));
+ }
+
+ /**
* Sets the connector information needed to communicate with Accumulo in this job.
*
* <p>
@@ -134,7 +150,9 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
* @param token
* the user's password
* @since 1.5.0
+ * @deprecated since 2.0.0; use {@link #setConnectionInfo(Job, ConnectionInfo)} instead.
*/
+ @Deprecated
public static void setConnectorInfo(Job job, String principal, AuthenticationToken token) throws AccumuloSecurityException {
if (token instanceof KerberosToken) {
log.info("Received KerberosToken, attempting to fetch DelegationToken");
@@ -253,7 +271,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
* @param zooKeepers
* a comma-separated list of zookeeper servers
* @since 1.5.0
- * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(Job, ClientConfiguration)} instead.
+ * @deprecated since 1.6.0; Use {@link #setConnectionInfo(Job, ConnectionInfo)} instead.
*/
@Deprecated
public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) {
@@ -269,7 +287,9 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
* @param clientConfig
* client configuration containing connection options
* @since 1.6.0
+ * @deprecated since 2.0.0; Use {@link #setConnectionInfo(Job, ConnectionInfo)} instead.
*/
+ @Deprecated
public static void setZooKeeperInstance(Job job, ClientConfiguration clientConfig) {
InputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
index 837b3fe..441ac33 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
@@ -19,8 +19,7 @@ package org.apache.accumulo.core.client.mapreduce;
import java.io.IOException;
import java.util.Map.Entry;
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.ConnectionInfo;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
@@ -39,9 +38,9 @@ import org.apache.log4j.Level;
* The user must specify the following via static configurator methods:
*
* <ul>
- * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, AuthenticationToken)}
+ * <li>{@link AccumuloInputFormat#setConnectionInfo(Job, ConnectionInfo)}
+ * <li>{@link AccumuloInputFormat#setInputTableName(Job, String)}
* <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)}
- * <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, ClientConfiguration)}
* </ul>
*
* Other static methods are optional.
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index e821b5d..af69e5a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ConnectionInfo;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
@@ -37,6 +38,7 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier;
+import org.apache.accumulo.core.client.impl.ConnectionInfoFactory;
import org.apache.accumulo.core.client.impl.DelegationTokenImpl;
import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator;
@@ -69,9 +71,7 @@ import org.apache.log4j.Logger;
* The user must specify the following via static configurator methods:
*
* <ul>
- * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, String, AuthenticationToken)}
- * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, String, String)}
- * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, ClientConfiguration)}
+ * <li>{@link AccumuloOutputFormat#setConnectionInfo(Job, ConnectionInfo)}
* </ul>
*
* Other static methods are optional.
@@ -82,6 +82,20 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
protected static final Logger log = Logger.getLogger(CLASS);
/**
+ * Set the connection information needed to communicate with Accumulo in this job.
+ *
+ * <p>
+ * The principal and authentication token of {@code info} are applied with {@link #setConnectorInfo(Job, String, AuthenticationToken)}; a
+ * {@link ClientConfiguration} derived from {@code info} by {@code ConnectionInfoFactory.getClientConfiguration} is applied with
+ * {@link #setZooKeeperInstance(Job, ClientConfiguration)}.
+ *
+ * @param job
+ * Hadoop job to be configured
+ * @param info
+ * Accumulo connection information
+ * @throws AccumuloSecurityException
+ * if thrown by {@link #setConnectorInfo(Job, String, AuthenticationToken)}
+ * @since 2.0.0
+ */
+ public static void setConnectionInfo(Job job, ConnectionInfo info) throws AccumuloSecurityException {
+ // Credentials first, then the instance settings converted to a ClientConfiguration.
+ setConnectorInfo(job, info.getPrincipal(), info.getAuthenticationToken());
+ setZooKeeperInstance(job, ConnectionInfoFactory.getClientConfiguration(info));
+ }
+
+ /**
* Sets the connector information needed to communicate with Accumulo in this job.
*
* <p>
@@ -97,7 +111,9 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
* @param token
* the user's password
* @since 1.5.0
+ * @deprecated since 2.0.0, replaced by {@link #setConnectionInfo(Job, ConnectionInfo)}
*/
+ @Deprecated
public static void setConnectorInfo(Job job, String principal, AuthenticationToken token) throws AccumuloSecurityException {
if (token instanceof KerberosToken) {
log.info("Received KerberosToken, attempting to fetch DelegationToken");
@@ -138,7 +154,9 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
* @param tokenFile
* the path to the token file
* @since 1.6.0
+ * @deprecated since 2.0.0, replaced by {@link #setConnectionInfo(Job, ConnectionInfo)}
*/
+ @Deprecated
public static void setConnectorInfo(Job job, String principal, String tokenFile) throws AccumuloSecurityException {
OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, tokenFile);
}
@@ -232,7 +250,9 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
* @param clientConfig
* client configuration for specifying connection timeouts, SSL connection options, etc.
* @since 1.6.0
+ * @deprecated since 2.0.0; Use {@link #setConnectionInfo(Job, ConnectionInfo)} instead.
*/
+ @Deprecated
public static void setZooKeeperInstance(Job job, ClientConfiguration clientConfig) {
OutputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
index 043f88a..4f6b2bd 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
@@ -19,9 +19,8 @@ package org.apache.accumulo.core.client.mapreduce;
import java.io.IOException;
import java.util.Map.Entry;
-import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ConnectionInfo;
import org.apache.accumulo.core.client.RowIterator;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
@@ -40,10 +39,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
* The user must specify the following via static configurator methods:
*
* <ul>
- * <li>{@link AccumuloRowInputFormat#setConnectorInfo(Job, String, AuthenticationToken)}
+ * <li>{@link AccumuloRowInputFormat#setConnectionInfo(Job, ConnectionInfo)}
* <li>{@link AccumuloRowInputFormat#setInputTableName(Job, String)}
* <li>{@link AccumuloRowInputFormat#setScanAuthorizations(Job, Authorizations)}
- * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(Job, ClientConfiguration)}
* </ul>
*
* Other static methods are optional.
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
index 9b5601b..4918125 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
@@ -85,6 +85,11 @@ public class MockConnector extends Connector {
config.getMaxWriteThreads());
}
+ // Convenience overload: delegates to the four-argument variant with a freshly constructed BatchWriterConfig.
+ @Override
+ public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException {
+ final BatchWriterConfig freshConfig = new BatchWriterConfig();
+ return createBatchDeleter(tableName, authorizations, numQueryThreads, freshConfig);
+ }
+
@Deprecated
@Override
public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxLatency, int maxWriteThreads) throws TableNotFoundException {
@@ -98,6 +103,11 @@ public class MockConnector extends Connector {
return createBatchWriter(tableName, config.getMaxMemory(), config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads());
}
+ // Convenience overload: delegates to createBatchWriter(String, BatchWriterConfig) with a freshly constructed BatchWriterConfig.
+ @Override
+ public BatchWriter createBatchWriter(String tableName) throws TableNotFoundException {
+ final BatchWriterConfig freshConfig = new BatchWriterConfig();
+ return createBatchWriter(tableName, freshConfig);
+ }
+
@Deprecated
@Override
public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, int maxWriteThreads) {
@@ -110,6 +120,11 @@ public class MockConnector extends Connector {
}
@Override
+ public MultiTableBatchWriter createMultiTableBatchWriter() {
+ // Convenience overload: delegates to the BatchWriterConfig variant with a freshly constructed config.
+ final BatchWriterConfig freshConfig = new BatchWriterConfig();
+ return createMultiTableBatchWriter(freshConfig);
+ }
+
+ @Override
public Scanner createScanner(String tableName, Authorizations authorizations) throws TableNotFoundException {
MockTable table = acu.tables.get(tableName);
if (table == null)
diff --git a/core/src/main/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderToken.java b/core/src/main/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderToken.java
index 5ac6f02..b39afe2 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderToken.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderToken.java
@@ -33,6 +33,9 @@ import org.apache.hadoop.conf.Configuration;
public class CredentialProviderToken extends PasswordToken {
public static final String NAME_PROPERTY = "name", CREDENTIAL_PROVIDERS_PROPERTY = "credentialProviders";
+ private String name;
+ private String credentialProviders;
+
public CredentialProviderToken() {
super();
}
@@ -40,11 +43,12 @@ public class CredentialProviderToken extends PasswordToken {
public CredentialProviderToken(String name, String credentialProviders) throws IOException {
requireNonNull(name);
requireNonNull(credentialProviders);
-
setWithCredentialProviders(name, credentialProviders);
}
protected void setWithCredentialProviders(String name, String credentialProviders) throws IOException {
+ this.name = name;
+ this.credentialProviders = credentialProviders;
final Configuration conf = new Configuration(CachedConfiguration.getInstance());
conf.set(CredentialProviderFactoryShim.CREDENTIAL_PROVIDER_PATH, credentialProviders);
@@ -57,6 +61,20 @@ public class CredentialProviderToken extends PasswordToken {
setPassword(CharBuffer.wrap(password));
}
+ /**
+ * Returns the name most recently passed to {@code setWithCredentialProviders}.
+ *
+ * @return Name used to extract Accumulo user password from CredentialProvider
+ */
+ public String getName() {
+ return this.name;
+ }
+
+ /**
+ * Returns the provider list most recently passed to {@code setWithCredentialProviders}.
+ *
+ * @return CSV list of CredentialProvider(s)
+ */
+ public String getCredentialProviders() {
+ return this.credentialProviders;
+ }
+
@Override
public void init(Properties properties) {
char[] nameCharArray = properties.get(NAME_PROPERTY), credentialProvidersCharArray = properties.get(CREDENTIAL_PROVIDERS_PROPERTY);
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
new file mode 100644
index 0000000..8218823
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.conf;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Generates client-properties.md for documentation on Accumulo website and accumulo-client.properties for Accumulo distribution tarball
+ */
+class ClientConfigGenerate {
+
+ /**
+ * Template for one output format. {@link #generate()} fixes the section order and which properties go in each section; subclasses decide how a page
+ * header, a section heading and a single property are written to the enclosing instance's {@code doc} stream.
+ */
+ private abstract class Format {
+
+ /** Writes whatever introduces a section with the given title (may write nothing). */
+ abstract void beginSection(String section);
+
+ /** Writes the content that precedes all sections. */
+ abstract void pageHeader();
+
+ /** Writes the entry for a single client property. */
+ abstract void property(ClientProperty prop);
+
+ /** Writes the page header followed by every section in a fixed order, then closes {@code doc}. */
+ void generate() {
+ pageHeader();
+
+ generateSection("Instance", "instance.");
+ // auth.method and auth.username are listed explicitly so they are written before the other auth.* properties
+ generateSection("Authentication", "auth.", "auth.method", "auth.username");
+ generateSection("Batch Writer", "batch.writer.");
+ generateSection("SSL", "ssl.");
+ generateSection("SASL", "sasl.");
+ generateSection("Tracing", "trace.");
+
+ doc.close();
+ }
+
+ /**
+ * Writes one section: first the properties named in {@code prefixProps} (in the order given, skipping names that are not known properties), then
+ * every remaining property whose key starts with {@code prefix}, in the key order of {@code sortedProps}.
+ */
+ void generateSection(String section, String prefix, String... prefixProps) {
+ beginSection(section);
+ for (String prop : prefixProps) {
+ ClientProperty cp = sortedProps.get(prop);
+ if (cp != null) {
+ property(cp);
+ }
+ }
+ // properties already written above must not be written a second time
+ Set<String> prefixSet = Sets.newHashSet(prefixProps);
+ for (ClientProperty prop : sortedProps.values()) {
+ if (prop.getKey().startsWith(prefix) && !prefixSet.contains(prop.getKey())) {
+ property(prop);
+ }
+ }
+ }
+
+ /** Writes a section with no explicitly ordered leading properties. */
+ void generateSection(String section, String prefix) {
+ // The "" argument forces the varargs overload; a plain two-argument call would resolve to this method again.
+ // An empty name is only looked up and skipped, assuming no property has an empty key.
+ generateSection(section, prefix, "");
+ }
+ }
+
+ /**
+ * Writes the client properties as a single markdown table (with front matter) to the enclosing instance's {@code doc} stream.
+ */
+ private class Markdown extends Format {
+
+ /** Sections are not rendered in markdown: all properties go into one table. */
+ @Override
+ void beginSection(String section) {}
+
+ /** Writes the front matter, the generated-file warning, the introduction and the table header. */
+ @Override
+ void pageHeader() {
+ doc.println("---");
+ doc.println("title: Client Properties");
+ doc.println("category: development");
+ doc.println("order: 9");
+ doc.println("---\n");
+ doc.println("<!-- WARNING: Do not edit this file. It is a generated file that is copied from Accumulo build (from core/target/generated-docs) -->");
+ doc.println("<!-- Generated by : " + getClass().getName() + " -->\n");
+ doc.println("Below are properties set in `accumulo-client.properties` that configure Accumulo clients. All properties have been part of the API since 2.0.0 (unless otherwise specified):\n");
+ doc.println("| Property | Default value | Since | Description |");
+ doc.println("|----------|---------------|-------|-------------|");
+ }
+
+ /**
+ * Writes one table row: anchor and key, default value ({@code *empty*} when blank), since, description.
+ *
+ * @throws NullPointerException
+ * if prop is null
+ */
+ @Override
+ void property(ClientProperty prop) {
+ // Objects.nonNull merely returns a boolean; requireNonNull is the call that actually rejects null.
+ Objects.requireNonNull(prop, "prop");
+ doc.print("| <a name=\"" + prop.getKey().replace(".", "_") + "\" class=\"prop\"></a> " + prop.getKey() + " | ");
+ String defaultValue = sanitize(prop.getDefaultValue()).trim();
+ if (defaultValue.isEmpty()) {
+ defaultValue = "*empty*";
+ }
+ doc.println(defaultValue + " | " + prop.getSince() + " | " + sanitize(prop.getDescription()) + " |");
+ }
+
+ /** Replaces newlines with {@code <br>} so a value stays on one table row. */
+ String sanitize(String str) {
+ return str.replace("\n", "<br>");
+ }
+ }
+
+ /**
+ * Writes the client properties as a properties file (license header, one commented heading per section, one {@code key=default} line per property) to
+ * the enclosing instance's {@code doc} stream.
+ */
+ private class ConfigFile extends Format {
+
+ /** Writes a blank line followed by a two-line comment heading for the section. */
+ @Override
+ void beginSection(String section) {
+ doc.println("\n## " + section + " properties");
+ doc.println("## --------------");
+ }
+
+ /** Writes the ASF license header, the file banner and a note explaining the commented/uncommented convention. */
+ @Override
+ void pageHeader() {
+ doc.println("# Licensed to the Apache Software Foundation (ASF) under one or more");
+ doc.println("# contributor license agreements. See the NOTICE file distributed with");
+ doc.println("# this work for additional information regarding copyright ownership.");
+ doc.println("# The ASF licenses this file to You under the Apache License, Version 2.0");
+ doc.println("# (the \"License\"); you may not use this file except in compliance with");
+ doc.println("# the License. You may obtain a copy of the License at");
+ doc.println("#");
+ doc.println("# http://www.apache.org/licenses/LICENSE-2.0");
+ doc.println("#");
+ doc.println("# Unless required by applicable law or agreed to in writing, software");
+ doc.println("# distributed under the License is distributed on an \"AS IS\" BASIS,");
+ doc.println("# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.");
+ doc.println("# See the License for the specific language governing permissions and");
+ doc.println("# limitations under the License.\n");
+ doc.println("################################");
+ doc.println("## Accumulo client configuration");
+ doc.println("################################\n");
+ doc.println("## NOTE - All properties that have a default are set with it. Properties that");
+ doc.println("## are uncommented must be set by the user.");
+ }
+
+ /** Writes the description as a comment, then {@code key=default}; the assignment is commented out unless the property is required. */
+ @Override
+ void property(ClientProperty prop) {
+ doc.println("## " + prop.getDescription());
+ // optional properties are emitted commented out, so only required ones appear as live settings
+ if (!prop.isRequired()) {
+ doc.print("#");
+ }
+ doc.println(prop.getKey() + "=" + prop.getDefaultValue() + "\n");
+ }
+ }
+
+ private PrintStream doc;
+ private final TreeMap<String,ClientProperty> sortedProps = new TreeMap<>();
+
+ /**
+ * Creates a generator that writes to the given stream and indexes every {@link ClientProperty} by its key.
+ *
+ * @param doc
+ * stream that receives the generated output
+ * @throws NullPointerException
+ * if doc is null
+ */
+ private ClientConfigGenerate(PrintStream doc) {
+ // Objects.nonNull merely returns a boolean; requireNonNull is the call that actually rejects a null stream.
+ this.doc = Objects.requireNonNull(doc, "doc");
+ for (ClientProperty prop : ClientProperty.values()) {
+ this.sortedProps.put(prop.getKey(), prop);
+ }
+ }
+
+ // Renders all client properties in the markdown format.
+ private void generateMarkdown() {
+ Format markdown = new Markdown();
+ markdown.generate();
+ }
+
+ // Renders all client properties in the properties-file format.
+ private void generateConfigFile() {
+ Format configFile = new ConfigFile();
+ configFile.generate();
+ }
+
+ /**
+ * Generates markdown and config files for Accumulo client properties. Arguments are: "--generate-markdown filename" or "--generate-config filename"
+ *
+ * @param args
+ * command-line arguments: the generation mode followed by the output file name
+ * @throws IllegalArgumentException
+ * if args is invalid; the output file is neither created nor truncated in that case
+ * @throws FileNotFoundException
+ * if the output file cannot be opened for writing
+ * @throws UnsupportedEncodingException
+ * declared by the PrintStream constructor that takes a charset name
+ */
+ public static void main(String[] args) throws FileNotFoundException, UnsupportedEncodingException {
+ final boolean markdown = args.length == 2 && args[0].equals("--generate-markdown");
+ final boolean config = args.length == 2 && args[0].equals("--generate-config");
+ // Validate the mode before opening the stream so a bad invocation neither creates an empty file nor leaks the stream.
+ if (!markdown && !config) {
+ throw new IllegalArgumentException("Usage: " + ClientConfigGenerate.class.getName() + " [--generate-markdown|--generate-config] <filename>");
+ }
+ // try-with-resources closes the file even if generation throws; closing an already closed PrintStream has no effect.
+ try (PrintStream out = new PrintStream(args[1], UTF_8.name())) {
+ ClientConfigGenerate clientConfigGenerate = new ClientConfigGenerate(out);
+ if (markdown) {
+ clientConfigGenerate.generateMarkdown();
+ } else {
+ clientConfigGenerate.generateConfigFile();
+ }
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
new file mode 100644
index 0000000..b645b10
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.conf;
+
+import java.util.Objects;
+import java.util.Properties;
+
+import org.apache.accumulo.core.Constants;
+
+public enum ClientProperty {
+
+ // Instance
+ INSTANCE_NAME("instance.name", "", "Name of Accumulo instance to connect to", "", true),
+ INSTANCE_ZOOKEEPERS("instance.zookeepers", "localhost:2181", "Zookeeper connection information for Accumulo instance", "", true),
+ INSTANCE_ZOOKEEPERS_TIMEOUT_SEC("instance.zookeepers.timeout.sec", "30", "Zookeeper session timeout (in seconds)"),
+
+ // Authentication
+ AUTH_METHOD("auth.method", "password", "Authentication method (i.e password, kerberos, provider). Set more properties for chosen method below.", "", true),
+ AUTH_USERNAME("auth.username", "", "Accumulo username/principal for chosen authentication method", "", true),
+ AUTH_PASSWORD("auth.password", "", "Accumulo user password", "", true),
+ AUTH_KERBEROS_KEYTAB_PATH("auth.kerberos.keytab.path", "", "Path to Kerberos keytab"),
+ AUTH_PROVIDER_NAME("auth.provider.name", "", "Alias used to extract Accumulo user password from CredentialProvider"),
+ AUTH_PROVIDER_URLS("auth.provider.urls", "", "Comma separated list of URLs defining CredentialProvider(s)"),
+
+ // BatchWriter
+ BATCH_WRITER_MAX_MEMORY_BYTES("batch.writer.max.memory.bytes", "52428800", "Max memory (in bytes) to batch before writing"),
+ BATCH_WRITER_MAX_LATENCY_SEC("batch.writer.max.latency.sec", "120", "Max amount of time (in seconds) to hold data in memory before flushing it"),
+ BATCH_WRITER_MAX_TIMEOUT_SEC("batch.writer.max.timeout.sec", "0",
+ "Max amount of time (in seconds) an unresponsive server will be re-tried. An exception is thrown when this timeout is exceeded. Set to zero for no timeout."),
+ BATCH_WRITER_MAX_WRITE_THREADS("batch.writer.max.write.threads", "3", "Maximum number of threads to use for writing data to tablet servers."),
+ BATCH_WRITER_DURABILITY("batch.writer.durability", "default",
+ "Change the durability for the BatchWriter session. To use the table's durability setting, use \"default\"."),
+
+ // SSL
+ SSL_ENABLED("ssl.enabled", "false", "Enable SSL for client RPC"),
+ SSL_KEYSTORE_PASSWORD("ssl.keystore.password", "", "Password used to encrypt keystore"),
+ SSL_KEYSTORE_PATH("ssl.keystore.path", "", "Path to SSL keystore file"),
+ SSL_KEYSTORE_TYPE("ssl.keystore.type", "jks", "Type of SSL keystore"),
+ SSL_TRUSTSTORE_PASSWORD("ssl.truststore.password", "", "Password used to encrypt truststore"),
+ SSL_TRUSTSTORE_PATH("ssl.truststore.path", "", "Path to SSL truststore file"),
+ SSL_TRUSTSTORE_TYPE("ssl.truststore.type", "jks", "Type of SSL truststore"),
+ SSL_USE_JSSE("ssl.use.jsse", "false", "Use JSSE system properties to configure SSL"),
+
+ // SASL
+ SASL_ENABLED("sasl.enabled", "false", "Enable SASL for client RPC"),
+ SASL_QOP("sasl.qop", "auth", "SASL quality of protection. Valid values are 'auth', 'auth-int', and 'auth-conf'"),
+ SASL_KERBEROS_SERVER_PRIMARY("sasl.kerberos.server.primary", "accumulo", "Kerberos principal/primary that Accumulo servers use to login"),
+
+ // Trace
+ TRACE_SPAN_RECEIVERS("trace.span.receivers", "org.apache.accumulo.tracer.ZooTraceClient", "A list of span receiver classes to send trace spans"),
+ TRACE_ZOOKEEPER_PATH("trace.zookeeper.path", Constants.ZTRACERS, "The zookeeper node where tracers are registered");
+
+ // Enum constants are shared singletons, so their state is immutable.
+ private final String key;
+ private final String defaultValue;
+ private final String description;
+ private final String since;
+ private final boolean required;
+
+ /**
+ * @param key
+ * property name as it appears in the properties file
+ * @param defaultValue
+ * value used when the property is unset or empty; may be the empty string
+ * @param description
+ * human-readable description used in generated documentation
+ * @param since
+ * version string reported by {@link #getSince()}; may be the empty string
+ * @param required
+ * whether {@link #getValue(Properties)} rejects an empty effective value
+ * @throws NullPointerException
+ * if any String argument is null
+ */
+ ClientProperty(String key, String defaultValue, String description, String since, boolean required) {
+ this.key = Objects.requireNonNull(key, "key");
+ this.defaultValue = Objects.requireNonNull(defaultValue, "defaultValue");
+ this.description = Objects.requireNonNull(description, "description");
+ this.since = Objects.requireNonNull(since, "since");
+ this.required = required;
+ }
+
+ ClientProperty(String key, String defaultValue, String description, String since) {
+ this(key, defaultValue, description, since, false);
+ }
+
+ ClientProperty(String key, String defaultValue, String description) {
+ this(key, defaultValue, description, "");
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getDefaultValue() {
+ return defaultValue;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public String getSince() {
+ return since;
+ }
+
+ public boolean isRequired() {
+ return required;
+ }
+
+ /**
+ * Looks this property up in the given properties, substituting the default value when the entry is absent or empty.
+ *
+ * @param properties
+ * properties to read from
+ * @return the configured value, or the default value when none is configured
+ * @throws NullPointerException
+ * if properties is null
+ * @throws IllegalArgumentException
+ * if the property is required and the effective value is empty
+ */
+ public String getValue(Properties properties) {
+ Objects.requireNonNull(properties);
+ final String configured = properties.getProperty(getKey());
+ final boolean unset = configured == null || configured.isEmpty();
+ final String effective = Objects.requireNonNull(unset ? getDefaultValue() : configured);
+ if (effective.isEmpty() && isRequired()) {
+ throw new IllegalArgumentException(getKey() + " must be set!");
+ }
+ return effective;
+ }
+
+ /**
+ * Reads this property via {@link #getValue(Properties)} and parses it as a long.
+ *
+ * @param properties
+ * properties to read from
+ * @return the parsed value, or null when the effective value is empty
+ * @throws NumberFormatException
+ * if the effective value is non-empty and not a parsable long
+ */
+ public Long getLong(Properties properties) {
+ final String text = getValue(properties);
+ return text.isEmpty() ? null : Long.valueOf(text);
+ }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationDocGen.java b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationDocGen.java
index 61245f8..91df2dc 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationDocGen.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationDocGen.java
@@ -80,11 +80,12 @@ class ConfigurationDocGen {
@Override
void pageHeader() {
doc.println("---");
- doc.println("title: Configuration Properties");
+ doc.println("title: Server Properties");
doc.println("category: administration");
doc.println("order: 3");
doc.println("---\n");
doc.println("<!-- WARNING: Do not edit this file. It is a generated file that is copied from Accumulo build (from core/target/generated-docs) -->\n");
+ doc.println("Below are properties set in `accumulo-site.xml` or the Accumulo shell that configure Accumulo servers (i.e tablet server, master, etc):\n");
}
@Override
diff --git a/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java b/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java
index 9a32c26..786a7bd 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java
@@ -27,6 +27,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
import org.junit.Test;
/**
@@ -201,6 +202,24 @@ public class BatchWriterConfigTest {
assertEquals(cfg1.hashCode(), cfg2.hashCode());
}
+ /**
+ * Checks the result of {@code cfg1.merge(cfg2)} when the two configs set overlapping and disjoint properties.
+ */
+ @Test
+ public void testMerge() {
+ BatchWriterConfig cfg1 = new BatchWriterConfig(), cfg2 = new BatchWriterConfig();
+ // max memory is set on both configs; durability and max latency only on cfg2; max write threads on neither
+ cfg1.setMaxMemory(1234);
+ cfg2.setMaxMemory(5858);
+ cfg2.setDurability(Durability.LOG);
+ cfg2.setMaxLatency(456, TimeUnit.MILLISECONDS);
+
+ Assert.assertEquals(Durability.DEFAULT, cfg1.getDurability());
+
+ BatchWriterConfig merged = cfg1.merge(cfg2);
+
+ // set on both: the receiver's (cfg1) value is expected
+ Assert.assertEquals(1234, merged.getMaxMemory());
+ // set only on cfg2: the argument's values are expected
+ Assert.assertEquals(Durability.LOG, merged.getDurability());
+ Assert.assertEquals(456, merged.getMaxLatency(TimeUnit.MILLISECONDS));
+ // set on neither: presumably the BatchWriterConfig default - TODO confirm
+ Assert.assertEquals(3, merged.getMaxWriteThreads());
+ }
+
private byte[] createBytes(BatchWriterConfig bwConfig) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
bwConfig.write(new DataOutputStream(baos));
diff --git a/core/src/test/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderTokenTest.java b/core/src/test/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderTokenTest.java
index d6c7025..eb562d9 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderTokenTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderTokenTest.java
@@ -56,6 +56,8 @@ public class CredentialProviderTokenTest {
}
CredentialProviderToken token = new CredentialProviderToken("root.password", keystorePath);
+ Assert.assertEquals("root.password", token.getName());
+ Assert.assertEquals(keystorePath, token.getCredentialProviders());
Assert.assertArrayEquals("password".getBytes(UTF_8), token.getPassword());
token = new CredentialProviderToken("bob.password", keystorePath);
diff --git a/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java b/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java
index e278fce..fdb8cfa 100644
--- a/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java
+++ b/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.cluster.ClusterUser;
import org.apache.accumulo.cluster.ClusterUsers;
import org.apache.accumulo.cluster.standalone.StandaloneAccumuloCluster;
import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ConnectionInfo;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.admin.TableOperations;
@@ -251,6 +252,11 @@ public abstract class AccumuloClusterHarness extends AccumuloITBase implements M
return clusterConf.getAdminPrincipal();
}
+ /**
+ * Builds a {@link ConnectionInfo} for the test cluster from its instance name and ZooKeepers, using the admin principal and admin token.
+ */
+ public static ConnectionInfo getConnectionInfo() {
+ return Connector.builder().forInstance(getCluster().getInstanceName(), getCluster().getZooKeepers()).usingToken(getAdminPrincipal(), getAdminToken())
+ .info();
+ }
+
public static AuthenticationToken getAdminToken() {
checkState(initialized);
return clusterConf.getAdminToken();
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java b/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java
index 999c1e9..c243f76 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ConnectionInfo;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
@@ -172,6 +173,10 @@ public class ConfigurableMacBase extends AccumuloITBase {
return getCluster().getConnector("root", new PasswordToken(ROOT_PASSWORD));
}
+ /**
+ * Builds a {@link ConnectionInfo} for the test cluster from its instance name and ZooKeepers, using the "root" user and {@code ROOT_PASSWORD}.
+ */
+ protected ConnectionInfo getConnectionInfo() {
+ return Connector.builder().forInstance(getCluster().getInstanceName(), getCluster().getZooKeepers()).usingPassword("root", ROOT_PASSWORD).info();
+ }
+
protected Process exec(Class<?> clazz, String... args) throws IOException {
return getCluster().exec(clazz, args);
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ConnectorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ConnectorIT.java
new file mode 100644
index 0000000..5846ec0
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ConnectorIT.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.ConnectionInfo;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.ClientProperty;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.junit.Assert;
+import org.junit.Test;
+
+ /**
+ * Integration test for the fluent {@code Connector.builder()} API: building a Connector and a ConnectionInfo from explicit arguments, and a Connector
+ * from a Properties object.
+ */
+ public class ConnectorIT extends AccumuloClusterHarness {
+
+ @Test
+ public void testConnectorBuilder() throws Exception {
+ // Use the harness connector to discover the instance and to create a user for the builder to log in as.
+ Connector c = getConnector();
+ String instanceName = c.getInstance().getInstanceName();
+ String zookeepers = c.getInstance().getZooKeepers();
+ final String user = "testuser";
+ final String password = "testpassword";
+ c.securityOperations().createLocalUser(user, new PasswordToken(password));
+
+ // 1. Connector built from explicit instance and password arguments
+ Connector conn = Connector.builder().forInstance(instanceName, zookeepers).usingPassword(user, password).build();
+
+ Assert.assertEquals(instanceName, conn.getInstance().getInstanceName());
+ Assert.assertEquals(zookeepers, conn.getInstance().getZooKeepers());
+ Assert.assertEquals(user, conn.whoami());
+
+ // 2. ConnectionInfo built from the same arguments exposes them unchanged
+ ConnectionInfo info = Connector.builder().forInstance(instanceName, zookeepers).usingPassword(user, password).info();
+ Assert.assertEquals(instanceName, info.getInstanceName());
+ Assert.assertEquals(zookeepers, info.getZookeepers());
+ Assert.assertEquals(user, info.getPrincipal());
+ Assert.assertTrue(info.getAuthenticationToken() instanceof PasswordToken);
+
+ // 3. Connector built from client properties; auth.method is left unset here
+ Properties props = new Properties();
+ props.put(ClientProperty.INSTANCE_NAME.getKey(), instanceName);
+ props.put(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), zookeepers);
+ props.put(ClientProperty.AUTH_USERNAME.getKey(), user);
+ props.put(ClientProperty.AUTH_PASSWORD.getKey(), password);
+ conn = Connector.builder().usingProperties(props).build();
+
+ Assert.assertEquals(instanceName, conn.getInstance().getInstanceName());
+ Assert.assertEquals(zookeepers, conn.getInstance().getZooKeepers());
+ Assert.assertEquals(user, conn.whoami());
+ }
+ }
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java
index cf002dd..c3047ed 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java
@@ -122,9 +122,8 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness {
job.setInputFormat(AccumuloInputFormat.class);
- AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+ AccumuloInputFormat.setConnectionInfo(job, getConnectionInfo());
AccumuloInputFormat.setInputTableName(job, table);
- AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
AccumuloInputFormat.setBatchScan(job, batchScan);
if (sample) {
AccumuloInputFormat.setSamplerConfiguration(job, SAMPLER_CONFIG);
@@ -215,10 +214,9 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness {
Connector connector = getConnector();
connector.tableOperations().create(table);
- AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+ AccumuloInputFormat.setConnectionInfo(job, getConnectionInfo());
AccumuloInputFormat.setInputTableName(job, table);
AccumuloInputFormat.setScanAuthorizations(job, auths);
- AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
AccumuloInputFormat.setScanIsolation(job, isolated);
AccumuloInputFormat.setLocalIterators(job, localIters);
AccumuloInputFormat.fetchColumns(job, fetchColumns);
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloOutputFormatIT.java
index eb12f1c..a2f3918 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloOutputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloOutputFormatIT.java
@@ -31,13 +31,12 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ConnectionInfo;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -83,9 +82,8 @@ public class AccumuloOutputFormatIT extends ConfigurableMacBase {
// set the max memory so that we ensure we don't flush on the write.
batchConfig.setMaxMemory(Long.MAX_VALUE);
AccumuloOutputFormat outputFormat = new AccumuloOutputFormat();
+ AccumuloOutputFormat.setConnectionInfo(job, getConnectionInfo());
AccumuloOutputFormat.setBatchWriterOptions(job, batchConfig);
- AccumuloOutputFormat.setZooKeeperInstance(job, cluster.getClientConfig());
- AccumuloOutputFormat.setConnectorInfo(job, "root", new PasswordToken(ROOT_PASSWORD));
RecordWriter<Text,Mutation> writer = outputFormat.getRecordWriter(null, job, "Test", null);
try {
@@ -122,7 +120,7 @@ public class AccumuloOutputFormatIT extends ConfigurableMacBase {
OutputCollector<Text,Mutation> finalOutput;
@Override
- public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter) throws IOException {
+ public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter) {
finalOutput = output;
try {
if (key != null)
@@ -167,11 +165,10 @@ public class AccumuloOutputFormatIT extends ConfigurableMacBase {
job.setInputFormat(AccumuloInputFormat.class);
- ClientConfiguration clientConfig = ClientConfiguration.create().withInstance(instanceName).withZkHosts(zooKeepers);
+ ConnectionInfo info = Connector.builder().forInstance(instanceName, zooKeepers).usingPassword(user, pass).info();
- AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
+ AccumuloInputFormat.setConnectionInfo(job, info);
AccumuloInputFormat.setInputTableName(job, table1);
- AccumuloInputFormat.setZooKeeperInstance(job, clientConfig);
job.setMapperClass(TestMapper.class);
job.setMapOutputKeyClass(Key.class);
@@ -180,10 +177,9 @@ public class AccumuloOutputFormatIT extends ConfigurableMacBase {
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Mutation.class);
- AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
+ AccumuloOutputFormat.setConnectionInfo(job, info);
AccumuloOutputFormat.setCreateTables(job, false);
AccumuloOutputFormat.setDefaultTableName(job, table2);
- AccumuloOutputFormat.setZooKeeperInstance(job, clientConfig);
job.setNumReduceTasks(0);
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java
index 7d44833..2dad20e 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java
@@ -298,8 +298,7 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness {
job.setInputFormatClass(inputFormatClass);
- AccumuloInputFormat.setZooKeeperInstance(job, cluster.getClientConfig());
- AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+ AccumuloInputFormat.setConnectionInfo(job, getConnectionInfo());
AccumuloInputFormat.setInputTableName(job, table);
AccumuloInputFormat.setBatchScan(job, batchScan);
if (sample) {
@@ -409,9 +408,7 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness {
Connector connector = getConnector();
connector.tableOperations().create(table);
- AccumuloInputFormat.setZooKeeperInstance(job, cluster.getClientConfig());
- AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
-
+ AccumuloInputFormat.setConnectionInfo(job, getConnectionInfo());
AccumuloInputFormat.setInputTableName(job, table);
AccumuloInputFormat.setScanAuthorizations(job, auths);
AccumuloInputFormat.setScanIsolation(job, isolated);
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloOutputFormatIT.java
index ff57722..31dd458 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloOutputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloOutputFormatIT.java
@@ -94,9 +94,8 @@ public class AccumuloOutputFormatIT extends AccumuloClusterHarness {
job.setInputFormatClass(AccumuloInputFormat.class);
- AccumuloInputFormat.setConnectorInfo(job, user, pass);
+ AccumuloInputFormat.setConnectionInfo(job, getConnectionInfo());
AccumuloInputFormat.setInputTableName(job, table1);
- AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
job.setMapperClass(TestMapper.class);
job.setMapOutputKeyClass(Key.class);
@@ -105,10 +104,9 @@ public class AccumuloOutputFormatIT extends AccumuloClusterHarness {
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Mutation.class);
- AccumuloOutputFormat.setConnectorInfo(job, user, pass);
+ AccumuloOutputFormat.setConnectionInfo(job, getConnectionInfo());
AccumuloOutputFormat.setCreateTables(job, false);
AccumuloOutputFormat.setDefaultTableName(job, table2);
- AccumuloOutputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
job.setNumReduceTasks(0);
--
To stop receiving notification emails like this one, please contact
mwalch@apache.org.