You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/11/19 20:58:16 UTC
[3/3] git commit: ACCUMULO-1009
ACCUMULO-1009
Signed-off-by: Eric Newton <er...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7038755b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7038755b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7038755b
Branch: refs/heads/master
Commit: 7038755be153e11ca5ea7278d96746d72b24ea05
Parents: b732722
Author: Michael Berman <mb...@sqrrl.com>
Authored: Tue Nov 19 14:22:10 2013 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Tue Nov 19 14:58:31 2013 -0500
----------------------------------------------------------------------
.../org/apache/accumulo/core/Constants.java | 2 +
.../apache/accumulo/core/cli/ClientOpts.java | 52 ++-
.../apache/accumulo/core/client/Instance.java | 2 +
.../accumulo/core/client/ZooKeeperInstance.java | 72 +++--
.../core/client/impl/ConditionalWriterImpl.java | 2 +-
.../accumulo/core/client/impl/MasterClient.java | 2 +-
.../accumulo/core/client/impl/ServerClient.java | 3 +-
.../impl/TabletServerBatchReaderIterator.java | 2 +-
.../client/impl/TabletServerBatchWriter.java | 2 +-
.../core/client/impl/ThriftTransportKey.java | 20 +-
.../core/client/impl/ThriftTransportPool.java | 25 +-
.../core/client/mapred/AbstractInputFormat.java | 16 +
.../core/client/mapred/AccumuloInputFormat.java | 4 +-
.../mapred/AccumuloMultiTableInputFormat.java | 3 +-
.../client/mapred/AccumuloOutputFormat.java | 23 +-
.../client/mapred/AccumuloRowInputFormat.java | 3 +-
.../client/mapreduce/AbstractInputFormat.java | 19 +-
.../client/mapreduce/AccumuloInputFormat.java | 3 +-
.../client/mapreduce/AccumuloOutputFormat.java | 24 +-
.../mapreduce/AccumuloRowInputFormat.java | 3 +-
.../mapreduce/lib/util/ConfiguratorBase.java | 49 ++-
.../accumulo/core/client/mock/MockInstance.java | 1 +
.../core/conf/AccumuloConfiguration.java | 16 +
.../accumulo/core/conf/ClientConfiguration.java | 310 ++++++++++++++++++
.../org/apache/accumulo/core/conf/Property.java | 19 ++
.../apache/accumulo/core/conf/PropertyType.java | 3 +-
.../accumulo/core/security/Credentials.java | 2 +-
.../accumulo/core/security/SecurityUtil.java | 4 +-
.../accumulo/core/util/SslConnectionParams.java | 205 ++++++++++++
.../apache/accumulo/core/util/ThriftUtil.java | 129 +++++---
.../apache/accumulo/core/util/shell/Shell.java | 25 +-
.../core/util/shell/ShellOptionsJC.java | 27 ++
.../core/client/impl/TabletLocatorImplTest.java | 1 +
.../lib/util/ConfiguratorBaseTest.java | 45 ++-
.../core/conf/ClientConfigurationTest.java | 65 ++++
.../core/util/shell/ShellSetInstanceTest.java | 56 ++--
.../examples/simple/filedata/FileDataQuery.java | 3 +-
.../simple/mapreduce/TokenFileWordCount.java | 3 +-
.../minicluster/MiniAccumuloCluster.java | 48 ++-
.../minicluster/MiniAccumuloConfig.java | 29 +-
.../minicluster/MiniAccumuloInstance.java | 21 +-
.../minicluster/MiniAccumuloClusterGCTest.java | 3 +-
.../minicluster/MiniAccumuloClusterTest.java | 10 +-
.../org/apache/accumulo/proxy/ProxyServer.java | 3 +-
.../server/cli/ClientOnDefaultTable.java | 2 +-
.../server/cli/ClientOnRequiredTable.java | 2 +-
.../apache/accumulo/server/cli/ClientOpts.java | 2 +-
.../accumulo/server/client/BulkImporter.java | 2 +-
.../accumulo/server/client/HdfsZooInstance.java | 1 +
.../accumulo/server/util/TServerUtils.java | 158 +++++----
.../accumulo/utils/metanalysis/IndexMeta.java | 7 +-
test/pom.xml | 27 ++
.../apache/accumulo/test/IMMLGBenchmark.java | 3 +-
.../org/apache/accumulo/test/TestIngest.java | 3 -
.../metadata/MetadataBatchScanTest.java | 3 +-
.../test/performance/thrift/NullTserver.java | 3 +-
.../apache/accumulo/test/randomwalk/State.java | 3 +-
.../test/randomwalk/multitable/CopyTool.java | 7 +-
.../randomwalk/sequential/MapRedVerifyTool.java | 7 +-
.../accumulo/test/scalability/ScaleTest.java | 3 +-
.../test/MultiTableBatchWriterTest.java | 24 +-
.../org/apache/accumulo/test/ShellServerIT.java | 71 ++--
.../accumulo/test/functional/AbstractMacIT.java | 37 ++-
.../test/functional/AccumuloInputFormatIT.java | 5 +-
.../apache/accumulo/test/functional/BulkIT.java | 14 +-
.../accumulo/test/functional/ConcurrencyIT.java | 24 +-
.../test/functional/ConfigurableMacIT.java | 3 +-
.../accumulo/test/functional/MapReduceIT.java | 19 +-
.../accumulo/test/functional/ScannerIT.java | 32 +-
.../accumulo/test/functional/ShutdownIT.java | 7 +-
.../accumulo/test/functional/SimpleMacIT.java | 17 +
.../apache/accumulo/test/functional/SslIT.java | 62 ++++
.../test/functional/SslWithClientAuthIT.java | 71 ++++
.../apache/accumulo/test/util/CertUtils.java | 324 +++++++++++++++++++
.../accumulo/test/util/CertUtilsTest.java | 158 +++++++++
75 files changed, 2095 insertions(+), 365 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index 9db0c40..644775a 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -105,4 +105,6 @@ public class Constants {
public static final String EXPORT_FILE = "exportMetadata.zip";
public static final String EXPORT_INFO_FILE = "accumulo_export_info.txt";
+ // Variables that will be substituted with environment vars in PropertyType.PATH values
+ public static final String[] PATH_PROPERTY_ENV_VARS = new String[]{"$ACCUMULO_HOME", "$ACCUMULO_CONF_DIR"};
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
index 3013cec..1d26a00 100644
--- a/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
+++ b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
@@ -35,6 +35,8 @@ import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ClientConfiguration;
+import org.apache.accumulo.core.conf.ClientConfiguration.ClientProperty;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.security.Authorizations;
@@ -160,6 +162,12 @@ public class ClientOpts extends Help {
@Parameter(names = "--site-file", description = "Read the given accumulo site file to find the accumulo instance")
public String siteFile = null;
+ @Parameter(names = "--ssl", description = "Connect to accumulo over SSL")
+ public boolean sslEnabled = false;
+
+ @Parameter(names = "--config-file", description = "Read the given client config file. If omitted, the path searched can be specified with $ACCUMULO_CLIENT_CONF_PATH, which defaults to ~/.accumulo/config:$ACCUMULO_CONF_DIR/client.conf:/etc/accumulo/client.conf")
+ public String clientConfigFile = null;
+
public void startDebugLogging() {
if (debug)
Logger.getLogger(Constants.CORE_PACKAGE_NAME).setLevel(Level.TRACE);
@@ -186,12 +194,39 @@ public class ClientOpts extends Help {
}
protected Instance cachedInstance = null;
+ protected ClientConfiguration cachedClientConfig = null;
synchronized public Instance getInstance() {
if (cachedInstance != null)
return cachedInstance;
if (mock)
return cachedInstance = new MockInstance(instance);
+ return cachedInstance = new ZooKeeperInstance(this.getClientConfiguration());
+ }
+
+ public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
+ if (this.principal == null || this.getToken() == null)
+ throw new AccumuloSecurityException("You must provide a user (-u) and password (-p)", SecurityErrorCode.BAD_CREDENTIALS);
+ return getInstance().getConnector(principal, getToken());
+ }
+
+ public void setAccumuloConfigs(Job job) throws AccumuloSecurityException {
+ AccumuloInputFormat.setZooKeeperInstance(job, this.getClientConfiguration());
+ AccumuloOutputFormat.setZooKeeperInstance(job, this.getClientConfiguration());
+ }
+
+ protected ClientConfiguration getClientConfiguration() throws IllegalArgumentException {
+ if (cachedClientConfig != null)
+ return cachedClientConfig;
+
+ ClientConfiguration clientConfig;
+ try {
+ clientConfig = ClientConfiguration.loadDefault(clientConfigFile);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e);
+ }
+ if (sslEnabled)
+ clientConfig.setProperty(ClientProperty.INSTANCE_RPC_SSL_ENABLED, "true");
if (siteFile != null) {
AccumuloConfiguration config = new AccumuloConfiguration() {
Configuration xml = new Configuration();
@@ -220,20 +255,11 @@ public class ClientOpts extends Help {
this.zookeepers = config.get(Property.INSTANCE_ZK_HOST);
Path instanceDir = new Path(config.get(Property.INSTANCE_DFS_DIR), "instance_id");
String instanceIDFromFile = ZooUtil.getInstanceIDFromHdfs(instanceDir);
- return cachedInstance = new ZooKeeperInstance(UUID.fromString(instanceIDFromFile), zookeepers);
+ if (config.getBoolean(Property.INSTANCE_RPC_SSL_ENABLED))
+ clientConfig.setProperty(ClientProperty.INSTANCE_RPC_SSL_ENABLED, "true");
+ return cachedClientConfig = clientConfig.withInstance(UUID.fromString(instanceIDFromFile)).withZkHosts(zookeepers);
}
- return cachedInstance = new ZooKeeperInstance(this.instance, this.zookeepers);
- }
-
- public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
- if (this.principal == null || this.getToken() == null)
- throw new AccumuloSecurityException("You must provide a user (-u) and password (-p)", SecurityErrorCode.BAD_CREDENTIALS);
- return getInstance().getConnector(principal, getToken());
- }
-
- public void setAccumuloConfigs(Job job) throws AccumuloSecurityException {
- AccumuloInputFormat.setZooKeeperInstance(job, instance, zookeepers);
- AccumuloOutputFormat.setZooKeeperInstance(job, instance, zookeepers);
+ return cachedClientConfig = clientConfig.withInstance(instance).withZkHosts(zookeepers);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/Instance.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/Instance.java b/core/src/main/java/org/apache/accumulo/core/client/Instance.java
index 27d502f..f8a7682 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Instance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Instance.java
@@ -140,7 +140,9 @@ public interface Instance {
*
* @param conf
* accumulo configuration
+ * @deprecated since 1.6.0
*/
+ @Deprecated
public abstract void setConfiguration(AccumuloConfiguration conf);
/**
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
index 07cc0a3..fb4ab79 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
@@ -27,7 +27,8 @@ import org.apache.accumulo.core.client.impl.ConnectorImpl;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.ClientConfiguration;
+import org.apache.accumulo.core.conf.ClientConfiguration.ClientProperty;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.util.ArgumentChecker;
@@ -37,6 +38,7 @@ import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -68,6 +70,9 @@ public class ZooKeeperInstance implements Instance {
private final int zooKeepersSessionTimeOut;
+ private AccumuloConfiguration accumuloConf;
+ private ClientConfiguration clientConf;
+
private volatile boolean closed = false;
/**
@@ -76,10 +81,11 @@ public class ZooKeeperInstance implements Instance {
* The name of specific accumulo instance. This is set at initialization time.
* @param zooKeepers
* A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
+ * @deprecated since 1.6.0; Use {@link #ZooKeeperInstance(ClientConfiguration)} instead.
*/
-
+ @Deprecated
public ZooKeeperInstance(String instanceName, String zooKeepers) {
- this(instanceName, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
+ this(ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zooKeepers));
}
/**
@@ -90,16 +96,11 @@ public class ZooKeeperInstance implements Instance {
* A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
* @param sessionTimeout
* zoo keeper session time out in milliseconds.
+ * @deprecated since 1.6.0; Use {@link #ZooKeeperInstance(ClientConfiguration)} instead.
*/
-
+ @Deprecated
public ZooKeeperInstance(String instanceName, String zooKeepers, int sessionTimeout) {
- ArgumentChecker.notNull(instanceName, zooKeepers);
- this.instanceName = instanceName;
- this.zooKeepers = zooKeepers;
- this.zooKeepersSessionTimeOut = sessionTimeout;
- zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
- getInstanceID();
- clientInstances.incrementAndGet();
+ this(ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zooKeepers).withZkTimeout(sessionTimeout));
}
/**
@@ -108,10 +109,11 @@ public class ZooKeeperInstance implements Instance {
* The UUID that identifies the accumulo instance you want to connect to.
* @param zooKeepers
* A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
+ * @deprecated since 1.6.0; Use {@link #ZooKeeperInstance(ClientConfiguration)} instead.
*/
-
+ @Deprecated
public ZooKeeperInstance(UUID instanceId, String zooKeepers) {
- this(instanceId, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
+ this(ClientConfiguration.loadDefault().withInstance(instanceId).withZkHosts(zooKeepers));
}
/**
@@ -122,14 +124,34 @@ public class ZooKeeperInstance implements Instance {
* A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
* @param sessionTimeout
* zoo keeper session time out in milliseconds.
+ * @deprecated since 1.6.0; Use {@link #ZooKeeperInstance(ClientConfiguration)} instead.
*/
-
+ @Deprecated
public ZooKeeperInstance(UUID instanceId, String zooKeepers, int sessionTimeout) {
- ArgumentChecker.notNull(instanceId, zooKeepers);
- this.instanceId = instanceId.toString();
- this.zooKeepers = zooKeepers;
- this.zooKeepersSessionTimeOut = sessionTimeout;
- zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
+ this(ClientConfiguration.loadDefault().withInstance(instanceId).withZkHosts(zooKeepers).withZkTimeout(sessionTimeout));
+ }
+
+ /**
+ * @param config
+ * Client configuration for specifying connection options.
+ * See {@link ClientConfiguration} which extends Configuration with convenience methods specific to Accumulo.
+ * @since 1.6.0
+ */
+
+ public ZooKeeperInstance(Configuration config) {
+ ArgumentChecker.notNull(config);
+ if (config instanceof ClientConfiguration) {
+ this.clientConf = (ClientConfiguration)config;
+ } else {
+ this.clientConf = new ClientConfiguration(config);
+ }
+ this.instanceId = clientConf.get(ClientProperty.INSTANCE_ID);
+ this.instanceName = clientConf.get(ClientProperty.INSTANCE_NAME);
+ if ((instanceId == null) == (instanceName == null))
+ throw new IllegalArgumentException("Expected exactly one of instanceName and instanceId to be set");
+ this.zooKeepers = clientConf.get(ClientProperty.INSTANCE_ZK_HOST);
+ this.zooKeepersSessionTimeOut = (int) AccumuloConfiguration.getTimeInMillis(clientConf.get(ClientProperty.INSTANCE_ZK_TIMEOUT));
+ zooCache = ZooCache.getInstance(zooKeepers, zooKeepersSessionTimeOut);
clientInstances.incrementAndGet();
}
@@ -241,18 +263,18 @@ public class ZooKeeperInstance implements Instance {
}
}
- private AccumuloConfiguration conf = null;
-
@Override
public AccumuloConfiguration getConfiguration() {
- if (conf == null)
- conf = AccumuloConfiguration.getDefaultConfiguration();
- return conf;
+ if (accumuloConf == null) {
+ accumuloConf = clientConf.getAccumuloConfiguration();
+ }
+ return accumuloConf;
}
@Override
+ @Deprecated
public void setConfiguration(AccumuloConfiguration conf) {
- this.conf = conf;
+ this.accumuloConf = conf;
}
/**
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index 6b2a1cf..bfbac86 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -534,7 +534,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
private TabletClientService.Iface getClient(String location) throws TTransportException {
TabletClientService.Iface client;
if (timeout < instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT))
- client = ThriftUtil.getTServerClient(location, timeout);
+ client = ThriftUtil.getTServerClient(location, instance.getConfiguration(), timeout);
else
client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
return client;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
index 32c80f9..dd28fca 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
@@ -61,7 +61,7 @@ public class MasterClient {
try {
// Master requests can take a long time: don't ever time out
- MasterClientService.Client client = ThriftUtil.getClientNoTimeout(new MasterClientService.Client.Factory(), master);
+ MasterClientService.Client client = ThriftUtil.getClientNoTimeout(new MasterClientService.Client.Factory(), master, instance.getConfiguration());
return client;
} catch (TTransportException tte) {
if (tte.getCause().getClass().equals(UnknownHostException.class)) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
index 218bd36..90db5ee 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.util.ArgumentChecker;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.SslConnectionParams;
import org.apache.accumulo.core.util.ServerServices.Service;
import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
@@ -141,7 +142,7 @@ public class ServerClient {
if (data != null && !new String(data).equals("master"))
servers.add(new ThriftTransportKey(
new ServerServices(new String(data)).getAddressString(Service.TSERV_CLIENT),
- rpcTimeout));
+ rpcTimeout, SslConnectionParams.forClient(instance.getConfiguration())));
}
boolean opened = false;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
index 0376304..7718207 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
@@ -633,7 +633,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
try {
TabletClientService.Client client;
if (timeoutTracker.getTimeOut() < conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT))
- client = ThriftUtil.getTServerClient(server, timeoutTracker.getTimeOut());
+ client = ThriftUtil.getTServerClient(server, conf, timeoutTracker.getTimeOut());
else
client = ThriftUtil.getTServerClient(server, conf);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index 0dd86bf..e2c2802 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@ -856,7 +856,7 @@ public class TabletServerBatchWriter {
TabletClientService.Iface client;
if (timeoutTracker.getTimeOut() < instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT))
- client = ThriftUtil.getTServerClient(location, timeoutTracker.getTimeOut());
+ client = ThriftUtil.getTServerClient(location, instance.getConfiguration(), timeoutTracker.getTimeOut());
else
client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
index f07139d..2816da7 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
@@ -17,16 +17,17 @@
package org.apache.accumulo.core.client.impl;
import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.accumulo.core.util.SslConnectionParams;
class ThriftTransportKey {
private final String location;
private final int port;
private final long timeout;
+ private final SslConnectionParams sslParams;
private int hash = -1;
- ThriftTransportKey(String location, long timeout) {
-
+ ThriftTransportKey(String location, long timeout, SslConnectionParams sslParams) {
ArgumentChecker.notNull(location);
String[] locationAndPort = location.split(":", 2);
if (locationAndPort.length == 2) {
@@ -36,6 +37,7 @@ class ThriftTransportKey {
throw new IllegalArgumentException("Location was expected to contain port but did not. location=" + location);
this.timeout = timeout;
+ this.sslParams = sslParams;
}
String getLocation() {
@@ -50,23 +52,31 @@ class ThriftTransportKey {
return timeout;
}
+ public boolean isSsl() {
+ return sslParams != null;
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof ThriftTransportKey))
return false;
ThriftTransportKey ttk = (ThriftTransportKey) o;
- return location.equals(ttk.location) && port == ttk.port && timeout == ttk.timeout;
+ return location.equals(ttk.location) && port == ttk.port && timeout == ttk.timeout && isSsl() == ttk.isSsl() && (!isSsl() || sslParams.equals(ttk.sslParams)); // SSL-ness must match in both directions so equals() is symmetric and agrees with hashCode()
}
@Override
public int hashCode() {
if (hash == -1)
- hash = (location + Integer.toString(port) + Long.toString(timeout)).hashCode();
+ hash = toString().hashCode();
return hash;
}
@Override
public String toString() {
- return location + ":" + Integer.toString(port) + " (" + Long.toString(timeout) + ")";
+ return (isSsl()?"ssl:":"") + location + ":" + Integer.toString(port) + " (" + Long.toString(timeout) + ")";
+ }
+
+ public SslConnectionParams getSslParams() {
+ return sslParams;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index e7dabb5..765a4fc 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -16,7 +16,6 @@
*/
package org.apache.accumulo.core.client.impl;
-import java.io.IOException;
import java.security.SecurityPermission;
import java.util.ArrayList;
import java.util.Collections;
@@ -35,10 +34,9 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.TTimeoutTransport;
+import org.apache.accumulo.core.util.SslConnectionParams;
import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.log4j.Logger;
-import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
@@ -362,11 +360,11 @@ public class ThriftTransportPool {
private ThriftTransportPool() {}
public TTransport getTransportWithDefaultTimeout(HostAndPort addr, AccumuloConfiguration conf) throws TTransportException {
- return getTransport(String.format("%s:%d", addr.getHostText(), addr.getPort()), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+ return getTransport(String.format("%s:%d", addr.getHostText(), addr.getPort()), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), SslConnectionParams.forClient(conf));
}
- public TTransport getTransport(String location, long milliseconds) throws TTransportException {
- return getTransport(new ThriftTransportKey(location, milliseconds));
+ public TTransport getTransport(String location, long milliseconds, SslConnectionParams sslParams) throws TTransportException {
+ return getTransport(new ThriftTransportKey(location, milliseconds, sslParams));
}
private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException {
@@ -456,19 +454,8 @@ public class ThriftTransportPool {
}
private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException {
- TTransport transport;
- if (cacheKey.getTimeout() == 0) {
- transport = new TSocket(cacheKey.getLocation(), cacheKey.getPort());
- } else {
- try {
- transport = TTimeoutTransport.create(HostAndPort.fromParts(cacheKey.getLocation(), cacheKey.getPort()), cacheKey.getTimeout());
- } catch (IOException ex) {
- throw new TTransportException(ex);
- }
- }
- transport = ThriftUtil.transportFactory().getTransport(transport);
- transport.open();
-
+ TTransport transport = ThriftUtil.createClientTransport(HostAndPort.fromParts(cacheKey.getLocation(), cacheKey.getPort()), (int)cacheKey.getTimeout(), cacheKey.getSslParams());
+
if (log.isTraceEnabled())
log.trace("Creating new connection to connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
index 856936e..53ac4a1 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
@@ -43,6 +43,7 @@ import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.ClientConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Range;
@@ -179,12 +180,27 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
* @param zooKeepers
* a comma-separated list of zookeeper servers
* @since 1.5.0
+ * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(JobConf, ClientConfiguration)} instead.
*/
+ @Deprecated
public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) {
InputConfigurator.setZooKeeperInstance(CLASS, job, instanceName, zooKeepers);
}
/**
+ * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param clientConfig
+ * client configuration containing connection options
+ * @since 1.6.0
+ */
+ public static void setZooKeeperInstance(JobConf job, ClientConfiguration clientConfig) {
+ InputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig);
+ }
+
+ /**
* Configures a {@link org.apache.accumulo.core.client.mock.MockInstance} for this job.
*
* @param job
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
index cccd7b8..ffd74a5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.ClientConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
@@ -40,7 +41,8 @@ import org.apache.hadoop.mapred.Reporter;
* <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)}
* <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, String)}
* <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, Authorizations)}
- * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, String, String)} OR {@link AccumuloInputFormat#setMockInstance(JobConf, String)}
+ * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} OR
+ * {@link AccumuloInputFormat#setMockInstance(JobConf, String)}
* </ul>
*
* Other static methods are optional.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java
index 61838db..f6eb294 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java
@@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
+import org.apache.accumulo.core.conf.ClientConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.format.DefaultFormatter;
@@ -39,7 +40,7 @@ import org.apache.hadoop.mapred.Reporter;
* <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, org.apache.accumulo.core.client.security.tokens.AuthenticationToken)}
* <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, String)}
* <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, org.apache.accumulo.core.security.Authorizations)}
- * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, String, String)} OR {@link AccumuloInputFormat#setMockInstance(JobConf, String)}
+ * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} OR {@link AccumuloInputFormat#setMockInstance(JobConf, String)}
* <li>{@link AccumuloMultiTableInputFormat#setInputTableConfigs(org.apache.hadoop.mapred.JobConf, java.util.Map)}
* </ul>
*
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
index 908b8b3..6b418d6 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.SecurityErrorCode;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
+import org.apache.accumulo.core.conf.ClientConfiguration;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
@@ -61,7 +62,7 @@ import org.apache.log4j.Logger;
* <ul>
* <li>{@link AccumuloOutputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)}
* <li>{@link AccumuloOutputFormat#setConnectorInfo(JobConf, String, String)}
- * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(JobConf, String, String)} OR {@link AccumuloOutputFormat#setMockInstance(JobConf, String)}
+ * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} OR {@link AccumuloOutputFormat#setMockInstance(JobConf, String)}
* </ul>
*
* Other static methods are optional.
@@ -182,12 +183,32 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
* @param zooKeepers
* a comma-separated list of zookeeper servers
* @since 1.5.0
+ * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(JobConf, ClientConfiguration)} instead.
*/
+
+ @Deprecated
public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) {
OutputConfigurator.setZooKeeperInstance(CLASS, job, instanceName, zooKeepers);
}
/**
+ * Configures a {@link ZooKeeperInstance} for this job.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param clientConfig
+ * client configuration specifying the instance name, zookeeper servers, connection timeouts, SSL connection options, etc.
+ * @since 1.6.0
+ */
+ public static void setZooKeeperInstance(JobConf job, ClientConfiguration clientConfig) {
+ OutputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig);
+ }
+
+ /**
* Configures a {@link MockInstance} for this job.
*
* @param job
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java
index fe5003b..9c6189b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java
@@ -21,6 +21,7 @@ import java.util.Map.Entry;
import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.ClientConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
@@ -42,7 +43,7 @@ import org.apache.hadoop.mapred.Reporter;
* <li>{@link AccumuloRowInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)}
* <li>{@link AccumuloRowInputFormat#setInputTableName(JobConf, String)}
* <li>{@link AccumuloRowInputFormat#setScanAuthorizations(JobConf, Authorizations)}
- * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(JobConf, String, String)} OR {@link AccumuloRowInputFormat#setMockInstance(JobConf, String)}
+ * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} OR {@link AccumuloRowInputFormat#setMockInstance(JobConf, String)}
* </ul>
*
* Other static methods are optional.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
index 626a785..9d8024e 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.ClientConfiguration;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
@@ -189,12 +190,27 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
* @param zooKeepers
* a comma-separated list of zookeeper servers
* @since 1.5.0
+ * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(Job, ClientConfiguration)} instead.
*/
+ @Deprecated
public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) {
InputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), instanceName, zooKeepers);
}
/**
+ * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param clientConfig
+ * client configuration containing connection options
+ * @since 1.6.0
+ */
+ public static void setZooKeeperInstance(Job job, ClientConfiguration clientConfig) {
+ InputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig);
+ }
+
+ /**
* Configures a {@link org.apache.accumulo.core.client.mock.MockInstance} for this job.
*
* @param job
@@ -379,7 +395,6 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
// but the scanner will use the table id resolved at job setup time
InputTableConfig tableConfig = getInputTableConfig(attempt, split.getTableName());
-
try {
log.debug("Creating connector with user: " + principal);
log.debug("Creating scanner for table: " + split.getTableName());
@@ -456,7 +471,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
return InputConfigurator.binOffline(tableId, ranges, instance, conn);
}
-
+
/**
* Gets the splits of the tables that have been set on the job.
*
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
index 9ecae53..0539c93 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.ClientConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
@@ -39,7 +40,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
* <ul>
* <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, AuthenticationToken)}
* <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)}
- * <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloInputFormat#setMockInstance(Job, String)}
+ * <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, ClientConfiguration)} OR {@link AccumuloInputFormat#setMockInstance(Job, String)}
* </ul>
*
* Other static methods are optional.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index 727bfec..6782b4b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.SecurityErrorCode;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
+import org.apache.accumulo.core.conf.ClientConfiguration;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
@@ -62,7 +63,7 @@ import org.apache.log4j.Logger;
* <ul>
* <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, String, AuthenticationToken)}
* <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, String, String)}
- * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloOutputFormat#setMockInstance(Job, String)}
+ * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, ClientConfiguration)} OR {@link AccumuloOutputFormat#setMockInstance(Job, String)}
* </ul>
*
* Other static methods are optional.
@@ -183,12 +184,31 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
* @param zooKeepers
* a comma-separated list of zookeeper servers
* @since 1.5.0
+ * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(Job, ClientConfiguration)} instead.
*/
+ @Deprecated
public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) {
OutputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), instanceName, zooKeepers);
}
/**
+ * Configures a {@link ZooKeeperInstance} for this job.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param clientConfig
+ * client configuration specifying the instance name, zookeeper servers, connection timeouts, SSL connection options, etc.
+ * @since 1.6.0
+ */
+ public static void setZooKeeperInstance(Job job, ClientConfiguration clientConfig) {
+ OutputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig);
+ }
+
+ /**
* Configures a {@link MockInstance} for this job.
*
* @param job
@@ -208,7 +228,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
* the Hadoop context for the configured job
* @return an Accumulo instance
* @since 1.5.0
- * @see #setZooKeeperInstance(Job, String, String)
+ * @see #setZooKeeperInstance(Job, ClientConfiguration)
* @see #setMockInstance(Job, String)
*/
protected static Instance getInstance(JobContext context) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
index 992990d..a52b098 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
@@ -21,6 +21,7 @@ import java.util.Map.Entry;
import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.ClientConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
@@ -42,7 +43,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
* <li>{@link AccumuloRowInputFormat#setConnectorInfo(Job, String, AuthenticationToken)}
* <li>{@link AccumuloRowInputFormat#setInputTableName(Job, String)}
* <li>{@link AccumuloRowInputFormat#setScanAuthorizations(Job, Authorizations)}
- * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloRowInputFormat#setMockInstance(Job, String)}
+ * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(Job, ClientConfiguration)} OR {@link AccumuloRowInputFormat#setMockInstance(Job, String)}
* </ul>
*
* Other static methods are optional.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java
index 4f8cdb6..c0fcc72 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
+import org.apache.accumulo.core.conf.ClientConfiguration;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.util.ArgumentChecker;
import org.apache.commons.codec.binary.Base64;
@@ -72,7 +73,7 @@ public class ConfiguratorBase {
* @since 1.5.0
*/
protected static enum InstanceOpts {
- TYPE, NAME, ZOO_KEEPERS;
+ TYPE, NAME, ZOO_KEEPERS, CLIENT_CONFIG;
}
/**
@@ -277,16 +278,38 @@ public class ConfiguratorBase {
* @param zooKeepers
* a comma-separated list of zookeeper servers
* @since 1.5.0
+ * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(Class, Configuration, ClientConfiguration)} instead.
*/
+
+ @Deprecated
public static void setZooKeeperInstance(Class<?> implementingClass, Configuration conf, String instanceName, String zooKeepers) {
+ ArgumentChecker.notNull(instanceName, zooKeepers);
+ setZooKeeperInstance(implementingClass, conf, new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers));
+ }
+
+ /**
+ * Configures a {@link ZooKeeperInstance} for this job.
+ *
+ * @param implementingClass
+ * the class whose name will be used as a prefix for the property configuration key
+ * @param conf
+ * the Hadoop configuration object to configure
+ * @param clientConfig
+ * client configuration specifying the instance name, zookeeper servers, connection timeouts, SSL connection options, etc.
+ * @since 1.6.0
+ */
+ public static void setZooKeeperInstance(Class<?> implementingClass, Configuration conf, ClientConfiguration clientConfig) {
String key = enumToConfKey(implementingClass, InstanceOpts.TYPE);
if (!conf.get(key, "").isEmpty())
throw new IllegalStateException("Instance info can only be set once per job; it has already been configured with " + conf.get(key));
conf.set(key, "ZooKeeperInstance");
-
- ArgumentChecker.notNull(instanceName, zooKeepers);
- conf.set(enumToConfKey(implementingClass, InstanceOpts.NAME), instanceName);
- conf.set(enumToConfKey(implementingClass, InstanceOpts.ZOO_KEEPERS), zooKeepers);
+ if (clientConfig != null) {
+ conf.set(enumToConfKey(implementingClass, InstanceOpts.CLIENT_CONFIG), clientConfig.serialize());
+ }
}
/**
@@ -319,17 +342,23 @@ public class ConfiguratorBase {
* the Hadoop configuration object to configure
* @return an Accumulo instance
* @since 1.5.0
- * @see #setZooKeeperInstance(Class, Configuration, String, String)
+ * @see #setZooKeeperInstance(Class, Configuration, ClientConfiguration)
* @see #setMockInstance(Class, Configuration, String)
*/
public static Instance getInstance(Class<?> implementingClass, Configuration conf) {
String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE), "");
if ("MockInstance".equals(instanceType))
return new MockInstance(conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME)));
- else if ("ZooKeeperInstance".equals(instanceType))
- return new ZooKeeperInstance(conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME)), conf.get(enumToConfKey(implementingClass,
- InstanceOpts.ZOO_KEEPERS)));
- else if (instanceType.isEmpty())
+ else if ("ZooKeeperInstance".equals(instanceType)) {
+ String clientConfigString = conf.get(enumToConfKey(implementingClass, InstanceOpts.CLIENT_CONFIG));
+ if (clientConfigString == null) {
+ String instanceName = conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME));
+ String zookeepers = conf.get(enumToConfKey(implementingClass, InstanceOpts.ZOO_KEEPERS));
+ return new ZooKeeperInstance(ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeepers));
+ } else {
+ return new ZooKeeperInstance(ClientConfiguration.deserialize(clientConfigString));
+ }
+ } else if (instanceType.isEmpty())
throw new IllegalStateException("Instance has not been configured for " + implementingClass.getSimpleName());
else
throw new IllegalStateException("Unrecognized instance type " + instanceType);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
index d391464..5b7dcbf 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
@@ -145,6 +145,7 @@ public class MockInstance implements Instance {
}
@Override
+ @Deprecated
public void setConfiguration(AccumuloConfiguration conf) {
this.conf = conf;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
index da170e9..3aed8c1 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -180,6 +181,21 @@ public abstract class AccumuloConfiguration implements Iterable<Entry<String,Str
return Integer.parseInt(countString);
}
+ public String getPath(Property property) {
+ checkType(property, PropertyType.PATH);
+
+ String pathString = get(property);
+ if (pathString == null) return null;
+
+ for (String replaceableEnvVar : Constants.PATH_PROPERTY_ENV_VARS) {
+ String envValue = System.getenv(replaceableEnvVar);
+ if (envValue != null)
+ pathString = pathString.replace("$" + replaceableEnvVar, envValue);
+ }
+
+ return pathString;
+ }
+
public static synchronized DefaultConfiguration getDefaultConfiguration() {
return DefaultConfiguration.getInstance();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfiguration.java
new file mode 100644
index 0000000..5bb95ae
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfiguration.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.conf;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.MapConfiguration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+
+/**
+ * Contains a list of property keys recognized by the Accumulo client and convenience methods for setting them.
+ */
+public class ClientConfiguration extends CompositeConfiguration {
+ public static final String USER_ACCUMULO_DIR_NAME = ".accumulo";
+ public static final String USER_CONF_FILENAME = "config";
+ public static final String GLOBAL_CONF_FILENAME = "client.conf";
+
+ public enum ClientProperty {
+ RPC_SSL_TRUSTSTORE_PATH(Property.RPC_SSL_TRUSTSTORE_PATH),
+ RPC_SSL_TRUSTSTORE_PASSWORD(Property.RPC_SSL_TRUSTSTORE_PASSWORD),
+ RPC_SSL_TRUSTSTORE_TYPE(Property.RPC_SSL_TRUSTSTORE_TYPE),
+ RPC_SSL_KEYSTORE_PATH(Property.RPC_SSL_KEYSTORE_PATH),
+ RPC_SSL_KEYSTORE_PASSWORD(Property.RPC_SSL_KEYSTORE_PASSWORD),
+ RPC_SSL_KEYSTORE_TYPE(Property.RPC_SSL_KEYSTORE_TYPE),
+ RPC_USE_JSSE(Property.RPC_USE_JSSE),
+ INSTANCE_RPC_SSL_CLIENT_AUTH(Property.INSTANCE_RPC_SSL_CLIENT_AUTH),
+ INSTANCE_RPC_SSL_ENABLED(Property.INSTANCE_RPC_SSL_ENABLED),
+ INSTANCE_ZK_HOST(Property.INSTANCE_ZK_HOST),
+ INSTANCE_ZK_TIMEOUT(Property.INSTANCE_ZK_TIMEOUT),
+ INSTANCE_NAME("client.instance.name", null, PropertyType.STRING, "Name of Accumulo instance to connect to"),
+ INSTANCE_ID("client.instance.id", null, PropertyType.STRING, "UUID of Accumulo instance to connect to"),
+ ;
+
+ private String key;
+ private String defaultValue;
+ private PropertyType type;
+ private String description;
+
+ private Property accumuloProperty = null;
+
+ private ClientProperty(Property prop) {
+ this(prop.getKey(), prop.getDefaultValue(), prop.getType(), prop.getDescription());
+ accumuloProperty = prop;
+ }
+
+ private ClientProperty(String key, String defaultValue, PropertyType type, String description) {
+ this.key = key;
+ this.defaultValue = defaultValue;
+ this.type = type;
+ this.description = description;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getDefaultValue() {
+ return defaultValue;
+ }
+
+ public PropertyType getType() {
+ return type;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public Property getAccumuloProperty() {
+ return accumuloProperty;
+ }
+
+ public static ClientProperty getPropertyByKey(String key) {
+ for (ClientProperty prop : ClientProperty.values())
+ if (prop.getKey().equals(key))
+ return prop;
+ return null;
+ }
+ };
+
+ /**
+ * Creates a client configuration backed by the given configurations.
+ *
+ * @param configs
+ * the configurations to combine; passed unchanged to the {@link CompositeConfiguration} constructor
+ */
+ public ClientConfiguration(List<? extends Configuration> configs) {
+ super(configs);
+ // Commons Configuration splits string values on ',' by default, so withZkHosts("zk1,zk2") would be stored as a list and
+ // get(ClientProperty.INSTANCE_ZK_HOST) would return only "zk1". Client property values are plain strings, so turn splitting off
+ // for everything set through this object.
+ setDelimiterParsingDisabled(true);
+ }
+
+ public ClientConfiguration(Configuration... configs) {
+ this(Arrays.asList(configs));
+ }
+
+ public static ClientConfiguration loadDefault() {
+ return loadFromSearchPath(getDefaultSearchPath());
+ }
+
+ public static ClientConfiguration loadDefault(String overridePropertiesFilename) throws FileNotFoundException, ConfigurationException {
+ if (overridePropertiesFilename == null)
+ return loadDefault();
+ else
+ return new ClientConfiguration(new PropertiesConfiguration(overridePropertiesFilename));
+ }
+
+ /**
+ * Loads every readable properties file found on the given search path.
+ *
+ * @param paths
+ * candidate file locations; entries that are not readable regular files are skipped
+ * @return a client configuration composed of the files that were loaded, in the order given
+ * @throws IllegalStateException
+ * if a readable file cannot be parsed
+ */
+ private static ClientConfiguration loadFromSearchPath(List<String> paths) {
+ try {
+ List<Configuration> configs = new LinkedList<Configuration>();
+ for (String path : paths) {
+ File conf = new File(path);
+ // a directory can be "readable" too, but attempting to load it as a properties file would fail
+ if (conf.isFile() && conf.canRead()) {
+ // List splitting has to be disabled before the file is parsed; otherwise a value such as
+ // instance.zookeeper.host=zk1,zk2 is stored as a list and only "zk1" is returned by getString().
+ PropertiesConfiguration props = new PropertiesConfiguration();
+ props.setDelimiterParsingDisabled(true);
+ props.setFile(conf);
+ props.load();
+ configs.add(props);
+ }
+ }
+ return new ClientConfiguration(configs);
+ } catch (ConfigurationException e) {
+ throw new IllegalStateException("Error loading client configuration", e);
+ }
+ }
+
+ public static ClientConfiguration deserialize(String serializedConfig) {
+ PropertiesConfiguration propConfig = new PropertiesConfiguration();
+ try {
+ propConfig.load(new StringReader(serializedConfig));
+ } catch (ConfigurationException e) {
+ throw new IllegalArgumentException("Error deserializing client configuration: " + serializedConfig, e);
+ }
+ return new ClientConfiguration(propConfig);
+ }
+
+ private static List<String> getDefaultSearchPath() {
+ String clientConfSearchPath = System.getenv("ACCUMULO_CLIENT_CONF_PATH");
+ List<String> clientConfPaths;
+ if (clientConfSearchPath != null) {
+ clientConfPaths = Arrays.asList(clientConfSearchPath.split(File.pathSeparator));
+ } else {
+ // if $ACCUMULO_CLIENT_CONF_PATH env isn't set, priority from top to bottom is:
+ // ~/.accumulo/config
+ // $ACCUMULO_CONF_DIR/client.conf -OR- $ACCUMULO_HOME/conf/client.conf (depending on whether $ACCUMULO_CONF_DIR is set)
+ // /etc/accumulo/client.conf
+ clientConfPaths = new LinkedList<String>();
+ clientConfPaths.add(System.getProperty("user.home") + File.separator + USER_ACCUMULO_DIR_NAME + File.separator + USER_CONF_FILENAME);
+ if (System.getenv("ACCUMULO_CONF_DIR") != null) {
+ clientConfPaths.add(System.getenv("ACCUMULO_CONF_DIR") + File.separator + GLOBAL_CONF_FILENAME);
+ } else if (System.getenv("ACCUMULO_HOME") != null) {
+ clientConfPaths.add(System.getenv("ACCUMULO_HOME") + File.separator + "conf" + File.separator + GLOBAL_CONF_FILENAME);
+ }
+ clientConfPaths.add("/etc/accumulo/" + GLOBAL_CONF_FILENAME);
+ }
+ return clientConfPaths;
+ }
+
+ public String serialize() {
+ PropertiesConfiguration propConfig = new PropertiesConfiguration();
+ propConfig.copy(this);
+ StringWriter writer = new StringWriter();
+ try {
+ propConfig.save(writer);
+ } catch (ConfigurationException e) {
+ // this should never happen
+ throw new IllegalStateException(e);
+ }
+ return writer.toString();
+ }
+
+ public String get(ClientProperty prop) {
+ if (this.containsKey(prop.getKey()))
+ return this.getString(prop.getKey());
+ else
+ return prop.getDefaultValue();
+ }
+
+ public void setProperty(ClientProperty prop, String value) {
+ this.setProperty(prop.getKey(), value);
+ }
+
+ public ClientConfiguration with(ClientProperty prop, String value) {
+ this.setProperty(prop.getKey(), value);
+ return this;
+ }
+
+ public ClientConfiguration withInstance(String instanceName) {
+ ArgumentChecker.notNull(instanceName);
+ return with(ClientProperty.INSTANCE_NAME, instanceName);
+ }
+
+ public ClientConfiguration withInstance(UUID instanceId) {
+ ArgumentChecker.notNull(instanceId);
+ return with(ClientProperty.INSTANCE_ID, instanceId.toString());
+ }
+
+ public ClientConfiguration withZkHosts(String zooKeepers) {
+ ArgumentChecker.notNull(zooKeepers);
+ return with(ClientProperty.INSTANCE_ZK_HOST, zooKeepers);
+ }
+
+ public ClientConfiguration withZkTimeout(int timeout) {
+ return with(ClientProperty.INSTANCE_ZK_TIMEOUT, String.valueOf(timeout));
+ }
+
+ public ClientConfiguration withSsl(boolean sslEnabled) {
+ return withSsl(sslEnabled, false);
+ }
+
+ public ClientConfiguration withSsl(boolean sslEnabled, boolean useJsseConfig) {
+ return with(ClientProperty.INSTANCE_RPC_SSL_ENABLED, String.valueOf(sslEnabled))
+ .with(ClientProperty.RPC_USE_JSSE, String.valueOf(useJsseConfig));
+ }
+
+ public ClientConfiguration withTruststore(String path) {
+ return withTruststore(path, null, null);
+ }
+
+ public ClientConfiguration withTruststore(String path, String password, String type) {
+ ArgumentChecker.notNull(path);
+ setProperty(ClientProperty.RPC_SSL_TRUSTSTORE_PATH, path);
+ if (password != null)
+ setProperty(ClientProperty.RPC_SSL_TRUSTSTORE_PASSWORD, password);
+ if (type != null)
+ setProperty(ClientProperty.RPC_SSL_TRUSTSTORE_TYPE, type);
+ return this;
+ }
+
+ public ClientConfiguration withKeystore(String path) {
+ return withKeystore(path, null, null);
+ }
+
+ public ClientConfiguration withKeystore(String path, String password, String type) {
+ ArgumentChecker.notNull(path);
+ setProperty(ClientProperty.INSTANCE_RPC_SSL_CLIENT_AUTH, "true");
+ setProperty(ClientProperty.RPC_SSL_KEYSTORE_PATH, path);
+ if (password != null)
+ setProperty(ClientProperty.RPC_SSL_KEYSTORE_PASSWORD, password);
+ if (type != null)
+ setProperty(ClientProperty.RPC_SSL_KEYSTORE_TYPE, type);
+ return this;
+ }
+
+ public AccumuloConfiguration getAccumuloConfiguration() {
+ final AccumuloConfiguration defaultConf = AccumuloConfiguration.getDefaultConfiguration();
+ return new AccumuloConfiguration() {
+
+ @Override
+ public Iterator<Entry<String,String>> iterator() {
+ TreeMap<String,String> entries = new TreeMap<String,String>();
+
+ for (Entry<String,String> parentEntry : defaultConf)
+ entries.put(parentEntry.getKey(), parentEntry.getValue());
+
+ @SuppressWarnings("unchecked")
+ Iterator<String> keyIter = getKeys();
+ while (keyIter.hasNext()) {
+ String key = keyIter.next();
+ entries.put(key, getString(key));
+ }
+
+ return entries.entrySet().iterator();
+ }
+
+ @Override
+ public String get(Property property) {
+ if (containsKey(property.getKey()))
+ return getString(property.getKey());
+ else
+ return defaultConf.get(property);
+ }
+
+ @Override
+ public void getProperties(Map<String,String> props, PropertyFilter filter) {
+ for (Entry<String,String> entry : this)
+ if (filter.accept(entry.getKey()))
+ props.put(entry.getKey(), entry.getValue());
+ }
+ };
+ }
+
+ public static ClientConfiguration fromAccumuloConfiguration(AccumuloConfiguration accumuloConf) {
+ Map<String,String> props = new HashMap<String,String>();
+ for (ClientProperty prop : ClientProperty.values()) {
+ if (prop.accumuloProperty == null)
+ continue;
+ props.put(prop.getKey(), accumuloConf.get(prop.accumuloProperty));
+ }
+ return new ClientConfiguration(new MapConfiguration(props));
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index b915ee4..4a38d15 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -77,6 +77,23 @@ public enum Property {
+ "of that file. Sometimes, you change your strategy and want to use the new strategy, not the old one. (Most commonly, this will be "
+ "because you have moved key material from one spot to another.) If you want to override the recorded key strategy with the one in "
+ "the configuration file, set this property to true."),
+ // SSL properties local to each node (see also instance.rpc.ssl.enabled which must be consistent across all nodes in an instance)
+ RPC_PREFIX("rpc.", null, PropertyType.PREFIX, "Properties in this category relate to the configuration of SSL keys for RPC. See also instance.rpc.ssl.enabled"),
+ RPC_SSL_KEYSTORE_PATH("rpc.javax.net.ssl.keyStore", "$ACCUMULO_CONF_DIR/ssl/keystore.jks", PropertyType.PATH,
+ "Path of the keystore file for the servers' private SSL key"),
+ @Sensitive
+ RPC_SSL_KEYSTORE_PASSWORD("rpc.javax.net.ssl.keyStorePassword", "", PropertyType.STRING,
+ "Password used to encrypt the SSL private keystore. Leave blank to use the Accumulo instance secret"),
+ RPC_SSL_KEYSTORE_TYPE("rpc.javax.net.ssl.keyStoreType", "jks", PropertyType.STRING,
+ "Type of SSL keystore"),
+ RPC_SSL_TRUSTSTORE_PATH("rpc.javax.net.ssl.trustStore", "$ACCUMULO_CONF_DIR/ssl/truststore.jks", PropertyType.PATH,
+ "Path of the truststore file for the root cert"),
+ @Sensitive
+ RPC_SSL_TRUSTSTORE_PASSWORD("rpc.javax.net.ssl.trustStorePassword", "", PropertyType.STRING,
+ "Password used to encrypt the SSL truststore. Leave blank to use no password"),
+ RPC_SSL_TRUSTSTORE_TYPE("rpc.javax.net.ssl.trustStoreType", "jks", PropertyType.STRING,
+ "Type of SSL truststore"),
+ // when true, the rpc.javax.net.ssl.* keystore/truststore properties above are not consulted
+ RPC_USE_JSSE("rpc.useJsse", "false", PropertyType.BOOLEAN, "Use JSSE system properties to configure SSL rather than the rpc.javax.net.ssl.* Accumulo properties"),
// instance properties (must be the same for every node in an instance)
INSTANCE_PREFIX("instance.", null, PropertyType.PREFIX,
"Properties in this category must be consistent throughout a cloud. This is enforced and servers won't be able to communicate if these differ."),
@@ -106,6 +123,8 @@ public enum Property {
"The authorizor class that accumulo will use to determine what labels a user has privilege to see"),
INSTANCE_SECURITY_PERMISSION_HANDLER("instance.security.permissionHandler", "org.apache.accumulo.server.security.handler.ZKPermHandler",
PropertyType.CLASSNAME, "The permission handler class that accumulo will use to determine if a user has privilege to perform an action"),
+ INSTANCE_RPC_SSL_ENABLED("instance.rpc.ssl.enabled", "false", PropertyType.BOOLEAN, "Use SSL for socket connections from clients and among accumulo services"),
+ INSTANCE_RPC_SSL_CLIENT_AUTH("instance.rpc.ssl.clientAuth", "false", PropertyType.BOOLEAN, "Require clients to present certs signed by a trusted root"),
// general properties
GENERAL_PREFIX("general.", null, PropertyType.PREFIX,
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
index fc45442..688b186 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.core.conf;
import java.util.regex.Pattern;
+import org.apache.accumulo.core.Constants;
import org.apache.hadoop.fs.Path;
public enum PropertyType {
@@ -50,7 +51,7 @@ public enum PropertyType {
+ "Examples of invalid fractions/percentages are '', '10 percent', 'Hulk Hogan'"),
PATH("path", ".*",
- "A string that represents a filesystem path, which can be either relative or absolute to some directory. The filesystem depends on the property."),
+ "A string that represents a filesystem path, which can be either relative or absolute to some directory. The filesystem depends on the property. The following environment variables will be substituted: " + Constants.PATH_PROPERTY_ENV_VARS),
ABSOLUTEPATH("absolute path", null,
"An absolute filesystem path. The filesystem depends on the property. This is the same as path, but enforces that its root is explicitly specified.") {
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/security/Credentials.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/Credentials.java b/core/src/main/java/org/apache/accumulo/core/security/Credentials.java
index 0552e7e..45708a8 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/Credentials.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/Credentials.java
@@ -70,7 +70,7 @@ public class Credentials {
* Converts the current object to a serialized form. The object returned from this contains a non-destroyable version of the {@link AuthenticationToken}, so
* references to it should be tightly controlled.
*/
- public final String serialize() throws AccumuloSecurityException {
+ public final String serialize() {
return (getPrincipal() == null ? "-" : Base64.encodeBase64String(getPrincipal().getBytes(Constants.UTF8))) + ":"
+ (getToken() == null ? "-" : Base64.encodeBase64String(getToken().getClass().getName().getBytes(Constants.UTF8))) + ":"
+ (getToken() == null ? "-" : Base64.encodeBase64String(AuthenticationTokenSerializer.serialize(getToken())));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/security/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/SecurityUtil.java b/core/src/main/java/org/apache/accumulo/core/security/SecurityUtil.java
index 8add1a7..4ffcc36 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/SecurityUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/SecurityUtil.java
@@ -37,13 +37,11 @@ public class SecurityUtil {
public static void serverLogin() {
@SuppressWarnings("deprecation")
AccumuloConfiguration acuConf = AccumuloConfiguration.getSiteConfiguration();
- String keyTab = acuConf.get(Property.GENERAL_KERBEROS_KEYTAB);
+ String keyTab = acuConf.getPath(Property.GENERAL_KERBEROS_KEYTAB);
if (keyTab == null || keyTab.length() == 0)
return;
usingKerberos = true;
- if (keyTab.contains("$ACCUMULO_HOME") && System.getenv("ACCUMULO_HOME") != null)
- keyTab = keyTab.replace("$ACCUMULO_HOME", System.getenv("ACCUMULO_HOME"));
String principalConfig = acuConf.get(Property.GENERAL_KERBEROS_PRINCIPAL);
if (principalConfig == null || principalConfig.length() == 0)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7038755b/core/src/main/java/org/apache/accumulo/core/util/SslConnectionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/SslConnectionParams.java b/core/src/main/java/org/apache/accumulo/core/util/SslConnectionParams.java
new file mode 100644
index 0000000..6fde38a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/SslConnectionParams.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.net.URL;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.log4j.Logger;
+import org.apache.thrift.transport.TSSLTransportFactory.TSSLTransportParameters;
+
+ /**
+  * Holds the SSL settings used for RPC connections: whether JSSE system properties are used, whether client authentication is required, and the optional
+  * key store and trust store (path, password, type).
+  *
+  * Instances implement {@link #equals(Object)} and {@link #hashCode()} by value so that they can take part in hash-based lookups.
+  */
+ public class SslConnectionParams {
+   private static final Logger log = Logger.getLogger(SslConnectionParams.class);
+
+   private boolean useJsse = false;
+   private boolean clientAuth = false;
+
+   private boolean keyStoreSet;
+   private String keyStorePath;
+   private String keyStorePass;
+   private String keyStoreType;
+
+   private boolean trustStoreSet;
+   private String trustStorePath;
+   private String trustStorePass;
+   private String trustStoreType;
+
+   /**
+    * Builds the SSL parameters described by {@code conf}.
+    *
+    * @param conf
+    *          configuration to read the SSL properties from
+    * @param server
+    *          true when the parameters are for the accepting side of a connection, false for the connecting side
+    * @return the parameters, or null when {@link Property#INSTANCE_RPC_SSL_ENABLED} is false
+    * @throws IllegalArgumentException
+    *           if a configured key store or trust store file cannot be found
+    */
+   public static SslConnectionParams forConfig(AccumuloConfiguration conf, boolean server) {
+     if (!conf.getBoolean(Property.INSTANCE_RPC_SSL_ENABLED))
+       return null;
+
+     SslConnectionParams result = new SslConnectionParams();
+     boolean requireClientAuth = conf.getBoolean(Property.INSTANCE_RPC_SSL_CLIENT_AUTH);
+     if (server) {
+       result.setClientAuth(requireClientAuth);
+     }
+     if (conf.getBoolean(Property.RPC_USE_JSSE)) {
+       // JSSE mode: no store settings are read from the configuration
+       result.setUseJsse(true);
+       return result;
+     }
+
+     try {
+       // the connecting side always gets a trust store; the accepting side only when client auth is required
+       if (!server || requireClientAuth) {
+         result.setTrustStoreFromConf(conf);
+       }
+       // the accepting side always gets a key store; the connecting side only when client auth is required
+       if (server || requireClientAuth) {
+         result.setKeyStoreFromConf(conf);
+       }
+     } catch (FileNotFoundException e) {
+       throw new IllegalArgumentException("Could not load configured keystore file", e);
+     }
+
+     return result;
+   }
+
+   /**
+    * Returns the value of {@code passwordOverrideProperty} when it is non-empty, otherwise {@code defaultPassword}.
+    */
+   private static String passwordFromConf(AccumuloConfiguration conf, String defaultPassword, Property passwordOverrideProperty) {
+     String keystorePassword = conf.get(passwordOverrideProperty);
+     if (!keystorePassword.isEmpty()) {
+       log.debug("Using explicit SSL private key password from " + passwordOverrideProperty.getKey());
+     } else {
+       keystorePassword = defaultPassword;
+     }
+     return keystorePassword;
+   }
+
+   /**
+    * Resolves the path configured under {@code pathProperty} to an existing file.
+    */
+   private static String storePathFromConf(AccumuloConfiguration conf, Property pathProperty) throws FileNotFoundException {
+     return findKeystore(conf.getPath(pathProperty));
+   }
+
+   /**
+    * Loads the key store settings from {@code conf}. The password falls back to {@link Property#INSTANCE_SECRET} when no explicit password is configured.
+    *
+    * @throws FileNotFoundException
+    *           if the configured key store cannot be found; this object is left unchanged in that case
+    */
+   public void setKeyStoreFromConf(AccumuloConfiguration conf) throws FileNotFoundException {
+     // resolve everything first so that a failure does not leave keyStoreSet true with null fields
+     String path = storePathFromConf(conf, Property.RPC_SSL_KEYSTORE_PATH);
+     String pass = passwordFromConf(conf, conf.get(Property.INSTANCE_SECRET), Property.RPC_SSL_KEYSTORE_PASSWORD);
+     String type = conf.get(Property.RPC_SSL_KEYSTORE_TYPE);
+
+     keyStorePath = path;
+     keyStorePass = pass;
+     keyStoreType = type;
+     keyStoreSet = true;
+   }
+
+   /**
+    * Loads the trust store settings from {@code conf}. The password falls back to the empty string when no explicit password is configured.
+    *
+    * @throws FileNotFoundException
+    *           if the configured trust store cannot be found; this object is left unchanged in that case
+    */
+   public void setTrustStoreFromConf(AccumuloConfiguration conf) throws FileNotFoundException {
+     // resolve everything first so that a failure does not leave trustStoreSet true with null fields
+     String path = storePathFromConf(conf, Property.RPC_SSL_TRUSTSTORE_PATH);
+     String pass = passwordFromConf(conf, "", Property.RPC_SSL_TRUSTSTORE_PASSWORD);
+     String type = conf.get(Property.RPC_SSL_TRUSTSTORE_TYPE);
+
+     trustStorePath = path;
+     trustStorePass = pass;
+     trustStoreType = type;
+     trustStoreSet = true;
+   }
+
+   /** Equivalent to {@code forConfig(configuration, true)}. */
+   public static SslConnectionParams forServer(AccumuloConfiguration configuration) {
+     return forConfig(configuration, true);
+   }
+
+   /** Equivalent to {@code forConfig(configuration, false)}. */
+   public static SslConnectionParams forClient(AccumuloConfiguration configuration) {
+     return forConfig(configuration, false);
+   }
+
+   /**
+    * Locates a store file, first as a plain file system path and then, for non-absolute paths, as a class path resource.
+    *
+    * @return the absolute path of the file that was found
+    * @throws FileNotFoundException
+    *           if neither lookup yields an existing file
+    */
+   private static String findKeystore(String keystorePath) throws FileNotFoundException {
+     try {
+       // first just try the file
+       File file = new File(keystorePath);
+       if (file.exists())
+         return file.getAbsolutePath();
+       if (!file.isAbsolute()) {
+         // try classpath
+         URL url = SslConnectionParams.class.getClassLoader().getResource(keystorePath);
+         if (url != null) {
+           file = new File(url.toURI());
+           if (file.exists())
+             return file.getAbsolutePath();
+         }
+       }
+     } catch (Exception e) {
+       // best effort: any lookup problem is logged and reported below as "not found"
+       log.warn("Exception finding keystore", e);
+     }
+     throw new FileNotFoundException("Failed to load SSL keystore from " + keystorePath);
+   }
+
+   public void setUseJsse(boolean useJsse) {
+     this.useJsse = useJsse;
+   }
+
+   public boolean useJsse() {
+     return useJsse;
+   }
+
+   public void setClientAuth(boolean clientAuth) {
+     this.clientAuth = clientAuth;
+   }
+
+   public boolean isClientAuth() {
+     return clientAuth;
+   }
+
+   /**
+    * Converts these settings into Thrift transport parameters.
+    *
+    * @throws IllegalStateException
+    *           if this object is configured to use JSSE system properties
+    */
+   public TSSLTransportParameters getTTransportParams() {
+     if (useJsse)
+       throw new IllegalStateException("Cannot get TTransportParams for JSSE configuration.");
+     TSSLTransportParameters params = new TSSLTransportParameters();
+     params.requireClientAuth(clientAuth);
+     // NOTE(review): the null third argument presumably selects Thrift's default manager algorithm - confirm against the Thrift API
+     if (keyStoreSet) {
+       params.setKeyStore(keyStorePath, keyStorePass, null, keyStoreType);
+     }
+     if (trustStoreSet) {
+       params.setTrustStore(trustStorePath, trustStorePass, null, trustStoreType);
+     }
+     return params;
+   }
+
+   /** Null-safe string equality. */
+   private static boolean same(String a, String b) {
+     return a == null ? b == null : a.equals(b);
+   }
+
+   /** Null-safe string hash. */
+   private static int hashOf(String s) {
+     return s == null ? 0 : s.hashCode();
+   }
+
+   /**
+    * Value-based hash that is consistent with {@link #equals(Object)}: it only uses fields that equals compares, and ignores the store settings in JSSE mode.
+    */
+   @Override
+   public int hashCode() {
+     int hash = 0;
+     hash = 31 * hash + (clientAuth ? 1 : 0);
+     hash = 31 * hash + (useJsse ? 1 : 0);
+     if (useJsse)
+       return hash;
+     hash = 31 * hash + (keyStoreSet ? 1 : 0);
+     hash = 31 * hash + (trustStoreSet ? 1 : 0);
+     if (keyStoreSet) {
+       hash = 31 * hash + hashOf(keyStorePath);
+     }
+     if (trustStoreSet) {
+       hash = 31 * hash + hashOf(trustStorePath);
+     }
+     return hash;
+   }
+
+   /**
+    * Two instances are equal when they agree on client authentication and JSSE mode and, outside JSSE mode, on which stores are set and on the path, password
+    * and type of every store that is set.
+    */
+   @Override
+   public boolean equals(Object obj) {
+     if (this == obj)
+       return true;
+     if (!(obj instanceof SslConnectionParams))
+       return false;
+
+     SslConnectionParams other = (SslConnectionParams) obj;
+     if (clientAuth != other.clientAuth || useJsse != other.useJsse)
+       return false;
+     if (useJsse)
+       return true;
+     // compare the flags in both directions so that a.equals(b) == b.equals(a)
+     if (keyStoreSet != other.keyStoreSet || trustStoreSet != other.trustStoreSet)
+       return false;
+     if (keyStoreSet) {
+       if (!same(keyStorePath, other.keyStorePath) || !same(keyStorePass, other.keyStorePass) || !same(keyStoreType, other.keyStoreType))
+         return false;
+     }
+     if (trustStoreSet) {
+       if (!same(trustStorePath, other.trustStorePath) || !same(trustStorePass, other.trustStorePass) || !same(trustStoreType, other.trustStoreType))
+         return false;
+     }
+     return true;
+   }
+ }