You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mw...@apache.org on 2018/12/17 22:10:55 UTC
[accumulo] branch master updated: Merge ClientContext and
AccumuloClientImpl (#833)
This is an automated email from the ASF dual-hosted git repository.
mwalch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 8c9a4e1 Merge ClientContext and AccumuloClientImpl (#833)
8c9a4e1 is described below
commit 8c9a4e147f4284af1d02948f8f44d6e3bce3804b
Author: Mike Walch <mw...@apache.org>
AuthorDate: Mon Dec 17 17:10:51 2018 -0500
Merge ClientContext and AccumuloClientImpl (#833)
* Removed AccumuloClientImpl and kept ClientContext
class which implements AccumuloClient
* Removed use of context.getClient() as ClientContext
implements AccumuloClient
* Limited instantiation of ClientContext by casting
AccumuloClient to ClientContext
---
.../org/apache/accumulo/core/client/Accumulo.java | 8 +-
.../org/apache/accumulo/core/client/Connector.java | 4 +-
.../core/client/MutationsRejectedException.java | 17 +-
.../accumulo/core/client/ZooKeeperInstance.java | 5 +-
.../core/client/mapred/AbstractInputFormat.java | 10 +-
.../core/client/mapred/AccumuloOutputFormat.java | 6 +-
.../core/client/mapreduce/AbstractInputFormat.java | 8 +-
.../client/mapreduce/AccumuloOutputFormat.java | 10 +-
.../core/clientImpl/AccumuloClientImpl.java | 461 ---------------------
.../accumulo/core/clientImpl/BulkImport.java | 2 +-
.../accumulo/core/clientImpl/ClientContext.java | 433 +++++++++++++++++--
.../accumulo/core/clientImpl/ConnectorImpl.java | 61 ++-
.../accumulo/core/clientImpl/OfflineIterator.java | 13 +-
.../core/clientImpl/ReplicationOperationsImpl.java | 12 +-
.../core/clientImpl/TableOperationsImpl.java | 4 +-
.../core/clientImpl/TabletServerBatchWriter.java | 4 +-
.../clientImpl/mapreduce/lib/ConfiguratorBase.java | 3 +-
.../mapreduce/lib/InputConfigurator.java | 2 +-
.../mapreduce/lib/MapReduceClientOpts.java | 32 +-
.../accumulo/core/metadata/MetadataServicer.java | 3 +-
.../core/metadata/TableMetadataServicer.java | 3 +-
.../core/metadata/schema/TabletsMetadata.java | 8 -
.../core/replication/ReplicationTable.java | 2 +-
.../org/apache/accumulo/core/summary/Gatherer.java | 2 +-
.../java/org/apache/accumulo/core/util/Merge.java | 2 +-
.../core/metadata/MetadataServicerTest.java | 7 +-
.../hadoopImpl/mapred/AbstractInputFormat.java | 11 +-
.../hadoopImpl/mapreduce/AbstractInputFormat.java | 9 +-
.../mapreduce/lib/InputConfigurator.java | 5 +-
.../miniclusterImpl/MiniAccumuloClusterImpl.java | 7 +-
.../MiniAccumuloClusterExistingZooKeepersTest.java | 8 +-
.../org/apache/accumulo/server/ServerContext.java | 17 -
.../apache/accumulo/server/cli/ServerUtilOpts.java | 5 -
.../server/client/ClientServiceHandler.java | 3 +-
.../server/master/balancer/TableLoadBalancer.java | 2 +-
.../server/master/state/MetaDataStateStore.java | 2 +-
.../server/master/state/MetaDataTableScanner.java | 4 +-
.../accumulo/server/problems/ProblemReports.java | 9 +-
.../server/replication/ReplicaSystemHelper.java | 2 +-
.../server/replication/ReplicationUtil.java | 4 +-
.../org/apache/accumulo/server/util/Admin.java | 64 ++-
.../accumulo/server/util/ListVolumesUsed.java | 2 +-
.../accumulo/server/util/MetadataTableUtil.java | 12 +-
.../accumulo/server/util/RandomizeVolumes.java | 10 +-
.../server/util/RemoveEntriesForMissingFiles.java | 6 +-
.../accumulo/server/util/ReplicationTableUtil.java | 5 +-
.../accumulo/server/util/TableDiskUsage.java | 6 +-
.../server/util/VerifyTabletAssignments.java | 3 +-
.../server/security/SystemCredentialsTest.java | 6 +-
.../accumulo/gc/GarbageCollectWriteAheadLogs.java | 7 +-
.../apache/accumulo/gc/SimpleGarbageCollector.java | 4 +-
.../replication/CloseWriteAheadLogReferences.java | 12 +-
.../gc/GarbageCollectWriteAheadLogsTest.java | 31 +-
.../java/org/apache/accumulo/master/Master.java | 6 +-
.../master/MasterClientServiceHandler.java | 9 +-
.../apache/accumulo/master/TabletGroupWatcher.java | 16 +-
.../master/replication/ReplicationDriver.java | 12 +-
.../accumulo/master/replication/WorkDriver.java | 2 +-
.../tableOps/bulkVer1/CleanUpBulkImport.java | 2 +-
.../master/tableOps/bulkVer1/CopyFailed.java | 2 +-
.../tableOps/bulkVer2/CleanUpBulkImport.java | 2 +-
.../master/tableOps/bulkVer2/LoadFiles.java | 2 +-
.../master/tableOps/create/PopulateMetadata.java | 2 +-
.../accumulo/master/tableOps/delete/CleanUp.java | 4 +-
.../tableOps/tableExport/WriteExportFiles.java | 15 +-
.../tableImport/PopulateMetadataTable.java | 2 +-
.../java/org/apache/accumulo/monitor/Monitor.java | 4 +-
.../rest/replication/ReplicationResource.java | 2 +-
.../monitor/rest/trace/TracesResource.java | 179 ++++----
.../BatchWriterReplicationReplayer.java | 2 +-
.../tserver/replication/ReplicationProcessor.java | 10 +-
.../BatchWriterReplicationReplayerTest.java | 17 +-
.../main/java/org/apache/accumulo/shell/Shell.java | 4 +-
.../accumulo/shell/commands/CreateUserCommand.java | 4 +-
.../accumulo/harness/AccumuloClusterHarness.java | 9 +-
.../apache/accumulo/test/BatchWriterIterator.java | 2 +
.../accumulo/test/DetectDeadTabletServersIT.java | 2 +-
.../test/MasterRepairsDualAssignmentIT.java | 2 +-
.../java/org/apache/accumulo/test/SampleIT.java | 4 +-
.../apache/accumulo/test/TableOperationsIT.java | 6 +-
.../accumulo/test/TabletServerHdfsRestartIT.java | 6 +-
.../apache/accumulo/test/TransportCachingIT.java | 117 +++---
.../accumulo/test/UserCompactionStrategyIT.java | 4 +-
.../functional/BalanceAfterCommsFailureIT.java | 2 +-
.../BalanceInPresenceOfOfflineTableIT.java | 3 +-
.../test/functional/ConcurrentDeleteTableIT.java | 5 +-
.../test/functional/ConfigurableMacBase.java | 14 -
.../test/functional/DynamicThreadPoolsIT.java | 3 +-
.../accumulo/test/functional/FateStarvationIT.java | 3 +-
.../test/functional/HalfDeadTServerIT.java | 5 +-
.../test/functional/MasterAssignmentIT.java | 9 +-
.../test/functional/MetadataMaxFilesIT.java | 4 +-
.../accumulo/test/functional/MonitorSslIT.java | 22 +-
.../accumulo/test/functional/ReadWriteIT.java | 9 +-
.../apache/accumulo/test/functional/RenameIT.java | 3 +-
.../test/functional/SimpleBalancerFairnessIT.java | 3 +-
.../test/functional/TableChangeStateIT.java | 2 +-
.../functional/TabletStateChangeIteratorIT.java | 2 +-
.../accumulo/test/functional/WALSunnyDayIT.java | 2 +-
.../apache/accumulo/test/master/MergeStateIT.java | 5 +-
.../accumulo/test/master/SuspendedTabletsIT.java | 161 +++----
...GarbageCollectorCommunicatesWithTServersIT.java | 5 +-
.../replication/MultiTserverReplicationIT.java | 4 +-
.../accumulo/test/replication/ReplicationIT.java | 3 +-
.../replication/ReplicationOperationsImplIT.java | 4 +-
.../test/server/security/SystemCredentialsIT.java | 50 +--
106 files changed, 965 insertions(+), 1186 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/client/Accumulo.java b/core/src/main/java/org/apache/accumulo/core/client/Accumulo.java
index 787de80..9ad5da6 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Accumulo.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Accumulo.java
@@ -20,7 +20,7 @@ import java.util.Properties;
import org.apache.accumulo.core.client.lexicoder.Lexicoder;
import org.apache.accumulo.core.client.rfile.RFile;
-import org.apache.accumulo.core.clientImpl.AccumuloClientImpl;
+import org.apache.accumulo.core.clientImpl.ClientContext;
/**
* This class contains all API entry points created in 2.0.0 or later. The majority of the API is
@@ -65,8 +65,7 @@ public final class Accumulo {
* @return a builder object for Accumulo clients
*/
public static AccumuloClient.PropertyOptions<AccumuloClient> newClient() {
- return new AccumuloClientImpl.ClientBuilderImpl<>(
- AccumuloClientImpl.ClientBuilderImpl::buildClient);
+ return new ClientContext.ClientBuilderImpl<>(ClientContext.ClientBuilderImpl::buildClient);
}
/**
@@ -83,7 +82,6 @@ public final class Accumulo {
* @return a builder object for client Properties
*/
public static AccumuloClient.PropertyOptions<Properties> newClientProperties() {
- return new AccumuloClientImpl.ClientBuilderImpl<>(
- AccumuloClientImpl.ClientBuilderImpl::buildProps);
+ return new ClientContext.ClientBuilderImpl<>(ClientContext.ClientBuilderImpl::buildProps);
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/client/Connector.java b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
index d5b646e..cd4168e 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
@@ -21,7 +21,7 @@ import org.apache.accumulo.core.client.admin.NamespaceOperations;
import org.apache.accumulo.core.client.admin.ReplicationOperations;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.clientImpl.AccumuloClientImpl;
+import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ConnectorImpl;
import org.apache.accumulo.core.security.Authorizations;
@@ -280,6 +280,6 @@ public abstract class Connector {
*/
public static Connector from(AccumuloClient client)
throws AccumuloSecurityException, AccumuloException {
- return new ConnectorImpl((AccumuloClientImpl) client);
+ return new ConnectorImpl((ClientContext) client);
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/client/MutationsRejectedException.java b/core/src/main/java/org/apache/accumulo/core/client/MutationsRejectedException.java
index 2e670bc..f6d1740 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/MutationsRejectedException.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/MutationsRejectedException.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Properties;
import java.util.Set;
import org.apache.accumulo.core.client.security.SecurityErrorCode;
@@ -56,7 +55,7 @@ public class MutationsRejectedException extends AccumuloException {
*
* @since 1.7.0
* @deprecated since 2.0.0, replaced by
- * {@link #MutationsRejectedException(Properties, List, Map, Collection, int, Throwable)}
+ * {@link #MutationsRejectedException(AccumuloClient, List, Map, Collection, int, Throwable)}
*/
@Deprecated
public MutationsRejectedException(Instance instance, List<ConstraintViolationSummary> cvsList,
@@ -75,8 +74,8 @@ public class MutationsRejectedException extends AccumuloException {
/**
* Creates Mutations rejected exception
*
- * @param clientProps
- * Client props
+ * @param client
+ * AccumuloClient
* @param cvsList
* list of constraint violations
* @param hashMap
@@ -88,12 +87,12 @@ public class MutationsRejectedException extends AccumuloException {
*
* @since 2.0.0
*/
- public MutationsRejectedException(Properties clientProps,
- List<ConstraintViolationSummary> cvsList, Map<TabletId,Set<SecurityErrorCode>> hashMap,
- Collection<String> serverSideErrors, int unknownErrors, Throwable cause) {
+ public MutationsRejectedException(AccumuloClient client, List<ConstraintViolationSummary> cvsList,
+ Map<TabletId,Set<SecurityErrorCode>> hashMap, Collection<String> serverSideErrors,
+ int unknownErrors, Throwable cause) {
super("# constraint violations : " + cvsList.size() + " security codes: "
- + format(hashMap, new ClientContext(clientProps)) + " # server errors "
- + serverSideErrors.size() + " # exceptions " + unknownErrors, cause);
+ + format(hashMap, (ClientContext) client) + " # server errors " + serverSideErrors.size()
+ + " # exceptions " + unknownErrors, cause);
this.cvsl = cvsList;
this.af = hashMap;
this.es = serverSideErrors;
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
index d870be5..50d2800 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.clientImpl.AccumuloClientImpl;
import org.apache.accumulo.core.clientImpl.ClientConfConverter;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientInfoImpl;
@@ -38,7 +37,6 @@ import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.singletons.SingletonManager;
import org.apache.accumulo.core.singletons.SingletonManager.Mode;
-import org.apache.accumulo.core.singletons.SingletonReservation;
import org.apache.accumulo.core.util.OpTimer;
import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
@@ -228,8 +226,7 @@ public class ZooKeeperInstance implements Instance {
Properties properties = ClientConfConverter.toProperties(clientConf);
properties.setProperty(ClientProperty.AUTH_PRINCIPAL.getKey(), principal);
properties.setProperty(ClientProperty.INSTANCE_NAME.getKey(), getInstanceName());
- return new ConnectorImpl(new AccumuloClientImpl(SingletonReservation.noop(),
- new ClientContext(new ClientInfoImpl(properties, token))));
+ return new ConnectorImpl(new ClientContext(new ClientInfoImpl(properties, token)));
}
@Override
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
index ea53e59..22505b9 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
@@ -180,9 +180,8 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
throws AccumuloSecurityException {
if (token instanceof KerberosToken) {
log.info("Received KerberosToken, attempting to fetch DelegationToken");
- try {
- AccumuloClient client = Accumulo.newClient().from(getClientProperties(job))
- .as(principal, token).build();
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProperties(job))
+ .as(principal, token).build()) {
token = client.securityOperations().getDelegationToken(new DelegationTokenConfig());
} catch (Exception e) {
log.warn("Failed to automatically obtain DelegationToken, Mappers/Reducers will likely"
@@ -482,7 +481,6 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
log.debug("Initializing input split: " + baseSplit);
ClientContext context = new ClientContext(getClientProperties(job));
- AccumuloClient client = context.getClient();
Authorizations authorizations = getScanAuthorizations(job);
String classLoaderContext = getClassLoaderContext(job);
String table = baseSplit.getTableName();
@@ -491,7 +489,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
// configuration, but the scanner will use the table id resolved at job setup time
InputTableConfig tableConfig = getInputTableConfig(job, baseSplit.getTableName());
- log.debug("Created client with user: " + client.whoami());
+ log.debug("Created client with user: " + context.whoami());
log.debug("Creating scanner for table: " + table);
log.debug("Authorizations are: " + authorizations);
@@ -503,7 +501,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
// Note: BatchScanner will use at most one thread per tablet, currently BatchInputSplit
// will not span tablets
int scanThreads = 1;
- scanner = client.createBatchScanner(baseSplit.getTableName(), authorizations,
+ scanner = context.createBatchScanner(baseSplit.getTableName(), authorizations,
scanThreads);
setupIterators(job, scanner, baseSplit.getTableName(), baseSplit);
if (null != classLoaderContext) {
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
index 796c8b3..478f851 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
@@ -553,6 +553,8 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
log.error("Constraint violations : " + e.getConstraintViolationSummaries().size());
}
throw new IOException(e);
+ } finally {
+ client.close();
}
}
}
@@ -561,9 +563,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
if (!isConnectorInfoSet(job))
throw new IOException("Connector info has not been set.");
- try {
- // if the instance isn't configured, it will complain here
- AccumuloClient c = Accumulo.newClient().from(getClientProperties(job)).build();
+ try (AccumuloClient c = Accumulo.newClient().from(getClientProperties(job)).build()) {
String principal = getPrincipal(job);
AuthenticationToken token = getAuthenticationToken(job);
if (!c.securityOperations().authenticateUser(principal, token))
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
index c5d09f9..9d79e57 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
@@ -183,9 +183,8 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
throws AccumuloSecurityException {
if (token instanceof KerberosToken) {
log.info("Received KerberosToken, attempting to fetch DelegationToken");
- try {
- AccumuloClient client = Accumulo.newClient().from(getClientProperties(job))
- .as(principal, token).build();
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProperties(job))
+ .as(principal, token).build()) {
token = client.securityOperations().getDelegationToken(new DelegationTokenConfig());
} catch (Exception e) {
log.warn("Failed to automatically obtain DelegationToken, "
@@ -491,7 +490,6 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
ClientInfo info = ClientInfo.from(getClientProperties(attempt));
ClientContext context = new ClientContext(info);
- AccumuloClient client = context.getClient();
Authorizations authorizations = getScanAuthorizations(attempt);
String classLoaderContext = getClassLoaderContext(attempt);
String table = split.getTableName();
@@ -513,7 +511,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
// Note: BatchScanner will use at most one thread per tablet, currently BatchInputSplit
// will not span tablets
int scanThreads = 1;
- scanner = client.createBatchScanner(split.getTableName(), authorizations, scanThreads);
+ scanner = context.createBatchScanner(split.getTableName(), authorizations, scanThreads);
setupIterators(attempt, scanner, split.getTableName(), split);
if (null != classLoaderContext) {
scanner.setClassLoaderContext(classLoaderContext);
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index cabcdad..8331b96 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -557,6 +557,8 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
log.error("Constraint violations : " + e.getConstraintViolationSummaries().size());
}
throw new IOException(e);
+ } finally {
+ client.close();
}
}
}
@@ -565,11 +567,9 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
public void checkOutputSpecs(JobContext job) throws IOException {
if (!isConnectorInfoSet(job))
throw new IOException("Connector info has not been set.");
- try {
- // if the instance isn't configured, it will complain here
- String principal = getPrincipal(job);
- AuthenticationToken token = getAuthenticationToken(job);
- AccumuloClient c = Accumulo.newClient().from(getClientProperties(job)).build();
+ String principal = getPrincipal(job);
+ AuthenticationToken token = getAuthenticationToken(job);
+ try (AccumuloClient c = Accumulo.newClient().from(getClientProperties(job)).build()) {
if (!c.securityOperations().authenticateUser(principal, token))
throw new IOException("Unable to authenticate user");
} catch (AccumuloException | AccumuloSecurityException e) {
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloClientImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloClientImpl.java
deleted file mode 100644
index ca0c907..0000000
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloClientImpl.java
+++ /dev/null
@@ -1,461 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.clientImpl;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.nio.file.Path;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchDeleter;
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ConditionalWriter;
-import org.apache.accumulo.core.client.ConditionalWriterConfig;
-import org.apache.accumulo.core.client.MultiTableBatchWriter;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.TableOfflineException;
-import org.apache.accumulo.core.client.admin.InstanceOperations;
-import org.apache.accumulo.core.client.admin.NamespaceOperations;
-import org.apache.accumulo.core.client.admin.ReplicationOperations;
-import org.apache.accumulo.core.client.admin.SecurityOperations;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.conf.ClientProperty;
-import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.singletons.SingletonManager;
-import org.apache.accumulo.core.singletons.SingletonReservation;
-
-public class AccumuloClientImpl implements AccumuloClient {
-
- final ClientContext context;
- private final String instanceID;
- private SecurityOperations secops = null;
- private TableOperationsImpl tableops = null;
- private NamespaceOperations namespaceops = null;
- private InstanceOperations instanceops = null;
- private ReplicationOperations replicationops = null;
- private final SingletonReservation singletonReservation;
- private volatile boolean closed = false;
-
- private void ensureOpen() {
- if (closed) {
- throw new IllegalStateException("This client was closed.");
- }
- }
-
- public AccumuloClientImpl(SingletonReservation reservation, final ClientContext context) {
- checkArgument(context != null, "Context is null");
- checkArgument(context.getCredentials() != null, "Credentials are null");
- checkArgument(context.getCredentials().getToken() != null, "Authentication token is null");
-
- this.singletonReservation = Objects.requireNonNull(reservation);
- this.context = context;
- instanceID = context.getInstanceID();
- this.tableops = new TableOperationsImpl(context);
- this.namespaceops = new NamespaceOperationsImpl(context, tableops);
- }
-
- Table.ID getTableId(String tableName) throws TableNotFoundException {
- Table.ID tableId = Tables.getTableId(context, tableName);
- if (Tables.getTableState(context, tableId) == TableState.OFFLINE)
- throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId));
- return tableId;
- }
-
- @Override
- public BatchScanner createBatchScanner(String tableName, Authorizations authorizations,
- int numQueryThreads) throws TableNotFoundException {
- checkArgument(tableName != null, "tableName is null");
- checkArgument(authorizations != null, "authorizations is null");
- ensureOpen();
- return new TabletServerBatchReader(context, getTableId(tableName), authorizations,
- numQueryThreads);
- }
-
- @Override
- public BatchScanner createBatchScanner(String tableName, Authorizations authorizations)
- throws TableNotFoundException {
- Integer numQueryThreads = ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS
- .getInteger(context.getClientInfo().getProperties());
- Objects.requireNonNull(numQueryThreads);
- ensureOpen();
- return createBatchScanner(tableName, authorizations, numQueryThreads);
- }
-
- @Override
- public BatchScanner createBatchScanner(String tableName)
- throws TableNotFoundException, AccumuloSecurityException, AccumuloException {
- Authorizations auths = securityOperations().getUserAuthorizations(context.getPrincipal());
- return createBatchScanner(tableName, auths);
- }
-
- @Override
- public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
- int numQueryThreads, BatchWriterConfig config) throws TableNotFoundException {
- checkArgument(tableName != null, "tableName is null");
- checkArgument(authorizations != null, "authorizations is null");
- ensureOpen();
- return new TabletServerBatchDeleter(context, getTableId(tableName), authorizations,
- numQueryThreads, config.merge(context.getBatchWriterConfig()));
- }
-
- @Override
- public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
- int numQueryThreads) throws TableNotFoundException {
- ensureOpen();
- return createBatchDeleter(tableName, authorizations, numQueryThreads, new BatchWriterConfig());
- }
-
- @Override
- public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config)
- throws TableNotFoundException {
- checkArgument(tableName != null, "tableName is null");
- ensureOpen();
- // we used to allow null inputs for bw config
- if (config == null) {
- config = new BatchWriterConfig();
- }
- return new BatchWriterImpl(context, getTableId(tableName),
- config.merge(context.getBatchWriterConfig()));
- }
-
- @Override
- public BatchWriter createBatchWriter(String tableName) throws TableNotFoundException {
- return createBatchWriter(tableName, new BatchWriterConfig());
- }
-
- @Override
- public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) {
- ensureOpen();
- return new MultiTableBatchWriterImpl(context, config.merge(context.getBatchWriterConfig()));
- }
-
- @Override
- public MultiTableBatchWriter createMultiTableBatchWriter() {
- return createMultiTableBatchWriter(new BatchWriterConfig());
- }
-
- @Override
- public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config)
- throws TableNotFoundException {
- ensureOpen();
- return new ConditionalWriterImpl(context, getTableId(tableName), config);
- }
-
- @Override
- public Scanner createScanner(String tableName, Authorizations authorizations)
- throws TableNotFoundException {
- checkArgument(tableName != null, "tableName is null");
- checkArgument(authorizations != null, "authorizations is null");
- ensureOpen();
- Scanner scanner = new ScannerImpl(context, getTableId(tableName), authorizations);
- Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE
- .getInteger(context.getClientInfo().getProperties());
- if (batchSize != null) {
- scanner.setBatchSize(batchSize);
- }
- return scanner;
- }
-
- @Override
- public Scanner createScanner(String tableName)
- throws TableNotFoundException, AccumuloSecurityException, AccumuloException {
- Authorizations auths = securityOperations().getUserAuthorizations(context.getPrincipal());
- return createScanner(tableName, auths);
- }
-
- @Override
- public String whoami() {
- ensureOpen();
- return context.getCredentials().getPrincipal();
- }
-
- @Override
- public String getInstanceID() {
- ensureOpen();
- return instanceID;
- }
-
- @Override
- public synchronized TableOperations tableOperations() {
- ensureOpen();
- return tableops;
- }
-
- @Override
- public synchronized NamespaceOperations namespaceOperations() {
- ensureOpen();
- return namespaceops;
- }
-
- @Override
- public synchronized SecurityOperations securityOperations() {
- ensureOpen();
- if (secops == null)
- secops = new SecurityOperationsImpl(context);
-
- return secops;
- }
-
- @Override
- public synchronized InstanceOperations instanceOperations() {
- ensureOpen();
- if (instanceops == null)
- instanceops = new InstanceOperationsImpl(context);
-
- return instanceops;
- }
-
- @Override
- public synchronized ReplicationOperations replicationOperations() {
- ensureOpen();
- if (null == replicationops) {
- replicationops = new ReplicationOperationsImpl(context);
- }
-
- return replicationops;
- }
-
- @Override
- public Properties properties() {
- ensureOpen();
- Properties result = new Properties();
- this.context.getProperties().forEach((key, value) -> {
- if (!key.equals(ClientProperty.AUTH_TOKEN.getKey())) {
- result.setProperty((String) key, (String) value);
- }
- });
- return result;
- }
-
- public AuthenticationToken token() {
- ensureOpen();
- return this.context.getAuthenticationToken();
- }
-
- @Override
- public void close() {
- closed = true;
- try {
- context.close();
- } finally {
- singletonReservation.close();
- }
- }
-
- public static class ClientBuilderImpl<T>
- implements InstanceArgs<T>, PropertyOptions<T>, AuthenticationArgs<T>, ConnectionOptions<T>,
- SslOptions<T>, SaslOptions<T>, ClientFactory<T>, FromOptions<T> {
-
- private Properties properties = new Properties();
- private AuthenticationToken token = null;
- private Function<ClientBuilderImpl<T>,T> builderFunction;
-
- public ClientBuilderImpl(Function<ClientBuilderImpl<T>,T> builderFunction) {
- this.builderFunction = builderFunction;
- }
-
- private ClientInfo getClientInfo() {
- if (token != null) {
- ClientProperty.validate(properties, false);
- return new ClientInfoImpl(properties, token);
- }
- ClientProperty.validate(properties);
- return new ClientInfoImpl(properties);
- }
-
- @Override
- public T build() {
- return builderFunction.apply(this);
- }
-
- public static AccumuloClient buildClient(ClientBuilderImpl<AccumuloClient> cbi) {
- SingletonReservation reservation = SingletonManager.getClientReservation();
- try {
- // AccumuloClientImpl closes reservation unless a RuntimeException is thrown
- return new AccumuloClientImpl(reservation, new ClientContext(cbi.getClientInfo()));
- } catch (RuntimeException e) {
- reservation.close();
- throw e;
- }
- }
-
- public static Properties buildProps(ClientBuilderImpl<Properties> cbi) {
- ClientProperty.validate(cbi.properties);
- return cbi.properties;
- }
-
- @Override
- public AuthenticationArgs<T> to(CharSequence instanceName, CharSequence zookeepers) {
- setProperty(ClientProperty.INSTANCE_NAME, instanceName);
- setProperty(ClientProperty.INSTANCE_ZOOKEEPERS, zookeepers);
- return this;
- }
-
- @Override
- public SslOptions<T> truststore(CharSequence path) {
- setProperty(ClientProperty.SSL_TRUSTSTORE_PATH, path);
- return this;
- }
-
- @Override
- public SslOptions<T> truststore(CharSequence path, CharSequence password, CharSequence type) {
- setProperty(ClientProperty.SSL_TRUSTSTORE_PATH, path);
- setProperty(ClientProperty.SSL_TRUSTSTORE_PASSWORD, password);
- setProperty(ClientProperty.SSL_TRUSTSTORE_TYPE, type);
- return this;
- }
-
- @Override
- public SslOptions<T> keystore(CharSequence path) {
- setProperty(ClientProperty.SSL_KEYSTORE_PATH, path);
- return this;
- }
-
- @Override
- public SslOptions<T> keystore(CharSequence path, CharSequence password, CharSequence type) {
- setProperty(ClientProperty.SSL_KEYSTORE_PATH, path);
- setProperty(ClientProperty.SSL_KEYSTORE_PASSWORD, password);
- setProperty(ClientProperty.SSL_KEYSTORE_TYPE, type);
- return this;
- }
-
- @Override
- public SslOptions<T> useJsse() {
- setProperty(ClientProperty.SSL_USE_JSSE, "true");
- return this;
- }
-
- @Override
- public ConnectionOptions<T> zkTimeout(int timeout) {
- ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.setTimeInMillis(properties, (long) timeout);
- return this;
- }
-
- @Override
- public SslOptions<T> useSsl() {
- setProperty(ClientProperty.SSL_ENABLED, "true");
- return this;
- }
-
- @Override
- public SaslOptions<T> useSasl() {
- setProperty(ClientProperty.SASL_ENABLED, "true");
- return this;
- }
-
- @Override
- public ConnectionOptions<T> batchWriterConfig(BatchWriterConfig batchWriterConfig) {
- ClientProperty.BATCH_WRITER_MEMORY_MAX.setBytes(properties, batchWriterConfig.getMaxMemory());
- ClientProperty.BATCH_WRITER_LATENCY_MAX.setTimeInMillis(properties,
- batchWriterConfig.getMaxLatency(TimeUnit.MILLISECONDS));
- ClientProperty.BATCH_WRITER_TIMEOUT_MAX.setTimeInMillis(properties,
- batchWriterConfig.getTimeout(TimeUnit.MILLISECONDS));
- setProperty(ClientProperty.BATCH_WRITER_THREADS_MAX, batchWriterConfig.getMaxWriteThreads());
- setProperty(ClientProperty.BATCH_WRITER_DURABILITY,
- batchWriterConfig.getDurability().toString());
- return this;
- }
-
- @Override
- public ConnectionOptions<T> batchScannerQueryThreads(int numQueryThreads) {
- setProperty(ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS, numQueryThreads);
- return this;
- }
-
- @Override
- public ConnectionOptions<T> scannerBatchSize(int batchSize) {
- setProperty(ClientProperty.SCANNER_BATCH_SIZE, batchSize);
- return this;
- }
-
- @Override
- public SaslOptions<T> primary(CharSequence kerberosServerPrimary) {
- setProperty(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY, kerberosServerPrimary);
- return this;
- }
-
- @Override
- public SaslOptions<T> qop(CharSequence qualityOfProtection) {
- setProperty(ClientProperty.SASL_QOP, qualityOfProtection);
- return this;
- }
-
- @Override
- public FromOptions<T> from(String propertiesFilePath) {
- return from(ClientInfoImpl.toProperties(propertiesFilePath));
- }
-
- @Override
- public FromOptions<T> from(Path propertiesFile) {
- return from(ClientInfoImpl.toProperties(propertiesFile));
- }
-
- @Override
- public FromOptions<T> from(Properties properties) {
- this.properties = properties;
- return this;
- }
-
- @Override
- public ConnectionOptions<T> as(CharSequence username, CharSequence password) {
- setProperty(ClientProperty.AUTH_PRINCIPAL, username);
- ClientProperty.setPassword(properties, password);
- return this;
- }
-
- @Override
- public ConnectionOptions<T> as(CharSequence principal, Path keyTabFile) {
- setProperty(ClientProperty.AUTH_PRINCIPAL, principal);
- ClientProperty.setKerberosKeytab(properties, keyTabFile.toString());
- return this;
- }
-
- @Override
- public ConnectionOptions<T> as(CharSequence principal, AuthenticationToken token) {
- if (token.isDestroyed()) {
- throw new IllegalArgumentException("AuthenticationToken has been destroyed");
- }
- setProperty(ClientProperty.AUTH_PRINCIPAL, principal.toString());
- ClientProperty.setAuthenticationToken(properties, token);
- this.token = token;
- return this;
- }
-
- public void setProperty(ClientProperty property, CharSequence value) {
- properties.setProperty(property.getKey(), value.toString());
- }
-
- public void setProperty(ClientProperty property, Long value) {
- setProperty(property, Long.toString(value));
- }
-
- public void setProperty(ClientProperty property, Integer value) {
- setProperty(property, Integer.toString(value));
- }
- }
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java
index 5652358..e3ca1c1 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java
@@ -117,7 +117,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
Table.ID tableId = Tables.getTableId(context, tableName);
- Map<String,String> props = context.getClient().instanceOperations().getSystemConfiguration();
+ Map<String,String> props = context.instanceOperations().getSystemConfiguration();
AccumuloConfiguration conf = new ConfigurationCopy(props);
FileSystem fs = VolumeConfiguration.getVolume(dir, CachedConfiguration.getInstance(), conf)
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 4312886..928a4d1 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -19,27 +19,46 @@ package org.apache.accumulo.core.clientImpl;
import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.charset.StandardCharsets.UTF_8;
+import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
import org.apache.accumulo.core.client.Durability;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.client.admin.NamespaceOperations;
+import org.apache.accumulo.core.client.admin.ReplicationOperations;
+import org.apache.accumulo.core.client.admin.SecurityOperations;
+import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.rpc.SaslConnectionParams;
import org.apache.accumulo.core.rpc.SslConnectionParams;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.singletons.SingletonManager;
import org.apache.accumulo.core.singletons.SingletonReservation;
import org.apache.accumulo.core.util.OpTimer;
import org.apache.accumulo.fate.zookeeper.ZooCache;
@@ -59,18 +78,17 @@ import com.google.common.base.Suppliers;
* to this object for later retrieval, rather than as a separate parameter. Any state in this object
* should be available at the time of its construction.
*/
-public class ClientContext {
+public class ClientContext implements AccumuloClient {
private static final Logger log = LoggerFactory.getLogger(ClientContext.class);
private ClientInfo info;
- private String instanceId = null;
+ private String instanceId;
private final ZooCache zooCache;
private Credentials creds;
private BatchWriterConfig batchWriterConfig;
private AccumuloConfiguration serverConf;
- protected AccumuloClient client;
// These fields are very frequently accessed (each time a connection is created) and expensive to
// compute, so cache them.
@@ -81,6 +99,13 @@ public class ClientContext {
private volatile boolean closed = false;
+ private SecurityOperations secops = null;
+ private TableOperationsImpl tableops = null;
+ private NamespaceOperations namespaceops = null;
+ private InstanceOperations instanceops = null;
+ private ReplicationOperations replicationops = null;
+ private SingletonReservation singletonReservation;
+
private void ensureOpen() {
if (closed) {
throw new IllegalStateException("This client was closed.");
@@ -92,19 +117,24 @@ public class ClientContext {
return () -> Suppliers.memoizeWithExpiration(s::get, 100, TimeUnit.MILLISECONDS).get();
}
- public ClientContext(AccumuloClient client) {
- this(ClientInfo.from(client.properties(), ((AccumuloClientImpl) client).token()));
- }
-
public ClientContext(Properties clientProperties) {
this(ClientInfo.from(clientProperties));
}
+ public ClientContext(SingletonReservation reservation, ClientInfo info) {
+ this(reservation, info, ClientConfConverter.toAccumuloConf(info.getProperties()));
+ }
+
public ClientContext(ClientInfo info) {
this(info, ClientConfConverter.toAccumuloConf(info.getProperties()));
}
public ClientContext(ClientInfo info, AccumuloConfiguration serverConf) {
+ this(SingletonReservation.noop(), info, serverConf);
+ }
+
+ public ClientContext(SingletonReservation reservation, ClientInfo info,
+ AccumuloConfiguration serverConf) {
this.info = info;
zooCache = new ZooCacheFactory().getZooCache(info.getZooKeepers(),
info.getZooKeepersSessionTimeOut());
@@ -114,6 +144,9 @@ public class ClientContext {
sslSupplier = memoizeWithExpiration(() -> SslConnectionParams.forClient(getConfiguration()));
saslSupplier = memoizeWithExpiration(
() -> SaslConnectionParams.from(getConfiguration(), getCredentials().getToken()));
+ this.singletonReservation = Objects.requireNonNull(reservation);
+ this.tableops = new TableOperationsImpl(this);
+ this.namespaceops = new NamespaceOperationsImpl(this, tableops);
}
/**
@@ -158,18 +191,11 @@ public class ClientContext {
@Override
public org.apache.accumulo.core.client.Connector getConnector(String principal,
AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
- AccumuloClient client = Accumulo.newClient().from(context.getProperties())
- .as(principal, token).build();
- return org.apache.accumulo.core.client.Connector.from(client);
+ return org.apache.accumulo.core.client.Connector.from(context);
}
};
}
- public ClientInfo getClientInfo() {
- ensureOpen();
- return info;
- }
-
/**
* Retrieve the credentials used to construct this context
*/
@@ -239,17 +265,6 @@ public class ClientContext {
return saslSupplier.get();
}
- /**
- * Retrieve an Accumulo client
- */
- public synchronized AccumuloClient getClient() {
- ensureOpen();
- if (client == null) {
- client = new AccumuloClientImpl(SingletonReservation.noop(), this);
- }
- return client;
- }
-
public BatchWriterConfig getBatchWriterConfig() {
ensureOpen();
if (batchWriterConfig == null) {
@@ -427,7 +442,371 @@ public class ClientContext {
return zooCache;
}
+ Table.ID getTableId(String tableName) throws TableNotFoundException {
+ Table.ID tableId = Tables.getTableId(this, tableName);
+ if (Tables.getTableState(this, tableId) == TableState.OFFLINE)
+ throw new TableOfflineException(Tables.getTableOfflineMsg(this, tableId));
+ return tableId;
+ }
+
+ @Override
+ public BatchScanner createBatchScanner(String tableName, Authorizations authorizations,
+ int numQueryThreads) throws TableNotFoundException {
+ checkArgument(tableName != null, "tableName is null");
+ checkArgument(authorizations != null, "authorizations is null");
+ ensureOpen();
+ return new TabletServerBatchReader(this, getTableId(tableName), authorizations,
+ numQueryThreads);
+ }
+
+ @Override
+ public BatchScanner createBatchScanner(String tableName, Authorizations authorizations)
+ throws TableNotFoundException {
+ Integer numQueryThreads = ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS
+ .getInteger(getProperties());
+ Objects.requireNonNull(numQueryThreads);
+ ensureOpen();
+ return createBatchScanner(tableName, authorizations, numQueryThreads);
+ }
+
+ @Override
+ public BatchScanner createBatchScanner(String tableName)
+ throws TableNotFoundException, AccumuloSecurityException, AccumuloException {
+ Authorizations auths = securityOperations().getUserAuthorizations(getPrincipal());
+ return createBatchScanner(tableName, auths);
+ }
+
+ @Override
+ public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
+ int numQueryThreads, BatchWriterConfig config) throws TableNotFoundException {
+ checkArgument(tableName != null, "tableName is null");
+ checkArgument(authorizations != null, "authorizations is null");
+ ensureOpen();
+ return new TabletServerBatchDeleter(this, getTableId(tableName), authorizations,
+ numQueryThreads, config.merge(getBatchWriterConfig()));
+ }
+
+ @Override
+ public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
+ int numQueryThreads) throws TableNotFoundException {
+ ensureOpen();
+ return createBatchDeleter(tableName, authorizations, numQueryThreads, new BatchWriterConfig());
+ }
+
+ @Override
+ public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config)
+ throws TableNotFoundException {
+ checkArgument(tableName != null, "tableName is null");
+ ensureOpen();
+ // we used to allow null inputs for bw config
+ if (config == null) {
+ config = new BatchWriterConfig();
+ }
+ return new BatchWriterImpl(this, getTableId(tableName), config.merge(getBatchWriterConfig()));
+ }
+
+ @Override
+ public BatchWriter createBatchWriter(String tableName) throws TableNotFoundException {
+ return createBatchWriter(tableName, new BatchWriterConfig());
+ }
+
+ @Override
+ public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) {
+ ensureOpen();
+ return new MultiTableBatchWriterImpl(this, config.merge(getBatchWriterConfig()));
+ }
+
+ @Override
+ public MultiTableBatchWriter createMultiTableBatchWriter() {
+ return createMultiTableBatchWriter(new BatchWriterConfig());
+ }
+
+ @Override
+ public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config)
+ throws TableNotFoundException {
+ ensureOpen();
+ return new ConditionalWriterImpl(this, getTableId(tableName), config);
+ }
+
+ @Override
+ public Scanner createScanner(String tableName, Authorizations authorizations)
+ throws TableNotFoundException {
+ checkArgument(tableName != null, "tableName is null");
+ checkArgument(authorizations != null, "authorizations is null");
+ ensureOpen();
+ Scanner scanner = new ScannerImpl(this, getTableId(tableName), authorizations);
+ Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE.getInteger(getProperties());
+ if (batchSize != null) {
+ scanner.setBatchSize(batchSize);
+ }
+ return scanner;
+ }
+
+ @Override
+ public Scanner createScanner(String tableName)
+ throws TableNotFoundException, AccumuloSecurityException, AccumuloException {
+ Authorizations auths = securityOperations().getUserAuthorizations(getPrincipal());
+ return createScanner(tableName, auths);
+ }
+
+ @Override
+ public String whoami() {
+ ensureOpen();
+ return getCredentials().getPrincipal();
+ }
+
+ @Override
+ public synchronized TableOperations tableOperations() {
+ ensureOpen();
+ return tableops;
+ }
+
+ @Override
+ public synchronized NamespaceOperations namespaceOperations() {
+ ensureOpen();
+ return namespaceops;
+ }
+
+ @Override
+ public synchronized SecurityOperations securityOperations() {
+ ensureOpen();
+ if (secops == null)
+ secops = new SecurityOperationsImpl(this);
+
+ return secops;
+ }
+
+ @Override
+ public synchronized InstanceOperations instanceOperations() {
+ ensureOpen();
+ if (instanceops == null)
+ instanceops = new InstanceOperationsImpl(this);
+
+ return instanceops;
+ }
+
+ @Override
+ public synchronized ReplicationOperations replicationOperations() {
+ ensureOpen();
+ if (null == replicationops) {
+ replicationops = new ReplicationOperationsImpl(this);
+ }
+
+ return replicationops;
+ }
+
+ @Override
+ public Properties properties() {
+ ensureOpen();
+ Properties result = new Properties();
+ getProperties().forEach((key, value) -> {
+ if (!key.equals(ClientProperty.AUTH_TOKEN.getKey())) {
+ result.setProperty((String) key, (String) value);
+ }
+ });
+ return result;
+ }
+
+ public AuthenticationToken token() {
+ ensureOpen();
+ return getAuthenticationToken();
+ }
+
+ @Override
public void close() {
closed = true;
+ singletonReservation.close();
+ }
+
+ public static class ClientBuilderImpl<T>
+ implements InstanceArgs<T>, PropertyOptions<T>, AuthenticationArgs<T>, ConnectionOptions<T>,
+ SslOptions<T>, SaslOptions<T>, ClientFactory<T>, FromOptions<T> {
+
+ private Properties properties = new Properties();
+ private AuthenticationToken token = null;
+ private Function<ClientBuilderImpl<T>,T> builderFunction;
+
+ public ClientBuilderImpl(Function<ClientBuilderImpl<T>,T> builderFunction) {
+ this.builderFunction = builderFunction;
+ }
+
+ private ClientInfo getClientInfo() {
+ if (token != null) {
+ ClientProperty.validate(properties, false);
+ return new ClientInfoImpl(properties, token);
+ }
+ ClientProperty.validate(properties);
+ return new ClientInfoImpl(properties);
+ }
+
+ @Override
+ public T build() {
+ return builderFunction.apply(this);
+ }
+
+ public static AccumuloClient buildClient(ClientBuilderImpl<AccumuloClient> cbi) {
+ SingletonReservation reservation = SingletonManager.getClientReservation();
+ try {
+ // ClientContext closes reservation unless a RuntimeException is thrown
+ return new ClientContext(reservation, cbi.getClientInfo());
+ } catch (RuntimeException e) {
+ reservation.close();
+ throw e;
+ }
+ }
+
+ public static Properties buildProps(ClientBuilderImpl<Properties> cbi) {
+ ClientProperty.validate(cbi.properties);
+ return cbi.properties;
+ }
+
+ @Override
+ public AuthenticationArgs<T> to(CharSequence instanceName, CharSequence zookeepers) {
+ setProperty(ClientProperty.INSTANCE_NAME, instanceName);
+ setProperty(ClientProperty.INSTANCE_ZOOKEEPERS, zookeepers);
+ return this;
+ }
+
+ @Override
+ public SslOptions<T> truststore(CharSequence path) {
+ setProperty(ClientProperty.SSL_TRUSTSTORE_PATH, path);
+ return this;
+ }
+
+ @Override
+ public SslOptions<T> truststore(CharSequence path, CharSequence password, CharSequence type) {
+ setProperty(ClientProperty.SSL_TRUSTSTORE_PATH, path);
+ setProperty(ClientProperty.SSL_TRUSTSTORE_PASSWORD, password);
+ setProperty(ClientProperty.SSL_TRUSTSTORE_TYPE, type);
+ return this;
+ }
+
+ @Override
+ public SslOptions<T> keystore(CharSequence path) {
+ setProperty(ClientProperty.SSL_KEYSTORE_PATH, path);
+ return this;
+ }
+
+ @Override
+ public SslOptions<T> keystore(CharSequence path, CharSequence password, CharSequence type) {
+ setProperty(ClientProperty.SSL_KEYSTORE_PATH, path);
+ setProperty(ClientProperty.SSL_KEYSTORE_PASSWORD, password);
+ setProperty(ClientProperty.SSL_KEYSTORE_TYPE, type);
+ return this;
+ }
+
+ @Override
+ public SslOptions<T> useJsse() {
+ setProperty(ClientProperty.SSL_USE_JSSE, "true");
+ return this;
+ }
+
+ @Override
+ public ConnectionOptions<T> zkTimeout(int timeout) {
+ ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.setTimeInMillis(properties, (long) timeout);
+ return this;
+ }
+
+ @Override
+ public SslOptions<T> useSsl() {
+ setProperty(ClientProperty.SSL_ENABLED, "true");
+ return this;
+ }
+
+ @Override
+ public SaslOptions<T> useSasl() {
+ setProperty(ClientProperty.SASL_ENABLED, "true");
+ return this;
+ }
+
+ @Override
+ public ConnectionOptions<T> batchWriterConfig(BatchWriterConfig batchWriterConfig) {
+ ClientProperty.BATCH_WRITER_MEMORY_MAX.setBytes(properties, batchWriterConfig.getMaxMemory());
+ ClientProperty.BATCH_WRITER_LATENCY_MAX.setTimeInMillis(properties,
+ batchWriterConfig.getMaxLatency(TimeUnit.MILLISECONDS));
+ ClientProperty.BATCH_WRITER_TIMEOUT_MAX.setTimeInMillis(properties,
+ batchWriterConfig.getTimeout(TimeUnit.MILLISECONDS));
+ setProperty(ClientProperty.BATCH_WRITER_THREADS_MAX, batchWriterConfig.getMaxWriteThreads());
+ setProperty(ClientProperty.BATCH_WRITER_DURABILITY,
+ batchWriterConfig.getDurability().toString());
+ return this;
+ }
+
+ @Override
+ public ConnectionOptions<T> batchScannerQueryThreads(int numQueryThreads) {
+ setProperty(ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS, numQueryThreads);
+ return this;
+ }
+
+ @Override
+ public ConnectionOptions<T> scannerBatchSize(int batchSize) {
+ setProperty(ClientProperty.SCANNER_BATCH_SIZE, batchSize);
+ return this;
+ }
+
+ @Override
+ public SaslOptions<T> primary(CharSequence kerberosServerPrimary) {
+ setProperty(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY, kerberosServerPrimary);
+ return this;
+ }
+
+ @Override
+ public SaslOptions<T> qop(CharSequence qualityOfProtection) {
+ setProperty(ClientProperty.SASL_QOP, qualityOfProtection);
+ return this;
+ }
+
+ @Override
+ public FromOptions<T> from(String propertiesFilePath) {
+ return from(ClientInfoImpl.toProperties(propertiesFilePath));
+ }
+
+ @Override
+ public FromOptions<T> from(Path propertiesFile) {
+ return from(ClientInfoImpl.toProperties(propertiesFile));
+ }
+
+ @Override
+ public FromOptions<T> from(Properties properties) {
+ this.properties = properties;
+ return this;
+ }
+
+ @Override
+ public ConnectionOptions<T> as(CharSequence username, CharSequence password) {
+ setProperty(ClientProperty.AUTH_PRINCIPAL, username);
+ ClientProperty.setPassword(properties, password);
+ return this;
+ }
+
+ @Override
+ public ConnectionOptions<T> as(CharSequence principal, Path keyTabFile) {
+ setProperty(ClientProperty.AUTH_PRINCIPAL, principal);
+ ClientProperty.setKerberosKeytab(properties, keyTabFile.toString());
+ return this;
+ }
+
+ @Override
+ public ConnectionOptions<T> as(CharSequence principal, AuthenticationToken token) {
+ if (token.isDestroyed()) {
+ throw new IllegalArgumentException("AuthenticationToken has been destroyed");
+ }
+ setProperty(ClientProperty.AUTH_PRINCIPAL, principal.toString());
+ ClientProperty.setAuthenticationToken(properties, token);
+ this.token = token;
+ return this;
+ }
+
+ public void setProperty(ClientProperty property, CharSequence value) {
+ properties.setProperty(property.getKey(), value.toString());
+ }
+
+ public void setProperty(ClientProperty property, Long value) {
+ setProperty(property, Long.toString(value));
+ }
+
+ public void setProperty(ClientProperty property, Integer value) {
+ setProperty(property, Integer.toString(value));
+ }
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConnectorImpl.java
index 3286c23..30816df 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConnectorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConnectorImpl.java
@@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.TimeUnit;
+import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchDeleter;
@@ -43,49 +44,48 @@ import org.apache.accumulo.core.singletons.SingletonManager.Mode;
import org.apache.accumulo.core.trace.Tracer;
/**
- * This class now delegates to {@link AccumuloClientImpl}, except for the methods which were not
- * copied over to that.
+ * This class now delegates to {@link ClientContext}, except for the methods which were not copied
+ * over to that.
*/
@Deprecated
public class ConnectorImpl extends org.apache.accumulo.core.client.Connector {
private static final String SYSTEM_TOKEN_NAME = "org.apache.accumulo.server.security."
+ "SystemCredentials$SystemToken";
- private final AccumuloClientImpl impl;
+ private final ClientContext context;
- public ConnectorImpl(AccumuloClientImpl impl)
- throws AccumuloSecurityException, AccumuloException {
- this.impl = impl;
+ public ConnectorImpl(ClientContext context) throws AccumuloSecurityException, AccumuloException {
+ this.context = context;
SingletonManager.setMode(Mode.CONNECTOR);
- if (impl.context.getCredentials().getToken().isDestroyed())
- throw new AccumuloSecurityException(impl.context.getCredentials().getPrincipal(),
+ if (context.getCredentials().getToken().isDestroyed())
+ throw new AccumuloSecurityException(context.getCredentials().getPrincipal(),
SecurityErrorCode.TOKEN_EXPIRED);
// Skip fail fast for system services; string literal for class name, to avoid dependency on
// server jar
- final String tokenClassName = impl.context.getCredentials().getToken().getClass().getName();
+ final String tokenClassName = context.getCredentials().getToken().getClass().getName();
if (!SYSTEM_TOKEN_NAME.equals(tokenClassName)) {
- ServerClient.executeVoid(impl.context, iface -> {
- if (!iface.authenticate(Tracer.traceInfo(), impl.context.rpcCreds()))
+ ServerClient.executeVoid(context, iface -> {
+ if (!iface.authenticate(Tracer.traceInfo(), context.rpcCreds()))
throw new AccumuloSecurityException("Authentication failed, access denied",
SecurityErrorCode.BAD_CREDENTIALS);
});
}
}
- public AccumuloClientImpl getAccumuloClient() {
- return impl;
+ public AccumuloClient getAccumuloClient() {
+ return context;
}
@Override
@Deprecated
public org.apache.accumulo.core.client.Instance getInstance() {
- return impl.context.getDeprecatedInstance();
+ return context.getDeprecatedInstance();
}
@Override
public BatchScanner createBatchScanner(String tableName, Authorizations authorizations,
int numQueryThreads) throws TableNotFoundException {
- return impl.createBatchScanner(tableName, authorizations, numQueryThreads);
+ return context.createBatchScanner(tableName, authorizations, numQueryThreads);
}
@Override
@@ -94,7 +94,7 @@ public class ConnectorImpl extends org.apache.accumulo.core.client.Connector {
throws TableNotFoundException {
checkArgument(tableName != null, "tableName is null");
checkArgument(authorizations != null, "authorizations is null");
- return new TabletServerBatchDeleter(impl.context, impl.getTableId(tableName), authorizations,
+ return new TabletServerBatchDeleter(context, context.getTableId(tableName), authorizations,
numQueryThreads, new BatchWriterConfig().setMaxMemory(maxMemory)
.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
}
@@ -102,14 +102,14 @@ public class ConnectorImpl extends org.apache.accumulo.core.client.Connector {
@Override
public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
int numQueryThreads, BatchWriterConfig config) throws TableNotFoundException {
- return impl.createBatchDeleter(tableName, authorizations, numQueryThreads, config);
+ return context.createBatchDeleter(tableName, authorizations, numQueryThreads, config);
}
@Override
public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxLatency,
int maxWriteThreads) throws TableNotFoundException {
checkArgument(tableName != null, "tableName is null");
- return new BatchWriterImpl(impl.context, impl.getTableId(tableName),
+ return new BatchWriterImpl(context, context.getTableId(tableName),
new BatchWriterConfig().setMaxMemory(maxMemory)
.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
}
@@ -117,62 +117,61 @@ public class ConnectorImpl extends org.apache.accumulo.core.client.Connector {
@Override
public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config)
throws TableNotFoundException {
- return impl.createBatchWriter(tableName, config);
+ return context.createBatchWriter(tableName, config);
}
@Override
public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency,
int maxWriteThreads) {
- return new MultiTableBatchWriterImpl(impl.context,
- new BatchWriterConfig().setMaxMemory(maxMemory)
- .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
+ return new MultiTableBatchWriterImpl(context, new BatchWriterConfig().setMaxMemory(maxMemory)
+ .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
}
@Override
public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) {
- return impl.createMultiTableBatchWriter(config);
+ return context.createMultiTableBatchWriter(config);
}
@Override
public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config)
throws TableNotFoundException {
- return impl.createConditionalWriter(tableName, config);
+ return context.createConditionalWriter(tableName, config);
}
@Override
public Scanner createScanner(String tableName, Authorizations authorizations)
throws TableNotFoundException {
- return impl.createScanner(tableName, authorizations);
+ return context.createScanner(tableName, authorizations);
}
@Override
public String whoami() {
- return impl.whoami();
+ return context.whoami();
}
@Override
public TableOperations tableOperations() {
- return impl.tableOperations();
+ return context.tableOperations();
}
@Override
public NamespaceOperations namespaceOperations() {
- return impl.namespaceOperations();
+ return context.namespaceOperations();
}
@Override
public SecurityOperations securityOperations() {
- return impl.securityOperations();
+ return context.securityOperations();
}
@Override
public InstanceOperations instanceOperations() {
- return impl.instanceOperations();
+ return context.instanceOperations();
}
@Override
public ReplicationOperations replicationOperations() {
- return impl.replicationOperations();
+ return context.replicationOperations();
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
index 6f09508..10bb2ea 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
@@ -28,7 +28,6 @@ import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.SampleNotPresentException;
@@ -150,7 +149,6 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
private SortedKeyValueIterator<Key,Value> iter;
private Range range;
private KeyExtent currentExtent;
- private AccumuloClient client;
private Table.ID tableId;
private Authorizations authorizations;
private ClientContext context;
@@ -174,8 +172,7 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
this.readers = new ArrayList<>();
try {
- client = context.getClient();
- config = new ConfigurationCopy(client.instanceOperations().getSiteConfiguration());
+ config = new ConfigurationCopy(context.instanceOperations().getSiteConfiguration());
nextTablet();
while (iter != null && !iter.hasTop())
@@ -292,11 +289,9 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
}
- private TabletMetadata getTabletFiles(Range nextRange)
- throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
-
+ private TabletMetadata getTabletFiles(Range nextRange) {
try (TabletsMetadata tablets = TabletsMetadata.builder().scanMetadataTable()
- .overRange(nextRange).fetchFiles().fetchLocation().fetchPrev().build(client)) {
+ .overRange(nextRange).fetchFiles().fetchLocation().fetchPrev().build(context)) {
return tablets.iterator().next();
}
}
@@ -309,7 +304,7 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
// possible race condition here, if table is renamed
String tableName = Tables.getTableName(context, tableId);
AccumuloConfiguration acuTableConf = new ConfigurationCopy(
- client.tableOperations().getProperties(tableName));
+ context.tableOperations().getProperties(tableName));
Configuration conf = CachedConfiguration.getInstance();
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ReplicationOperationsImpl.java
index 8210d06..308ad8b 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ReplicationOperationsImpl.java
@@ -66,7 +66,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
throws AccumuloException, AccumuloSecurityException, PeerExistsException {
requireNonNull(name);
requireNonNull(replicaType);
- context.getClient().instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + name,
+ context.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + name,
replicaType);
}
@@ -74,8 +74,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
public void removePeer(final String name)
throws AccumuloException, AccumuloSecurityException, PeerNotFoundException {
requireNonNull(name);
- context.getClient().instanceOperations()
- .removeProperty(Property.REPLICATION_PEERS.getKey() + name);
+ context.instanceOperations().removeProperty(Property.REPLICATION_PEERS.getKey() + name);
}
@Override
@@ -146,13 +145,12 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
log.debug("Collecting referenced files for replication of table {}", tableName);
- AccumuloClient client = context.getClient();
- Table.ID tableId = getTableId(client, tableName);
+ Table.ID tableId = getTableId(context, tableName);
log.debug("Found id of {} for name {}", tableId, tableName);
// Get the WALs currently referenced by the table
- BatchScanner metaBs = client.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
+ BatchScanner metaBs = context.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
metaBs.setRanges(Collections.singleton(MetadataSchema.TabletsSection.getRange(tableId)));
metaBs.fetchColumnFamily(LogColumnFamily.NAME);
Set<String> wals = new HashSet<>();
@@ -166,7 +164,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
}
// And the WALs that need to be replicated for this table
- metaBs = client.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
+ metaBs = context.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
metaBs.setRanges(Collections.singleton(ReplicationSection.getRange()));
metaBs.fetchColumnFamily(ReplicationSection.COLF);
try {
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index c4dc693..a435c68 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -1102,7 +1102,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
private Path checkPath(String dir, String kind, String type)
throws IOException, AccumuloException, AccumuloSecurityException {
Path ret;
- Map<String,String> props = context.getClient().instanceOperations().getSystemConfiguration();
+ Map<String,String> props = context.instanceOperations().getSystemConfiguration();
AccumuloConfiguration conf = new ConfigurationCopy(props);
FileSystem fs = VolumeConfiguration.getVolume(dir, CachedConfiguration.getInstance(), conf)
@@ -1347,7 +1347,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
checkArgument(tableName != null, "tableName is null");
checkArgument(auths != null, "auths is null");
- Scanner scanner = context.getClient().createScanner(tableName, auths);
+ Scanner scanner = context.createScanner(tableName, auths);
return FindMax.findMax(scanner, startRow, startInclusive, endRow, endInclusive);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index 2139120..56fc3af 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -568,8 +568,8 @@ public class TabletServerBatchWriter {
af.put(new TabletIdImpl(entry.getKey()), codes);
}
- throw new MutationsRejectedException(context.getProperties(), cvsList, af, serverSideErrors,
- unknownErrors, lastUnknownError);
+ throw new MutationsRejectedException(context, cvsList, af, serverSideErrors, unknownErrors,
+ lastUnknownError);
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java
index a1b7dd6..c1b954a 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java
@@ -121,8 +121,7 @@ public class ConfiguratorBase {
AuthenticationToken token = ClientProperty.getAuthenticationToken(result);
if (token instanceof KerberosToken) {
log.info("Received KerberosToken, attempting to fetch DelegationToken");
- try {
- AccumuloClient client = Accumulo.newClient().from(props).build();
+ try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
AuthenticationToken delegationToken = client.securityOperations()
.getDelegationToken(new DelegationTokenConfig());
ClientProperty.setAuthenticationToken(result, delegationToken);
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/InputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/InputConfigurator.java
index 142a176..9897000 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/InputConfigurator.java
@@ -842,7 +842,7 @@ public class InputConfigurator extends ConfiguratorBase {
Range metadataRange = new Range(new KeyExtent(tableId, startRow, null).getMetadataEntry(),
true, null, false);
- Scanner scanner = context.getClient().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ Scanner scanner = context.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME);
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/MapReduceClientOpts.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/MapReduceClientOpts.java
index 1a4964d..ecf3e3c 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/MapReduceClientOpts.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/MapReduceClientOpts.java
@@ -61,24 +61,24 @@ public class MapReduceClientOpts extends ClientOpts {
log.info("Obtaining delegation token for {}", newPrincipal);
setPrincipal(newPrincipal);
- AccumuloClient client = Accumulo.newClient().from(getClientProperties())
- .as(newPrincipal, krbToken).build();
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProperties())
+ .as(newPrincipal, krbToken).build()) {
- // Do the explicit check to see if the user has the permission to get a delegation token
- if (!client.securityOperations().hasSystemPermission(client.whoami(),
- SystemPermission.OBTAIN_DELEGATION_TOKEN)) {
- log.error(
- "{} doesn't have the {} SystemPermission neccesary to obtain a delegation"
- + " token. MapReduce tasks cannot automatically use the client's"
- + " credentials on remote servers. Delegation tokens provide a means to run"
- + " MapReduce without distributing the user's credentials.",
- user.getUserName(), SystemPermission.OBTAIN_DELEGATION_TOKEN.name());
- throw new IllegalStateException(
- client.whoami() + " does not have permission to obtain a delegation token");
+ // Do the explicit check to see if the user has the permission to get a delegation token
+ if (!client.securityOperations().hasSystemPermission(client.whoami(),
+ SystemPermission.OBTAIN_DELEGATION_TOKEN)) {
+ log.error(
+ "{} doesn't have the {} SystemPermission neccesary to obtain a delegation"
+ + " token. MapReduce tasks cannot automatically use the client's"
+ + " credentials on remote servers. Delegation tokens provide a means to run"
+ + " MapReduce without distributing the user's credentials.",
+ user.getUserName(), SystemPermission.OBTAIN_DELEGATION_TOKEN.name());
+ throw new IllegalStateException(
+ client.whoami() + " does not have permission to obtain a delegation token");
+ }
+ // Get the delegation token from Accumulo
+ return client.securityOperations().getDelegationToken(new DelegationTokenConfig());
}
-
- // Get the delegation token from Accumulo
- return client.securityOperations().getDelegationToken(new DelegationTokenConfig());
} catch (Exception e) {
final String msg = "Failed to acquire DelegationToken for use with MapReduce";
log.error(msg, e);
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/MetadataServicer.java b/core/src/main/java/org/apache/accumulo/core/metadata/MetadataServicer.java
index f65ada9..3f857e3 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/MetadataServicer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/MetadataServicer.java
@@ -35,8 +35,7 @@ public abstract class MetadataServicer {
public static MetadataServicer forTableName(ClientContext context, String tableName)
throws AccumuloException, AccumuloSecurityException {
checkArgument(tableName != null, "tableName is null");
- return forTableId(context,
- Table.ID.of(context.getClient().tableOperations().tableIdMap().get(tableName)));
+ return forTableId(context, Table.ID.of(context.tableOperations().tableIdMap().get(tableName)));
}
public static MetadataServicer forTableId(ClientContext context, Table.ID tableId) {
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/TableMetadataServicer.java b/core/src/main/java/org/apache/accumulo/core/metadata/TableMetadataServicer.java
index be2cee9..f2c7891 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/TableMetadataServicer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/TableMetadataServicer.java
@@ -63,8 +63,7 @@ abstract class TableMetadataServicer extends MetadataServicer {
public void getTabletLocations(SortedMap<KeyExtent,String> tablets)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
- Scanner scanner = context.getClient().createScanner(getServicingTableName(),
- Authorizations.EMPTY);
+ Scanner scanner = context.createScanner(getServicingTableName(), Authorizations.EMPTY);
TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
index 1ae57a7..f5c7bf1 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
@@ -35,7 +35,6 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.Table;
import org.apache.accumulo.core.clientImpl.Table.ID;
import org.apache.accumulo.core.data.Range;
@@ -77,11 +76,6 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
private ID tableId;
@Override
- public TabletsMetadata build(ClientContext ctx) {
- return build(ctx.getClient());
- }
-
- @Override
public TabletsMetadata build(AccumuloClient client) {
try {
Scanner scanner = new IsolatedScanner(client.createScanner(table, Authorizations.EMPTY));
@@ -262,8 +256,6 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
public interface Options {
TabletsMetadata build(AccumuloClient client);
- TabletsMetadata build(ClientContext ctx);
-
/**
* Checks that the metadata table forms a linked list and automatically backs up until it does.
*/
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
index 9d69f24..905ce3d 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
@@ -91,7 +91,7 @@ public class ReplicationTable {
}
public static boolean isOnline(AccumuloClient client) {
- return TableState.ONLINE == Tables.getTableState(new ClientContext(client), ID);
+ return TableState.ONLINE == Tables.getTableState((ClientContext) client, ID);
}
public static void setOnline(AccumuloClient client)
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
index b6e6c45..dcce4e6 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
@@ -204,7 +204,7 @@ public class Gatherer {
if (location == null) {
if (tservers == null) {
- tservers = ctx.getClient().instanceOperations().getTabletServers();
+ tservers = ctx.instanceOperations().getTabletServers();
Collections.sort(tservers);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/Merge.java b/core/src/main/java/org/apache/accumulo/core/util/Merge.java
index 24360dc..9786ccb 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/Merge.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/Merge.java
@@ -215,7 +215,7 @@ public class Merge {
Table.ID tableId;
TabletsMetadata tablets;
try {
- ClientContext context = new ClientContext(client);
+ ClientContext context = (ClientContext) client;
tableId = Tables.getTableId(context, tablename);
tablets = TabletsMetadata.builder().scanMetadataTable()
.overRange(new KeyExtent(tableId, end, start).toMetadataRange()).fetchFiles().fetchPrev()
diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataServicerTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataServicerTest.java
index 67c9e75..34d608d 100644
--- a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataServicerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataServicerTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
import java.util.HashMap;
-import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableExistsException;
@@ -51,12 +50,10 @@ public class MetadataServicerTest {
tableNameToIdMap.put(userTableName, userTableId.canonicalID());
context = EasyMock.createMock(ClientContext.class);
- AccumuloClient client = EasyMock.createMock(AccumuloClient.class);
TableOperations tableOps = EasyMock.createMock(TableOperations.class);
EasyMock.expect(tableOps.tableIdMap()).andReturn(tableNameToIdMap).anyTimes();
- EasyMock.expect(client.tableOperations()).andReturn(tableOps).anyTimes();
- EasyMock.expect(context.getClient()).andReturn(client).anyTimes();
- EasyMock.replay(context, client, tableOps);
+ EasyMock.expect(context.tableOperations()).andReturn(tableOps).anyTimes();
+ EasyMock.replay(context, tableOps);
}
@Test
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java
index 8548cfd..cf2bc04 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java
@@ -298,7 +298,7 @@ public abstract class AbstractInputFormat {
log.debug("Initializing input split: " + baseSplit);
client = createClient(job);
- ClientContext context = new ClientContext(client);
+ ClientContext context = (ClientContext) client;
Authorizations authorizations = getScanAuthorizations(job);
String classLoaderContext = getClassLoaderContext(job);
String table = baseSplit.getTableName();
@@ -307,7 +307,7 @@ public abstract class AbstractInputFormat {
// configuration, but the scanner will use the table id resolved at job setup time
InputTableConfig tableConfig = getInputTableConfig(job, baseSplit.getTableName());
- log.debug("Created client with user: " + client.whoami());
+ log.debug("Created client with user: " + context.whoami());
log.debug("Creating scanner for table: " + table);
log.debug("Authorizations are: " + authorizations);
@@ -319,7 +319,7 @@ public abstract class AbstractInputFormat {
// Note: BatchScanner will use at most one thread per tablet, currently BatchInputSplit
// will not span tablets
int scanThreads = 1;
- scanner = client.createBatchScanner(baseSplit.getTableName(), authorizations,
+ scanner = context.createBatchScanner(baseSplit.getTableName(), authorizations,
scanThreads);
setupIterators(job, scanner, baseSplit.getTableName(), baseSplit);
if (null != classLoaderContext) {
@@ -445,8 +445,7 @@ public abstract class AbstractInputFormat {
Table.ID tableId, List<Range> ranges)
throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
try (AccumuloClient client = createClient(job)) {
- ClientContext context = new ClientContext(client);
- return InputConfigurator.binOffline(tableId, ranges, context);
+ return InputConfigurator.binOffline(tableId, ranges, (ClientContext) client);
}
}
@@ -470,7 +469,7 @@ public abstract class AbstractInputFormat {
String tableName = tableConfigEntry.getKey();
InputTableConfig tableConfig = tableConfigEntry.getValue();
- ClientContext context = new ClientContext(client);
+ ClientContext context = (ClientContext) client;
Table.ID tableId;
// resolve table name to id once, and use id from this point forward
try {
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java
index e74889c..0529c6d 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java
@@ -302,7 +302,7 @@ public abstract class AbstractInputFormat {
log.debug("Initializing input split: " + split);
client = createClient(attempt);
- ClientContext context = new ClientContext(client);
+ ClientContext context = (ClientContext) client;
Authorizations authorizations = getScanAuthorizations(attempt);
String classLoaderContext = getClassLoaderContext(attempt);
String table = split.getTableName();
@@ -324,7 +324,7 @@ public abstract class AbstractInputFormat {
// Note: BatchScanner will use at most one thread per tablet, currently BatchInputSplit
// will not span tablets
int scanThreads = 1;
- scanner = client.createBatchScanner(split.getTableName(), authorizations, scanThreads);
+ scanner = context.createBatchScanner(split.getTableName(), authorizations, scanThreads);
setupIterators(attempt, scanner, split.getTableName(), split);
if (null != classLoaderContext) {
scanner.setClassLoaderContext(classLoaderContext);
@@ -467,8 +467,7 @@ public abstract class AbstractInputFormat {
Table.ID tableId, List<Range> ranges)
throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
try (AccumuloClient client = createClient(context)) {
- ClientContext clientContext = new ClientContext(client);
- return InputConfigurator.binOffline(tableId, ranges, clientContext);
+ return InputConfigurator.binOffline(tableId, ranges, (ClientContext) client);
}
}
@@ -483,7 +482,7 @@ public abstract class AbstractInputFormat {
String tableName = tableConfigEntry.getKey();
InputTableConfig tableConfig = tableConfigEntry.getValue();
- ClientContext clientContext = new ClientContext(client);
+ ClientContext clientContext = (ClientContext) client;
Table.ID tableId;
// resolve table name to id once, and use id from this point forward
try {
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java
index ecae8a3..3fb28f4 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java
@@ -711,8 +711,7 @@ public class InputConfigurator extends ConfiguratorBase {
public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf,
Table.ID tableId) {
try (AccumuloClient client = createClient(implementingClass, conf)) {
- ClientContext context = new ClientContext(client);
- return TabletLocator.getLocator(context, tableId);
+ return TabletLocator.getLocator((ClientContext) client, tableId);
}
}
@@ -831,7 +830,7 @@ public class InputConfigurator extends ConfiguratorBase {
Range metadataRange = new Range(new KeyExtent(tableId, startRow, null).getMetadataEntry(),
true, null, false);
- Scanner scanner = context.getClient().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ Scanner scanner = context.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME);
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index 590d4f9..34e4c8c 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -842,10 +842,9 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
throws AccumuloException, AccumuloSecurityException {
MasterClientService.Iface client = null;
while (true) {
- try {
- ClientContext context = new ClientContext(getClientProperties());
- client = MasterClient.getConnectionWithRetry(context);
- return client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
+ try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
+ client = MasterClient.getConnectionWithRetry((ClientContext) c);
+ return client.getMasterStats(Tracer.traceInfo(), ((ClientContext)c).rpcCreds());
} catch (ThriftSecurityException exception) {
throw new AccumuloSecurityException(exception);
} catch (ThriftNotActiveServiceException e) {
diff --git a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterExistingZooKeepersTest.java b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterExistingZooKeepersTest.java
index 605ac9a..9bcc63a 100644
--- a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterExistingZooKeepersTest.java
+++ b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterExistingZooKeepersTest.java
@@ -24,7 +24,6 @@ import java.io.File;
import java.io.IOException;
import java.util.Map;
-import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.commons.io.FileUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -93,16 +92,15 @@ public class MiniAccumuloClusterExistingZooKeepersTest {
@Test
public void canConnectViaExistingZooKeeper() throws Exception {
org.apache.accumulo.core.client.Connector conn = accumulo.getConnector("root", SECRET);
- ClientContext context = new ClientContext(accumulo.getClientProperties());
- assertEquals(zooKeeper.getConnectString(), context.getZooKeepers());
+ assertEquals(zooKeeper.getConnectString(), conn.getInstance().getZooKeepers());
String tableName = "foo";
conn.tableOperations().create(tableName);
Map<String,String> tableIds = conn.tableOperations().tableIdMap();
assertTrue(tableIds.containsKey(tableName));
- String zkTablePath = String.format("/accumulo/%s/tables/%s/name", context.getInstanceID(),
- tableIds.get(tableName));
+ String zkTablePath = String.format("/accumulo/%s/tables/%s/name",
+ conn.getInstance().getInstanceID(), tableIds.get(tableName));
try (CuratorFramework client = CuratorFrameworkFactory.newClient(zooKeeper.getConnectString(),
new RetryOneTime(1))) {
client.start();
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
index f2ea066..4dffaa3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
@@ -23,10 +23,6 @@ import java.util.Objects;
import java.util.Properties;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.clientImpl.AccumuloClientImpl;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientInfo;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -35,7 +31,6 @@ import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.crypto.CryptoServiceFactory;
import org.apache.accumulo.core.crypto.CryptoServiceFactory.ClassloaderType;
import org.apache.accumulo.core.rpc.SslConnectionParams;
-import org.apache.accumulo.core.singletons.SingletonReservation;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.trace.DistributedTrace;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
@@ -239,18 +234,6 @@ public class ServerContext extends ClientContext {
return secretManager;
}
- @Override
- public synchronized AccumuloClient getClient() {
- if (client == null) {
- client = new AccumuloClientImpl(SingletonReservation.noop(), this);
- }
- return client;
- }
-
- public AccumuloClient getClient(String principal, AuthenticationToken token) {
- return Accumulo.newClient().from(info.getProperties()).as(principal, token).build();
- }
-
public synchronized TableManager getTableManager() {
if (tableManager == null) {
tableManager = new TableManager(this);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/cli/ServerUtilOpts.java b/server/base/src/main/java/org/apache/accumulo/server/cli/ServerUtilOpts.java
index 598f236..1113505 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/cli/ServerUtilOpts.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/cli/ServerUtilOpts.java
@@ -17,7 +17,6 @@
package org.apache.accumulo.server.cli;
import org.apache.accumulo.core.cli.ClientOpts;
-import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.server.ServerContext;
@@ -26,10 +25,6 @@ public class ServerUtilOpts extends ClientOpts {
setPrincipal("root");
}
- public ClientContext getClientContext() {
- return new ClientContext(getClientProperties());
- }
-
private ServerContext context;
public synchronized ServerContext getServerContext() {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
index 2aa7cbe..33448e6 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
@@ -467,8 +467,7 @@ public class ClientServiceHandler implements ClientService.Iface {
}
// use the same set of tableIds that were validated above to avoid race conditions
- Map<TreeSet<String>,Long> diskUsage = TableDiskUsage.getDiskUsage(tableIds, fs,
- context.getClient());
+ Map<TreeSet<String>,Long> diskUsage = TableDiskUsage.getDiskUsage(tableIds, fs, context);
List<TDiskUsage> retUsages = new ArrayList<>();
for (Map.Entry<TreeSet<String>,Long> usageItem : diskUsage.entrySet()) {
retUsages.add(new TDiskUsage(new ArrayList<>(usageItem.getKey()), usageItem.getValue()));
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
index 8fadccd..ec220d6 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
@@ -136,7 +136,7 @@ public class TableLoadBalancer extends TabletBalancer {
protected TableOperations getTableOperations() {
if (tops == null)
- tops = this.context.getClient().tableOperations();
+ tops = this.context.tableOperations();
return tops;
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index 09fc9f4..79f39e1 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -90,7 +90,7 @@ public class MetaDataStateStore extends TabletStateStore {
BatchWriter createBatchWriter() {
try {
- return context.getClient().createBatchWriter(targetTableName,
+ return context.createBatchWriter(targetTableName,
new BatchWriterConfig().setMaxMemory(MAX_MEMORY)
.setMaxLatency(LATENCY, TimeUnit.MILLISECONDS).setMaxWriteThreads(THREADS));
} catch (Exception e) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
index b67d54d..5632503 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.SortedMap;
-import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.ScannerBase;
@@ -60,8 +59,7 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
// scan over metadata table, looking for tablets in the wrong state based on the live servers
// and online tables
try {
- AccumuloClient accumuloClient = context.getClient();
- mdScanner = accumuloClient.createBatchScanner(tableName, Authorizations.EMPTY, 8);
+ mdScanner = context.createBatchScanner(tableName, Authorizations.EMPTY, 8);
configureScanner(mdScanner, state);
mdScanner.setRanges(Collections.singletonList(range));
iter = mdScanner.iterator();
diff --git a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
index 98615eb..7893cce 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.clientImpl.Table;
@@ -168,8 +167,7 @@ public class ProblemReports implements Iterable<ProblemReport> {
return;
}
- AccumuloClient accumuloClient = context.getClient();
- Scanner scanner = accumuloClient.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ Scanner scanner = context.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
scanner.addScanIterator(new IteratorSetting(1, "keys-only", SortedKeyIterator.class));
scanner.setRange(new Range(new Text("~err_" + table)));
@@ -223,10 +221,7 @@ public class ProblemReports implements Iterable<ProblemReport> {
if (iter2 == null) {
try {
if ((table == null || !isMeta(table)) && iter1Count == 0) {
- AccumuloClient accumuloClient = context.getClient();
- Scanner scanner = accumuloClient.createScanner(MetadataTable.NAME,
- Authorizations.EMPTY);
-
+ Scanner scanner = context.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
scanner.setTimeout(3, TimeUnit.SECONDS);
if (table == null) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemHelper.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemHelper.java
index 0ca268f..4f60224 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemHelper.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemHelper.java
@@ -56,7 +56,7 @@ public class ReplicaSystemHelper {
*/
public void recordNewStatus(Path filePath, Status status, ReplicationTarget target)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
- try (BatchWriter bw = context.getClient().createBatchWriter(ReplicationTable.NAME,
+ try (BatchWriter bw = context.createBatchWriter(ReplicationTable.NAME,
new BatchWriterConfig())) {
log.debug("Recording new status for {}, {}", filePath, ProtobufUtil.toString(status));
Mutation m = new Mutation(filePath.toString());
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
index a58a4f1..5caf15d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
@@ -161,7 +161,7 @@ public class ReplicationUtil {
// Read over the queued work
BatchScanner bs;
try {
- bs = context.getClient().createBatchScanner(ReplicationTable.NAME, Authorizations.EMPTY, 4);
+ bs = context.createBatchScanner(ReplicationTable.NAME, Authorizations.EMPTY, 4);
} catch (TableNotFoundException e) {
log.debug("No replication table exists", e);
return counts;
@@ -198,7 +198,7 @@ public class ReplicationUtil {
// Read over the queued work
BatchScanner bs;
try {
- bs = context.getClient().createBatchScanner(ReplicationTable.NAME, Authorizations.EMPTY, 4);
+ bs = context.createBatchScanner(ReplicationTable.NAME, Authorizations.EMPTY, 4);
} catch (TableNotFoundException e) {
log.debug("No replication table exists", e);
return paths;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index a59a40e..962324f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@ -271,10 +271,9 @@ public class Admin implements KeywordExecutable {
}
}
- private static int ping(ClientContext context, List<String> args)
- throws AccumuloException, AccumuloSecurityException {
+ private static int ping(ClientContext context, List<String> args) {
- InstanceOperations io = context.getClient().instanceOperations();
+ InstanceOperations io = context.instanceOperations();
if (args.size() == 0) {
args = io.getTabletServers();
@@ -301,31 +300,25 @@ public class Admin implements KeywordExecutable {
* an attempt to initiate flushes of all tables and give up if it takes too long.
*
*/
- private static void flushAll(final ClientContext context)
- throws AccumuloException, AccumuloSecurityException {
+ private static void flushAll(final ClientContext context) {
final AtomicInteger flushesStarted = new AtomicInteger(0);
- Runnable flushTask = new Runnable() {
-
- @Override
- public void run() {
- try {
- AccumuloClient client = context.getClient();
- Set<String> tables = client.tableOperations().tableIdMap().keySet();
- for (String table : tables) {
- if (table.equals(MetadataTable.NAME))
- continue;
- try {
- client.tableOperations().flush(table, null, null, false);
- flushesStarted.incrementAndGet();
- } catch (TableNotFoundException e) {
- // ignore
- }
+ Runnable flushTask = () -> {
+ try {
+ Set<String> tables = context.tableOperations().tableIdMap().keySet();
+ for (String table : tables) {
+ if (table.equals(MetadataTable.NAME))
+ continue;
+ try {
+ context.tableOperations().flush(table, null, null, false);
+ flushesStarted.incrementAndGet();
+ } catch (TableNotFoundException e) {
+ // ignore
}
- } catch (Exception e) {
- log.warn("Failed to intiate flush {}", e.getMessage());
}
+ } catch (Exception e) {
+ log.warn("Failed to intiate flush {}", e.getMessage());
}
};
@@ -451,12 +444,11 @@ public class Admin implements KeywordExecutable {
throw new IllegalArgumentException(opts.directory + " is not writable");
}
}
- AccumuloClient accumuloClient = context.getClient();
defaultConfig = DefaultConfiguration.getInstance();
- siteConfig = accumuloClient.instanceOperations().getSiteConfiguration();
- systemConfig = accumuloClient.instanceOperations().getSystemConfiguration();
+ siteConfig = context.instanceOperations().getSiteConfiguration();
+ systemConfig = context.instanceOperations().getSystemConfiguration();
if (opts.allConfiguration || opts.users) {
- localUsers = Lists.newArrayList(accumuloClient.securityOperations().listLocalUsers());
+ localUsers = Lists.newArrayList(context.securityOperations().listLocalUsers());
Collections.sort(localUsers);
}
@@ -464,35 +456,35 @@ public class Admin implements KeywordExecutable {
// print accumulo site
printSystemConfiguration(outputDirectory);
// print namespaces
- for (String namespace : accumuloClient.namespaceOperations().list()) {
- printNameSpaceConfiguration(accumuloClient, namespace, outputDirectory);
+ for (String namespace : context.namespaceOperations().list()) {
+ printNameSpaceConfiguration(context, namespace, outputDirectory);
}
// print tables
- SortedSet<String> tableNames = accumuloClient.tableOperations().list();
+ SortedSet<String> tableNames = context.tableOperations().list();
for (String tableName : tableNames) {
- printTableConfiguration(accumuloClient, tableName, outputDirectory);
+ printTableConfiguration(context, tableName, outputDirectory);
}
// print users
for (String user : localUsers) {
- printUserConfiguration(accumuloClient, user, outputDirectory);
+ printUserConfiguration(context, user, outputDirectory);
}
} else {
if (opts.systemConfiguration) {
printSystemConfiguration(outputDirectory);
}
if (opts.namespaceConfiguration) {
- for (String namespace : accumuloClient.namespaceOperations().list()) {
- printNameSpaceConfiguration(accumuloClient, namespace, outputDirectory);
+ for (String namespace : context.namespaceOperations().list()) {
+ printNameSpaceConfiguration(context, namespace, outputDirectory);
}
}
if (opts.tables.size() > 0) {
for (String tableName : opts.tables) {
- printTableConfiguration(accumuloClient, tableName, outputDirectory);
+ printTableConfiguration(context, tableName, outputDirectory);
}
}
if (opts.users) {
for (String user : localUsers) {
- printUserConfiguration(accumuloClient, user, outputDirectory);
+ printUserConfiguration(context, user, outputDirectory);
}
}
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
index 5df7b2d..b8dec24 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
@@ -78,7 +78,7 @@ public class ListVolumesUsed {
System.out.println("Listing volumes referenced in " + name + " tablets section");
- Scanner scanner = context.getClient().createScanner(name, Authorizations.EMPTY);
+ Scanner scanner = context.createScanner(name, Authorizations.EMPTY);
scanner.setRange(MetadataSchema.TabletsSection.getRange());
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index aa06ff7..a559421 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -643,8 +643,7 @@ public class MetadataTableUtil {
rootTableEntries = getLogEntries(context, new KeyExtent(MetadataTable.ID, null, null))
.iterator();
try {
- Scanner scanner = context.getClient().createScanner(MetadataTable.NAME,
- Authorizations.EMPTY);
+ Scanner scanner = context.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
log.info("Setting range to {}", MetadataSchema.TabletsSection.getRange());
scanner.setRange(MetadataSchema.TabletsSection.getRange());
scanner.fetchColumnFamily(LogColumnFamily.NAME);
@@ -872,19 +871,18 @@ public class MetadataTableUtil {
public static void cloneTable(ServerContext context, Table.ID srcTableId, Table.ID tableId,
VolumeManager volumeManager) throws Exception {
- AccumuloClient client = context.getClient();
- try (BatchWriter bw = client.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig())) {
+ try (BatchWriter bw = context.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig())) {
while (true) {
try {
- initializeClone(null, srcTableId, tableId, client, bw);
+ initializeClone(null, srcTableId, tableId, context, bw);
// the following loop looks changes in the file that occurred during the copy.. if files
// were dereferenced then they could have been GCed
while (true) {
- int rewrites = checkClone(null, srcTableId, tableId, client, bw);
+ int rewrites = checkClone(null, srcTableId, tableId, context, bw);
if (rewrites == 0)
break;
@@ -908,7 +906,7 @@ public class MetadataTableUtil {
}
// delete the clone markers and create directory entries
- Scanner mscanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ Scanner mscanner = context.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
mscanner.fetchColumnFamily(ClonedColumnFamily.NAME);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RandomizeVolumes.java b/server/base/src/main/java/org/apache/accumulo/server/util/RandomizeVolumes.java
index 8628386..8be3815 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/RandomizeVolumes.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/RandomizeVolumes.java
@@ -69,7 +69,7 @@ public class RandomizeVolumes {
log.error("There are not enough volumes configured");
return 1;
}
- String tblStr = context.getClient().tableOperations().tableIdMap().get(tableName);
+ String tblStr = context.tableOperations().tableIdMap().get(tableName);
if (null == tblStr) {
log.error("Could not determine the table ID for table {}", tableName);
return 2;
@@ -78,15 +78,15 @@ public class RandomizeVolumes {
TableState tableState = context.getTableManager().getTableState(tableId);
if (TableState.OFFLINE != tableState) {
log.info("Taking {} offline", tableName);
- context.getClient().tableOperations().offline(tableName, true);
+ context.tableOperations().offline(tableName, true);
log.info("{} offline", tableName);
}
SimpleThreadPool pool = new SimpleThreadPool(50, "directory maker");
log.info("Rewriting entries for {}", tableName);
- Scanner scanner = context.getClient().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ Scanner scanner = context.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
DIRECTORY_COLUMN.fetch(scanner);
scanner.setRange(TabletsSection.getRange(tableId));
- BatchWriter writer = context.getClient().createBatchWriter(MetadataTable.NAME, null);
+ BatchWriter writer = context.createBatchWriter(MetadataTable.NAME, null);
int count = 0;
for (Entry<Key,Value> entry : scanner) {
String oldLocation = entry.getValue().toString();
@@ -137,7 +137,7 @@ public class RandomizeVolumes {
}
log.info("Updated {} entries for table {}", count, tableName);
if (TableState.OFFLINE != tableState) {
- context.getClient().tableOperations().online(tableName, true);
+ context.tableOperations().online(tableName, true);
log.info("table {} back online", tableName);
}
return 0;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
index 47e5772..5369600 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.cli.BatchWriterOpts;
import org.apache.accumulo.core.cli.ScannerOpts;
-import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -128,8 +127,7 @@ public class RemoveEntriesForMissingFiles {
System.out.printf("Scanning : %s %s\n", tableName, range);
VolumeManager fs = context.getVolumeManager();
- AccumuloClient accumuloClient = context.getClient();
- Scanner metadata = accumuloClient.createScanner(tableName, Authorizations.EMPTY);
+ Scanner metadata = context.createScanner(tableName, Authorizations.EMPTY);
metadata.setRange(range);
metadata.fetchColumnFamily(DataFileColumnFamily.NAME);
int count = 0;
@@ -138,7 +136,7 @@ public class RemoveEntriesForMissingFiles {
BatchWriter writer = null;
if (fix)
- writer = accumuloClient.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ writer = context.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
for (Entry<Key,Value> entry : metadata) {
if (exceptionRef.get() != null)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
index 5ec0000..b6cf121 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
@@ -84,10 +84,7 @@ public class ReplicationTableUtil {
static synchronized Writer getWriter(ClientContext context) {
Writer replicationTable = writers.get(context.getCredentials());
if (replicationTable == null) {
- AccumuloClient client = context.getClient();
-
- configureMetadataTable(client, MetadataTable.NAME);
-
+ configureMetadataTable(context, MetadataTable.NAME);
replicationTable = new Writer(context, MetadataTable.ID);
writers.put(context.getCredentials(), replicationTable);
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java b/server/base/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
index e7c310e..a405975 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
@@ -216,8 +216,7 @@ public class TableDiskUsage {
}
}
- ClientContext context = new ClientContext(client);
- Map<Table.ID,String> reverseTableIdMap = Tables.getIdToNameMap(context);
+ Map<Table.ID,String> reverseTableIdMap = Tables.getIdToNameMap((ClientContext) client);
TreeMap<TreeSet<String>,Long> usage = new TreeMap<>((o1, o2) -> {
int len1 = o1.size();
@@ -271,11 +270,10 @@ public class TableDiskUsage {
throws TableNotFoundException, IOException {
HashSet<Table.ID> tableIds = new HashSet<>();
- ClientContext context = new ClientContext(client);
// Get table IDs for all tables requested to be 'du'
for (String tableName : tableNames) {
- Table.ID tableId = Tables.getTableId(context, tableName);
+ Table.ID tableId = Tables.getTableId((ClientContext) client, tableName);
if (tableId == null)
throw new TableNotFoundException(null, tableName, "Table " + tableName + " not found");
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
index f956cd0..24e731c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
@@ -74,9 +74,8 @@ public class VerifyTabletAssignments {
opts.parseArgs(VerifyTabletAssignments.class.getName(), args);
try (AccumuloClient client = opts.createClient()) {
- ClientContext context = new ClientContext(client);
for (String table : client.tableOperations().list())
- checkTable(context, opts, table, null);
+ checkTable((ClientContext) client, opts, table, null);
}
}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java b/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
index 8506b0e..7ab15f3 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
@@ -23,8 +23,8 @@ import java.io.File;
import java.io.IOException;
import java.util.UUID;
-import org.apache.accumulo.core.clientImpl.AccumuloClientImpl;
import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.clientImpl.ClientInfo;
import org.apache.accumulo.core.clientImpl.Credentials;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.singletons.SingletonReservation;
@@ -70,8 +70,8 @@ public class SystemCredentialsTest {
/**
* This is a test to ensure the string literal in
- * {@link AccumuloClientImpl#AccumuloClientImpl(SingletonReservation, ClientContext)} is kept
- * up-to-date if we move the {@link SystemToken}<br>
+ * {@link ClientContext#ClientContext(SingletonReservation, ClientInfo)} is kept up-to-date if we
+ * move the {@link SystemToken}<br>
* This check will not be needed after ACCUMULO-1578
*/
@Test
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index eae5969..ef8f486 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -28,7 +28,6 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
-import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
@@ -338,11 +337,9 @@ public class GarbageCollectWriteAheadLogs {
protected int removeReplicationEntries(Map<UUID,TServerInstance> candidates)
throws IOException, KeeperException, InterruptedException {
- AccumuloClient client;
try {
- client = context.getClient();
try {
- final Scanner s = ReplicationTable.getScanner(client);
+ final Scanner s = ReplicationTable.getScanner(context);
StatusSection.limit(s);
for (Entry<Key,Value> entry : s) {
UUID id = path2uuid(new Path(entry.getKey().getRow().toString()));
@@ -353,7 +350,7 @@ public class GarbageCollectWriteAheadLogs {
return candidates.size();
}
- final Scanner scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ final Scanner scanner = context.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
scanner.setRange(MetadataSchema.ReplicationSection.getRange());
for (Entry<Key,Value> entry : scanner) {
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 1aac7a2..a078ba7 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -177,8 +177,8 @@ public class SimpleGarbageCollector implements Iface {
return context.getConfiguration();
}
- AccumuloClient getClient() throws AccumuloSecurityException, AccumuloException {
- return context.getClient();
+ AccumuloClient getClient() {
+ return context;
}
/**
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 8aa4f9f..5d217c8 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -86,15 +86,7 @@ public class CloseWriteAheadLogReferences implements Runnable {
// what the version they bundle uses.
Stopwatch sw = Stopwatch.createUnstarted();
- AccumuloClient client;
- try {
- client = context.getClient();
- } catch (Exception e) {
- log.error("Could not create client", e);
- throw new RuntimeException(e);
- }
-
- if (!ReplicationTable.isOnline(client)) {
+ if (!ReplicationTable.isOnline(context)) {
log.debug("Replication table isn't online, not attempting to clean up wals");
return;
}
@@ -116,7 +108,7 @@ public class CloseWriteAheadLogReferences implements Runnable {
long recordsClosed = 0;
try {
sw.start();
- recordsClosed = updateReplicationEntries(client, closed);
+ recordsClosed = updateReplicationEntries(context, closed);
} finally {
sw.stop();
updateReplicationSpan.stop();
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index a188a33..dfca409 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -25,7 +25,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
-import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
@@ -152,7 +151,6 @@ public class GarbageCollectWriteAheadLogsTest {
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
- AccumuloClient client = EasyMock.createMock(AccumuloClient.class);
Scanner mscanner = EasyMock.createMock(Scanner.class);
Scanner rscanner = EasyMock.createMock(Scanner.class);
@@ -160,15 +158,14 @@ public class GarbageCollectWriteAheadLogsTest {
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once();
EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path));
- EasyMock.expect(context.getClient()).andReturn(client);
- EasyMock.expect(client.createScanner(ReplicationTable.NAME, Authorizations.EMPTY))
+ EasyMock.expect(context.createScanner(ReplicationTable.NAME, Authorizations.EMPTY))
.andReturn(rscanner);
rscanner.fetchColumnFamily(ReplicationSchema.StatusSection.NAME);
EasyMock.expectLastCall().once();
EasyMock.expect(rscanner.iterator()).andReturn(emptyKV);
- EasyMock.expect(client.createScanner(MetadataTable.NAME, Authorizations.EMPTY))
+ EasyMock.expect(context.createScanner(MetadataTable.NAME, Authorizations.EMPTY))
.andReturn(mscanner);
mscanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
EasyMock.expectLastCall().once();
@@ -180,7 +177,7 @@ public class GarbageCollectWriteAheadLogsTest {
EasyMock.expectLastCall().once();
marker.forget(server2);
EasyMock.expectLastCall().once();
- EasyMock.replay(context, fs, marker, tserverSet, client, rscanner, mscanner);
+ EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false,
tserverSet, marker, tabletOnServer1List) {
@Override
@@ -189,7 +186,7 @@ public class GarbageCollectWriteAheadLogsTest {
}
};
gc.collect(status);
- EasyMock.verify(context, fs, marker, tserverSet, client, rscanner, mscanner);
+ EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner);
}
@Test
@@ -198,7 +195,6 @@ public class GarbageCollectWriteAheadLogsTest {
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
- AccumuloClient client = EasyMock.createMock(AccumuloClient.class);
Scanner mscanner = EasyMock.createMock(Scanner.class);
Scanner rscanner = EasyMock.createMock(Scanner.class);
@@ -206,22 +202,21 @@ public class GarbageCollectWriteAheadLogsTest {
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once();
EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path));
- EasyMock.expect(context.getClient()).andReturn(client);
- EasyMock.expect(client.createScanner(ReplicationTable.NAME, Authorizations.EMPTY))
+ EasyMock.expect(context.createScanner(ReplicationTable.NAME, Authorizations.EMPTY))
.andReturn(rscanner);
rscanner.fetchColumnFamily(ReplicationSchema.StatusSection.NAME);
EasyMock.expectLastCall().once();
EasyMock.expect(rscanner.iterator()).andReturn(emptyKV);
- EasyMock.expect(client.createScanner(MetadataTable.NAME, Authorizations.EMPTY))
+ EasyMock.expect(context.createScanner(MetadataTable.NAME, Authorizations.EMPTY))
.andReturn(mscanner);
mscanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
EasyMock.expectLastCall().once();
mscanner.setRange(MetadataSchema.ReplicationSection.getRange());
EasyMock.expectLastCall().once();
EasyMock.expect(mscanner.iterator()).andReturn(emptyKV);
- EasyMock.replay(context, fs, marker, tserverSet, client, rscanner, mscanner);
+ EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false,
tserverSet, marker, tabletOnServer2List) {
@Override
@@ -230,7 +225,7 @@ public class GarbageCollectWriteAheadLogsTest {
}
};
gc.collect(status);
- EasyMock.verify(context, fs, marker, tserverSet, client, rscanner, mscanner);
+ EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner);
}
@Test
@@ -239,7 +234,6 @@ public class GarbageCollectWriteAheadLogsTest {
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
- AccumuloClient client = EasyMock.createMock(AccumuloClient.class);
Scanner mscanner = EasyMock.createMock(Scanner.class);
Scanner rscanner = EasyMock.createMock(Scanner.class);
String row = MetadataSchema.ReplicationSection.getRowPrefix() + path;
@@ -253,22 +247,21 @@ public class GarbageCollectWriteAheadLogsTest {
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.UNREFERENCED, path));
- EasyMock.expect(context.getClient()).andReturn(client);
- EasyMock.expect(client.createScanner(ReplicationTable.NAME, Authorizations.EMPTY))
+ EasyMock.expect(context.createScanner(ReplicationTable.NAME, Authorizations.EMPTY))
.andReturn(rscanner);
rscanner.fetchColumnFamily(ReplicationSchema.StatusSection.NAME);
EasyMock.expectLastCall().once();
EasyMock.expect(rscanner.iterator()).andReturn(emptyKV);
- EasyMock.expect(client.createScanner(MetadataTable.NAME, Authorizations.EMPTY))
+ EasyMock.expect(context.createScanner(MetadataTable.NAME, Authorizations.EMPTY))
.andReturn(mscanner);
mscanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
EasyMock.expectLastCall().once();
mscanner.setRange(MetadataSchema.ReplicationSection.getRange());
EasyMock.expectLastCall().once();
EasyMock.expect(mscanner.iterator()).andReturn(replicationWork.entrySet().iterator());
- EasyMock.replay(context, fs, marker, tserverSet, client, rscanner, mscanner);
+ EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false,
tserverSet, marker, tabletOnServer1List) {
@Override
@@ -277,6 +270,6 @@ public class GarbageCollectWriteAheadLogsTest {
}
};
gc.collect(status);
- EasyMock.verify(context, fs, marker, tserverSet, client, rscanner, mscanner);
+ EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner);
}
}
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index ef97de1..7fc1639 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -644,10 +644,6 @@ public class Master
return context.getTableManager();
}
- public AccumuloClient getClient() throws AccumuloSecurityException, AccumuloException {
- return context.getClient();
- }
-
public Master(ServerContext context) throws IOException {
this.context = context;
this.serverConfig = context.getServerConfFactory();
@@ -934,7 +930,7 @@ public class Master
if (!migrations.isEmpty()) {
try {
cleanupOfflineMigrations();
- cleanupNonexistentMigrations(context.getClient());
+ cleanupNonexistentMigrations(context);
} catch (Exception ex) {
log.error("Error cleaning up migrations", ex);
}
diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
index 16b8b8e..c6b6d55 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
@@ -29,8 +29,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
@@ -498,12 +496,7 @@ public class MasterClientServiceHandler extends FateServiceHandler
@Override
public boolean drainReplicationTable(TInfo tfino, TCredentials credentials, String tableName,
Set<String> logsToWatch) throws TException {
- AccumuloClient client;
- try {
- client = master.getClient();
- } catch (AccumuloException | AccumuloSecurityException e) {
- throw new RuntimeException("Failed to obtain client", e);
- }
+ AccumuloClient client = master.getContext();
final Text tableId = new Text(getTableId(master.getContext(), tableName).getUtf8());
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 9cb447c..cea3618 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -37,7 +37,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -431,7 +430,7 @@ abstract class TabletGroupWatcher extends Daemon {
String table = MetadataTable.NAME;
if (extent.isMeta())
table = RootTable.NAME;
- Scanner scanner = this.master.getClient().createScanner(table, Authorizations.EMPTY);
+ Scanner scanner = this.master.getContext().createScanner(table, Authorizations.EMPTY);
scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
scanner.setRange(new Range(row));
@@ -460,7 +459,7 @@ abstract class TabletGroupWatcher extends Daemon {
TServerInstance alive = master.tserverSet.find(entry.getValue().toString());
if (alive == null) {
Master.log.info("Removing entry {}", entry);
- BatchWriter bw = this.master.getClient().createBatchWriter(table,
+ BatchWriter bw = this.master.getContext().createBatchWriter(table,
new BatchWriterConfig());
Mutation m = new Mutation(entry.getKey().getRow());
m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
@@ -555,7 +554,7 @@ abstract class TabletGroupWatcher extends Daemon {
private void updateMergeState(Map<Table.ID,MergeStats> mergeStatsCache) {
for (MergeStats stats : mergeStatsCache.values()) {
try {
- MergeState update = stats.nextMergeState(this.master.getClient(), this.master);
+ MergeState update = stats.nextMergeState(this.master.getContext(), this.master);
// when next state is MERGING, its important to persist this before
// starting the merge... the verification check that is done before
// moving into the merging state could fail if merge starts but does
@@ -598,7 +597,7 @@ abstract class TabletGroupWatcher extends Daemon {
Master.log.debug("Found following tablet {}", followingTablet);
}
try {
- AccumuloClient client = this.master.getClient();
+ AccumuloClient client = this.master.getContext();
Text start = extent.getPrevEndRow();
if (start == null) {
start = new Text();
@@ -677,8 +676,7 @@ abstract class TabletGroupWatcher extends Daemon {
new KeyExtent(extent.getTableId(), null, extent.getPrevEndRow()), tdir,
master.getContext(), timeType, this.master.masterLock);
}
- } catch (RuntimeException | IOException | TableNotFoundException
- | AccumuloSecurityException ex) {
+ } catch (RuntimeException | IOException | TableNotFoundException ex) {
throw new AccumuloException(ex);
}
}
@@ -704,7 +702,7 @@ abstract class TabletGroupWatcher extends Daemon {
BatchWriter bw = null;
try {
long fileCount = 0;
- AccumuloClient client = this.master.getClient();
+ AccumuloClient client = this.master.getContext();
// Make file entries in highest tablet
bw = client.createBatchWriter(targetSystemTable, new BatchWriterConfig());
Scanner scanner = client.createScanner(targetSystemTable, Authorizations.EMPTY);
@@ -820,7 +818,7 @@ abstract class TabletGroupWatcher extends Daemon {
private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
try {
- AccumuloClient client = this.master.getClient();
+ AccumuloClient client = this.master.getContext();
Scanner scanner = client.createScanner(range.isMeta() ? RootTable.NAME : MetadataTable.NAME,
Authorizations.EMPTY);
TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
index 33479a5..d2d2bc1 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
@@ -17,8 +17,6 @@
package org.apache.accumulo.master.replication;
import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.trace.Trace;
@@ -65,15 +63,7 @@ public class ReplicationDriver extends Daemon {
while (master.stillMaster()) {
if (null == workMaker) {
- try {
- client = master.getClient();
- } catch (AccumuloException | AccumuloSecurityException e) {
- // couldn't get a client, try again in a "short" amount of time
- log.warn("Error trying to get client to process replication records", e);
- UtilWaitThread.sleep(2000);
- continue;
- }
-
+ client = master.getContext();
statusMaker = new StatusMaker(client, master.getFileSystem());
workMaker = new WorkMaker(master.getContext(), client);
finishedWorkUpdater = new FinishedWorkUpdater(client);
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
index e9e592f..b9e9e60 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
@@ -47,7 +47,7 @@ public class WorkDriver extends Daemon {
public WorkDriver(Master master) throws AccumuloException, AccumuloSecurityException {
super();
this.master = master;
- this.client = master.getClient();
+ this.client = master.getContext();
this.conf = master.getConfiguration();
configureWorkAssigner();
}
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CleanUpBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CleanUpBulkImport.java
index 493666f..df3d5bc 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CleanUpBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CleanUpBulkImport.java
@@ -57,7 +57,7 @@ public class CleanUpBulkImport extends MasterRepo {
"/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
MetadataTableUtil.addDeleteEntry(master.getContext(), tableId, bulkDir.toString());
log.debug("removing the metadata table markers for loaded files");
- AccumuloClient client = master.getClient();
+ AccumuloClient client = master.getContext();
MetadataTableUtil.removeBulkLoadEntries(client, tableId, tid);
log.debug("releasing HDFS reservations for " + source + " and " + error);
Utils.unreserveHdfsDirectory(master, source, tid);
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CopyFailed.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CopyFailed.java
index b5cad58..191869c 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CopyFailed.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CopyFailed.java
@@ -114,7 +114,7 @@ class CopyFailed extends MasterRepo {
*/
// determine which failed files were loaded
- AccumuloClient client = master.getClient();
+ AccumuloClient client = master.getContext();
try (Scanner mscanner = new IsolatedScanner(
client.createScanner(MetadataTable.NAME, Authorizations.EMPTY))) {
mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java
index 3050575..56fb73c 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java
@@ -52,7 +52,7 @@ public class CleanUpBulkImport extends MasterRepo {
MetadataTableUtil.addDeleteEntry(master.getContext(), info.tableId, bulkDir.toString());
if (info.tableState == TableState.ONLINE) {
log.debug("removing the metadata table markers for loaded files");
- AccumuloClient client = master.getClient();
+ AccumuloClient client = master.getContext();
MetadataTableUtil.removeBulkLoadEntries(client, info.tableId, tid);
}
Utils.unreserveHdfsDirectory(master, info.sourceDir, tid);
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
index a89799a..63a46f4 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
@@ -216,7 +216,7 @@ class LoadFiles extends MasterRepo {
void start(Path bulkDir, Master master, long tid, boolean setTime) throws Exception {
Preconditions.checkArgument(!setTime);
super.start(bulkDir, master, tid, setTime);
- bw = master.getClient().createBatchWriter(MetadataTable.NAME);
+ bw = master.getContext().createBatchWriter(MetadataTable.NAME);
unloadingTablets = new MapCounter<>();
}
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/create/PopulateMetadata.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/create/PopulateMetadata.java
index 8930136..26c0b7e 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/create/PopulateMetadata.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/create/PopulateMetadata.java
@@ -69,7 +69,7 @@ class PopulateMetadata extends MasterRepo {
SortedSet<Text> dirs = Utils
.getSortedSetFromFile(environment.getInputStream(tableInfo.getSplitDirsFile()), false);
Map<Text,Text> splitDirMap = createSplitDirectoryMap(splits, dirs);
- try (BatchWriter bw = environment.getClient().createBatchWriter("accumulo.metadata")) {
+ try (BatchWriter bw = environment.getContext().createBatchWriter("accumulo.metadata")) {
writeSplitsToMetadataTable(environment.getContext(), tableInfo.getTableId(), splits,
splitDirMap, tableInfo.getTimeType(), environment.getMasterLock(), bw);
}
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/delete/CleanUp.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/delete/CleanUp.java
index d71d3dd..7999f41 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/delete/CleanUp.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/delete/CleanUp.java
@@ -92,7 +92,7 @@ class CleanUp extends MasterRepo {
boolean done = true;
Range tableRange = new KeyExtent(tableId, null, null).toMetadataRange();
- Scanner scanner = master.getClient().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ Scanner scanner = master.getContext().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
MetaDataTableScanner.configureScanner(scanner, master);
scanner.setRange(tableRange);
@@ -125,7 +125,7 @@ class CleanUp extends MasterRepo {
try {
// look for other tables that references this table's files
- AccumuloClient client = master.getClient();
+ AccumuloClient client = master.getContext();
try (BatchScanner bs = client.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY,
8)) {
Range allTables = MetadataSchema.TabletsSection.getRange();
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableExport/WriteExportFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableExport/WriteExportFiles.java
index 1aeb4a0..68af2fb 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableExport/WriteExportFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableExport/WriteExportFiles.java
@@ -92,7 +92,7 @@ class WriteExportFiles extends MasterRepo {
if (reserved > 0)
return reserved;
- AccumuloClient client = master.getClient();
+ AccumuloClient client = master.getContext();
checkOffline(master.getContext());
@@ -213,13 +213,12 @@ class WriteExportFiles extends MasterRepo {
private static Map<String,String> exportMetadata(VolumeManager fs, ServerContext context,
Table.ID tableID, ZipOutputStream zipOut, DataOutputStream dataOut)
- throws IOException, TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ throws IOException, TableNotFoundException {
zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_METADATA_FILE));
Map<String,String> uniqueFiles = new HashMap<>();
- Scanner metaScanner = context.getClient().createScanner(MetadataTable.NAME,
- Authorizations.EMPTY);
+ Scanner metaScanner = context.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
metaScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(metaScanner);
TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(metaScanner);
@@ -254,13 +253,11 @@ class WriteExportFiles extends MasterRepo {
}
private static void exportConfig(ServerContext context, Table.ID tableID, ZipOutputStream zipOut,
- DataOutputStream dataOut)
- throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException {
- AccumuloClient client = context.getClient();
+ DataOutputStream dataOut) throws AccumuloException, AccumuloSecurityException, IOException {
DefaultConfiguration defaultConfig = DefaultConfiguration.getInstance();
- Map<String,String> siteConfig = client.instanceOperations().getSiteConfiguration();
- Map<String,String> systemConfig = client.instanceOperations().getSystemConfiguration();
+ Map<String,String> siteConfig = context.instanceOperations().getSiteConfiguration();
+ Map<String,String> systemConfig = context.instanceOperations().getSystemConfiguration();
TableConfiguration tableConfig = context.getServerConfFactory().getTableConfiguration(tableID);
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/PopulateMetadataTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/PopulateMetadataTable.java
index 86c172f..d1a3fa3 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/PopulateMetadataTable.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/PopulateMetadataTable.java
@@ -94,7 +94,7 @@ class PopulateMetadataTable extends MasterRepo {
try {
VolumeManager fs = master.getFileSystem();
- mbw = master.getClient().createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ mbw = master.getContext().createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
zis = new ZipInputStream(fs.open(path));
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 1f58fcd..65944d7 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.clientImpl.MasterClient;
import org.apache.accumulo.core.clientImpl.Table;
import org.apache.accumulo.core.conf.Property;
@@ -595,8 +594,7 @@ public class Monitor implements HighlyAvailableService {
public static void fetchScans() throws Exception {
if (context == null)
return;
- AccumuloClient c = context.getClient();
- for (String server : c.instanceOperations().getTabletServers()) {
+ for (String server : context.instanceOperations().getTabletServers()) {
final HostAndPort parsedServer = HostAndPort.fromString(server);
Client tserver = ThriftUtil.getTServerClient(parsedServer, context);
try {
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/replication/ReplicationResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/replication/ReplicationResource.java
index 9ff8459..03f134a 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/replication/ReplicationResource.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/replication/ReplicationResource.java
@@ -74,7 +74,7 @@ public class ReplicationResource {
@GET
public List<ReplicationInformation> getReplicationInformation()
throws AccumuloException, AccumuloSecurityException {
- final AccumuloClient client = Monitor.getContext().getClient();
+ final AccumuloClient client = Monitor.getContext();
final TableOperations tops = client.tableOperations();
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/trace/TracesResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/trace/TracesResource.java
index e5c8197..76ee863 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/trace/TracesResource.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/trace/TracesResource.java
@@ -23,7 +23,6 @@ import static org.apache.accumulo.monitor.util.ParameterValidator.ALPHA_NUM_REGE
import java.io.IOException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
-import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -40,6 +39,7 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -58,7 +58,6 @@ import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.monitor.Monitor;
import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.tracer.SpanTree;
-import org.apache.accumulo.tracer.SpanTreeVisitor;
import org.apache.accumulo.tracer.TraceDump;
import org.apache.accumulo.tracer.TraceFormatter;
import org.apache.accumulo.tracer.thrift.Annotation;
@@ -91,34 +90,39 @@ public class TracesResource {
RecentTracesList recentTraces = new RecentTracesList();
- Pair<Scanner,UserGroupInformation> pair = getScanner();
- final Scanner scanner = pair.getFirst();
- if (scanner == null) {
+ Pair<AccumuloClient,UserGroupInformation> pair = getClient();
+ AccumuloClient client = pair.getFirst();
+ if (client == null) {
return recentTraces;
}
+ try {
+ final Scanner scanner = getScanner(client);
+ if (scanner == null) {
+ return recentTraces;
+ }
- Range range = getRangeForTrace(minutes);
- scanner.setRange(range);
+ Range range = getRangeForTrace(minutes);
+ scanner.setRange(range);
- final Map<String,RecentTracesInformation> summary = new TreeMap<>();
- if (null != pair.getSecond()) {
- pair.getSecond().doAs(new PrivilegedAction<Void>() {
- @Override
- public Void run() {
+ final Map<String,RecentTracesInformation> summary = new TreeMap<>();
+ if (null != pair.getSecond()) {
+ pair.getSecond().doAs((PrivilegedAction<Void>) () -> {
parseSpans(scanner, summary);
return null;
- }
- });
- } else {
- parseSpans(scanner, summary);
- }
+ });
+ } else {
+ parseSpans(scanner, summary);
+ }
- // Adds the traces to the list
- for (Entry<String,RecentTracesInformation> entry : summary.entrySet()) {
- RecentTracesInformation stat = entry.getValue();
- recentTraces.addTrace(stat);
+ // Adds the traces to the list
+ for (Entry<String,RecentTracesInformation> entry : summary.entrySet()) {
+ RecentTracesInformation stat = entry.getValue();
+ recentTraces.addTrace(stat);
+ }
+ return recentTraces;
+ } finally {
+ client.close();
}
- return recentTraces;
}
/**
@@ -138,20 +142,23 @@ public class TracesResource {
TraceType typeTraces = new TraceType(type);
- Pair<Scanner,UserGroupInformation> pair = getScanner();
- final Scanner scanner = pair.getFirst();
- if (scanner == null) {
+ Pair<AccumuloClient,UserGroupInformation> pair = getClient();
+ AccumuloClient client = pair.getFirst();
+ if (client == null) {
return typeTraces;
}
+ try {
+ final Scanner scanner = getScanner(client);
+ if (scanner == null) {
+ return typeTraces;
+ }
- Range range = getRangeForTrace(minutes);
+ Range range = getRangeForTrace(minutes);
- scanner.setRange(range);
+ scanner.setRange(range);
- if (null != pair.getSecond()) {
- pair.getSecond().doAs(new PrivilegedAction<Void>() {
- @Override
- public Void run() {
+ if (null != pair.getSecond()) {
+ pair.getSecond().doAs((PrivilegedAction<Void>) () -> {
for (Entry<Key,Value> entry : scanner) {
RemoteSpan span = TraceFormatter.getRemoteSpan(entry);
@@ -160,17 +167,19 @@ public class TracesResource {
}
}
return null;
- }
- });
- } else {
- for (Entry<Key,Value> entry : scanner) {
- RemoteSpan span = TraceFormatter.getRemoteSpan(entry);
- if (span.description.equals(type)) {
- typeTraces.addTrace(new TracesForTypeInformation(span));
+ });
+ } else {
+ for (Entry<Key,Value> entry : scanner) {
+ RemoteSpan span = TraceFormatter.getRemoteSpan(entry);
+ if (span.description.equals(type)) {
+ typeTraces.addTrace(new TracesForTypeInformation(span));
+ }
}
}
+ return typeTraces;
+ } finally {
+ client.close();
}
- return typeTraces;
}
/**
@@ -187,45 +196,45 @@ public class TracesResource {
TraceList traces = new TraceList(id);
- Pair<Scanner,UserGroupInformation> entry = getScanner();
- final Scanner scanner = entry.getFirst();
- if (scanner == null) {
+ Pair<AccumuloClient,UserGroupInformation> pair = getClient();
+ AccumuloClient client = pair.getFirst();
+ if (client == null) {
return traces;
}
+ try {
+ final Scanner scanner = getScanner(client);
- Range range = new Range(new Text(id));
- scanner.setRange(range);
- final SpanTree tree = new SpanTree();
- long start;
-
- if (null != entry.getSecond()) {
- start = entry.getSecond().doAs(new PrivilegedAction<Long>() {
- @Override
- public Long run() {
- return addSpans(scanner, tree, Long.MAX_VALUE);
- }
- });
- } else {
- start = addSpans(scanner, tree, Long.MAX_VALUE);
- }
+ if (scanner == null) {
+ return traces;
+ }
- traces.addStartTime(start);
+ Range range = new Range(new Text(id));
+ scanner.setRange(range);
+ final SpanTree tree = new SpanTree();
+ long start;
- final long finalStart = start;
- Set<Long> visited = tree.visit(new SpanTreeVisitor() {
- @Override
- public void visit(int level, RemoteSpan parent, RemoteSpan node,
- Collection<RemoteSpan> children) {
- traces.addTrace(addTraceInformation(level, node, finalStart));
+ if (null != pair.getSecond()) {
+ start = pair.getSecond()
+ .doAs((PrivilegedAction<Long>) () -> addSpans(scanner, tree, Long.MAX_VALUE));
+ } else {
+ start = addSpans(scanner, tree, Long.MAX_VALUE);
}
- });
- tree.nodes.keySet().removeAll(visited);
- if (!tree.nodes.isEmpty()) {
- for (RemoteSpan span : TraceDump.sortByStart(tree.nodes.values())) {
- traces.addTrace(addTraceInformation(0, span, finalStart));
+
+ traces.addStartTime(start);
+
+ final long finalStart = start;
+ Set<Long> visited = tree.visit((level, parent, node, children) -> traces
+ .addTrace(addTraceInformation(level, node, finalStart)));
+ tree.nodes.keySet().removeAll(visited);
+ if (!tree.nodes.isEmpty()) {
+ for (RemoteSpan span : TraceDump.sortByStart(tree.nodes.values())) {
+ traces.addTrace(addTraceInformation(0, span, finalStart));
+ }
}
+ return traces;
+ } finally {
+ client.close();
}
- return traces;
}
private static TraceInformation addTraceInformation(int level, RemoteSpan node, long finalStart) {
@@ -284,8 +293,7 @@ public class TracesResource {
}
}
- protected Pair<Scanner,UserGroupInformation> getScanner()
- throws AccumuloException, AccumuloSecurityException {
+ protected Pair<AccumuloClient,UserGroupInformation> getClient() {
AccumuloConfiguration conf = Monitor.getContext().getConfiguration();
final boolean saslEnabled = conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED);
UserGroupInformation traceUgi = null;
@@ -330,17 +338,14 @@ public class TracesResource {
at = null;
}
- final String table = conf.get(Property.TRACE_TABLE);
- Scanner scanner;
+ java.util.Properties props = Monitor.getContext().getProperties();
+ AccumuloClient client;
if (null != traceUgi) {
try {
- scanner = traceUgi.doAs((PrivilegedExceptionAction<Scanner>) () -> {
+ client = traceUgi.doAs((PrivilegedExceptionAction<AccumuloClient>) () -> {
// Make the KerberosToken inside the doAs
- AuthenticationToken token = at;
- if (null == token) {
- token = new KerberosToken();
- }
- return getScanner(table, principal, token);
+ AuthenticationToken token = new KerberosToken();
+ return Accumulo.newClient().from(props).as(principal, token).build();
});
} catch (IOException | InterruptedException e) {
throw new RuntimeException("Failed to obtain scanner", e);
@@ -349,21 +354,19 @@ public class TracesResource {
if (null == at) {
throw new AssertionError("AuthenticationToken should not be null");
}
- scanner = getScanner(table, principal, at);
+ client = Accumulo.newClient().from(props).as(principal, at).build();
}
-
- return new Pair<>(scanner, traceUgi);
+ return new Pair<>(client, traceUgi);
}
- private Scanner getScanner(String table, String principal, AuthenticationToken at)
- throws AccumuloException, AccumuloSecurityException {
+ private Scanner getScanner(AccumuloClient client) throws AccumuloException {
try {
- AccumuloClient client = Monitor.getContext().getClient(principal, at);
+ AccumuloConfiguration conf = Monitor.getContext().getConfiguration();
+ final String table = conf.get(Property.TRACE_TABLE);
if (!client.tableOperations().exists(table)) {
return null;
}
- return client.createScanner(table,
- client.securityOperations().getUserAuthorizations(principal));
+ return client.createScanner(table);
} catch (AccumuloSecurityException | TableNotFoundException ex) {
return null;
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
index 6728cab..f707260 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
@@ -83,7 +83,7 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
BatchWriterConfig bwConfig = new BatchWriterConfig();
bwConfig.setMaxMemory(memoryInBytes);
try {
- bw = context.getClient().createBatchWriter(tableName, bwConfig);
+ bw = context.createBatchWriter(tableName, bwConfig);
} catch (TableNotFoundException e) {
throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST,
"Table " + tableName + " does not exist");
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index 10635d3..7498520 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -22,8 +22,6 @@ import java.io.IOException;
import java.util.Map;
import java.util.NoSuchElementException;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
@@ -101,7 +99,7 @@ public class ReplicationProcessor implements Processor {
Status status;
try {
status = getStatus(file, target);
- } catch (ReplicationTableOfflineException | AccumuloException | AccumuloSecurityException e) {
+ } catch (ReplicationTableOfflineException e) {
log.error("Could not look for replication record", e);
throw new IllegalStateException("Could not look for replication record", e);
} catch (InvalidProtocolBufferException e) {
@@ -178,12 +176,10 @@ public class ReplicationProcessor implements Processor {
}
protected Status getStatus(String file, ReplicationTarget target)
- throws ReplicationTableOfflineException, AccumuloException, AccumuloSecurityException,
- InvalidProtocolBufferException {
- Scanner s = ReplicationTable.getScanner(context.getClient());
+ throws ReplicationTableOfflineException, InvalidProtocolBufferException {
+ Scanner s = ReplicationTable.getScanner(context);
s.setRange(Range.exact(file));
s.fetchColumn(WorkSection.NAME, target.toText());
-
return Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
}
}
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
index d7adcfa..5cc6c8f 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
@@ -27,8 +27,6 @@ import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.clientImpl.ClientContext;
@@ -57,19 +55,16 @@ public class BatchWriterReplicationReplayerTest {
private BatchWriter bw;
@Before
- public void setUpContext() throws AccumuloException, AccumuloSecurityException {
- client = createMock(AccumuloClient.class);
+ public void setUpContext() {
conf = createMock(AccumuloConfiguration.class);
bw = createMock(BatchWriter.class);
context = createMock(ClientContext.class);
expect(context.getConfiguration()).andReturn(conf).anyTimes();
- expect(context.getClient()).andReturn(client).anyTimes();
- replay(context);
}
@After
public void verifyMock() {
- verify(context, client, conf, bw);
+ verify(context, conf, bw);
}
@Test
@@ -124,7 +119,7 @@ public class BatchWriterReplicationReplayerTest {
expect(conf.getAsBytes(Property.TSERV_REPLICATION_BW_REPLAYER_MEMORY))
.andReturn(bwCfg.getMaxMemory());
- expect(client.createBatchWriter(tableName, bwCfg)).andReturn(bw);
+ expect(context.createBatchWriter(tableName, bwCfg)).andReturn(bw);
bw.addMutations(Lists.newArrayList(expectedMutation));
expectLastCall().once();
@@ -132,7 +127,7 @@ public class BatchWriterReplicationReplayerTest {
bw.close();
expectLastCall().once();
- replay(client, conf, bw);
+ replay(context, conf, bw);
replayer.replicateLog(context, tableName, edits);
}
@@ -196,7 +191,7 @@ public class BatchWriterReplicationReplayerTest {
expect(conf.getAsBytes(Property.TSERV_REPLICATION_BW_REPLAYER_MEMORY))
.andReturn(bwCfg.getMaxMemory());
- expect(client.createBatchWriter(tableName, bwCfg)).andReturn(bw);
+ expect(context.createBatchWriter(tableName, bwCfg)).andReturn(bw);
bw.addMutations(Lists.newArrayList(expectedMutation));
expectLastCall().once();
@@ -204,7 +199,7 @@ public class BatchWriterReplicationReplayerTest {
bw.close();
expectLastCall().once();
- replay(client, conf, bw);
+ replay(context, conf, bw);
replayer.replicateLog(context, tableName, edits);
}
diff --git a/shell/src/main/java/org/apache/accumulo/shell/Shell.java b/shell/src/main/java/org/apache/accumulo/shell/Shell.java
index bab0659..83eb1193 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/Shell.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/Shell.java
@@ -342,7 +342,7 @@ public class Shell extends ShellOptions implements KeywordExecutable {
clientProperties);
this.setTableName("");
accumuloClient = Accumulo.newClient().from(clientProperties).as(principal, token).build();
- context = new ClientContext(accumuloClient);
+ context = (ClientContext) accumuloClient;
} catch (Exception e) {
printException(e);
exitCode = 1;
@@ -1178,7 +1178,7 @@ public class Shell extends ShellOptions implements KeywordExecutable {
}
accumuloClient = Accumulo.newClient().from(clientProperties).as(principal, token).build();
accumuloClient.securityOperations().authenticateUser(principal, token);
- context = new ClientContext(accumuloClient);
+ context = (ClientContext) accumuloClient;
}
public ClientContext getContext() {
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/CreateUserCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/CreateUserCommand.java
index b6a21a3..707835a 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/CreateUserCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/CreateUserCommand.java
@@ -25,7 +25,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.clientImpl.AccumuloClientImpl;
+import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.shell.Shell;
import org.apache.accumulo.shell.Shell.Command;
import org.apache.commons.cli.CommandLine;
@@ -38,7 +38,7 @@ public class CreateUserCommand extends Command {
TableExistsException, IOException {
final String user = cl.getArgs()[0];
- AuthenticationToken userToken = ((AccumuloClientImpl) shellState.getAccumuloClient()).token();
+ AuthenticationToken userToken = ((ClientContext) shellState.getAccumuloClient()).token();
PasswordToken passwordToken;
if (userToken instanceof KerberosToken) {
passwordToken = new PasswordToken();
diff --git a/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java b/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java
index 2ec891a..831a6ee 100644
--- a/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java
+++ b/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java
@@ -34,7 +34,6 @@ import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientInfo;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.security.TablePermission;
@@ -67,7 +66,7 @@ public abstract class AccumuloClusterHarness extends AccumuloITBase
private static final Logger log = LoggerFactory.getLogger(AccumuloClusterHarness.class);
private static final String TRUE = Boolean.toString(true);
- public static enum ClusterType {
+ public enum ClusterType {
MINI, STANDALONE;
public boolean isDynamic() {
@@ -282,11 +281,6 @@ public abstract class AccumuloClusterHarness extends AccumuloITBase
return ClientInfo.from(getCluster().getClientProperties());
}
- public static ClientContext getClientContext() {
- checkState(initialized);
- return new ClientContext(getClientInfo());
- }
-
public static ServerContext getServerContext() {
return getCluster().getServerContext();
}
@@ -354,6 +348,7 @@ public abstract class AccumuloClusterHarness extends AccumuloITBase
}
public AccumuloClient createAccumuloClient() {
+ checkState(initialized);
try {
String princ = getAdminPrincipal();
AuthenticationToken token = getAdminToken();
diff --git a/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java b/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java
index faaf602..bf5ab31 100644
--- a/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java
@@ -235,6 +235,8 @@ public class BatchWriterIterator extends WrappingIterator {
batchWriter.close();
} catch (MutationsRejectedException e) {
log.error("Failed to close BatchWriter; some mutations may not be applied", e);
+ } finally {
+ accumuloClient.close();
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java b/test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
index b268d5b..5fc1a33 100644
--- a/test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
@@ -86,7 +86,7 @@ public class DetectDeadTabletServersIT extends ConfigurableMacBase {
}
private MasterMonitorInfo getStats(AccumuloClient c) throws Exception {
- ClientContext context = getClientContext();
+ ClientContext context = (ClientContext) c;
Client client = null;
while (true) {
try {
diff --git a/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java b/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
index cac4b63..80faa0e 100644
--- a/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
@@ -70,7 +70,7 @@ public class MasterRepairsDualAssignmentIT extends ConfigurableMacBase {
public void test() throws Exception {
// make some tablets, spread 'em around
try (AccumuloClient c = createClient()) {
- ClientContext context = getClientContext();
+ ClientContext context = (ClientContext) c;
String table = this.getUniqueNames(1)[0];
c.securityOperations().grantTablePermission("root", MetadataTable.NAME,
TablePermission.WRITE);
diff --git a/test/src/main/java/org/apache/accumulo/test/SampleIT.java b/test/src/main/java/org/apache/accumulo/test/SampleIT.java
index c4bd7b9..60ed7ce 100644
--- a/test/src/main/java/org/apache/accumulo/test/SampleIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/SampleIT.java
@@ -216,8 +216,8 @@ public class SampleIT extends AccumuloClusterHarness {
client.tableOperations().clone(tableName, clone, false, em, es);
client.tableOperations().offline(clone, true);
Table.ID cloneID = Table.ID.of(client.tableOperations().tableIdMap().get(clone));
- ClientContext context = new ClientContext(client);
- OfflineScanner oScanner = new OfflineScanner(context, cloneID, Authorizations.EMPTY);
+ OfflineScanner oScanner = new OfflineScanner((ClientContext) client, cloneID,
+ Authorizations.EMPTY);
if (sc != null) {
oScanner.setSamplerConfiguration(sc);
}
diff --git a/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java b/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
index 259ff58..ce48f2d 100644
--- a/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
@@ -48,6 +48,7 @@ import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.DiskUsage;
import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.constraints.DefaultKeySizeConstraint;
import org.apache.accumulo.core.data.Key;
@@ -71,7 +72,6 @@ import com.google.common.collect.Sets;
public class TableOperationsIT extends AccumuloClusterHarness {
static TabletClientService.Client client;
-
private AccumuloClient accumuloClient;
@Override
@@ -80,13 +80,13 @@ public class TableOperationsIT extends AccumuloClusterHarness {
}
@Before
- public void setup() throws Exception {
+ public void setup() {
accumuloClient = createAccumuloClient();
}
@After
public void checkForDanglingFateLocks() {
- FunctionalTestUtils.assertNoDanglingFateLocks(getClientContext(), getCluster());
+ FunctionalTestUtils.assertNoDanglingFateLocks((ClientContext) accumuloClient, getCluster());
accumuloClient.close();
}
diff --git a/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java b/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java
index 097efd7..5bfead7 100644
--- a/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java
@@ -45,8 +45,10 @@ public class TabletServerHdfsRestartIT extends ConfigurableMacBase {
@Test(timeout = 2 * 60 * 1000)
public void test() throws Exception {
try (AccumuloClient client = this.createClient()) {
- // Yes, there's a tabletserver
- assertEquals(1, client.instanceOperations().getTabletServers().size());
+ // wait until a tablet server is up
+ while (client.instanceOperations().getTabletServers().isEmpty()) {
+ Thread.sleep(50);
+ }
final String tableName = getUniqueNames(1)[0];
client.tableOperations().create(tableName);
BatchWriter bw = client.createBatchWriter(tableName, null);
diff --git a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
index a159242..2bb5b84 100644
--- a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ThriftTransportKey;
import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
@@ -49,73 +50,79 @@ public class TransportCachingIT extends AccumuloClusterHarness {
@Test
public void testCachedTransport() throws InterruptedException {
- ClientContext context = getClientContext();
- long rpcTimeout = ConfigurationTypeHelper
- .getTimeInMillis(Property.GENERAL_RPC_TIMEOUT.getDefaultValue());
+ try (AccumuloClient client = createAccumuloClient()) {
+ while (client.instanceOperations().getTabletServers().isEmpty()) {
+ // sleep until a tablet server is up
+ Thread.sleep(50);
+ }
+ ClientContext context = (ClientContext) client;
+ long rpcTimeout = ConfigurationTypeHelper
+ .getTimeInMillis(Property.GENERAL_RPC_TIMEOUT.getDefaultValue());
- ZooCache zc = context.getZooCache();
- final String zkRoot = context.getZooKeeperRoot();
+ ZooCache zc = context.getZooCache();
+ final String zkRoot = context.getZooKeeperRoot();
- // wait until Zookeeper is populated
- List<String> children = zc.getChildren(zkRoot + Constants.ZTSERVERS);
- while (children.isEmpty()) {
- Thread.sleep(100);
- children = zc.getChildren(zkRoot + Constants.ZTSERVERS);
- }
+ // wait until Zookeeper is populated
+ List<String> children = zc.getChildren(zkRoot + Constants.ZTSERVERS);
+ while (children.isEmpty()) {
+ Thread.sleep(100);
+ children = zc.getChildren(zkRoot + Constants.ZTSERVERS);
+ }
- ArrayList<ThriftTransportKey> servers = new ArrayList<>();
- for (String tserver : children) {
- String path = zkRoot + Constants.ZTSERVERS + "/" + tserver;
- byte[] data = ZooUtil.getLockData(zc, path);
- if (data != null) {
- String strData = new String(data, UTF_8);
- if (!strData.equals("master"))
- servers.add(new ThriftTransportKey(
- new ServerServices(strData).getAddress(Service.TSERV_CLIENT), rpcTimeout, context));
+ ArrayList<ThriftTransportKey> servers = new ArrayList<>();
+ for (String tserver : children) {
+ String path = zkRoot + Constants.ZTSERVERS + "/" + tserver;
+ byte[] data = ZooUtil.getLockData(zc, path);
+ if (data != null) {
+ String strData = new String(data, UTF_8);
+ if (!strData.equals("master"))
+ servers.add(new ThriftTransportKey(
+ new ServerServices(strData).getAddress(Service.TSERV_CLIENT), rpcTimeout, context));
+ }
}
- }
- ThriftTransportPool pool = ThriftTransportPool.getInstance();
- TTransport first = null;
- while (null == first) {
- try {
- // Get a transport (cached or not)
- first = pool.getAnyTransport(servers, true).getSecond();
- } catch (TTransportException e) {
- log.warn("Failed to obtain transport to {}", servers);
+ ThriftTransportPool pool = ThriftTransportPool.getInstance();
+ TTransport first = null;
+ while (null == first) {
+ try {
+ // Get a transport (cached or not)
+ first = pool.getAnyTransport(servers, true).getSecond();
+ } catch (TTransportException e) {
+ log.warn("Failed to obtain transport to {}", servers);
+ }
}
- }
- assertNotNull(first);
- // Return it to unreserve it
- pool.returnTransport(first);
+ assertNotNull(first);
+ // Return it to unreserve it
+ pool.returnTransport(first);
- TTransport second = null;
- while (null == second) {
- try {
- // Get a cached transport (should be the first)
- second = pool.getAnyTransport(servers, true).getSecond();
- } catch (TTransportException e) {
- log.warn("Failed obtain 2nd transport to {}", servers);
+ TTransport second = null;
+ while (null == second) {
+ try {
+ // Get a cached transport (should be the first)
+ second = pool.getAnyTransport(servers, true).getSecond();
+ } catch (TTransportException e) {
+ log.warn("Failed obtain 2nd transport to {}", servers);
+ }
}
- }
- // We should get the same transport
- assertSame("Expected the first and second to be the same instance", first, second);
- // Return the 2nd
- pool.returnTransport(second);
+ // We should get the same transport
+ assertSame("Expected the first and second to be the same instance", first, second);
+ // Return the 2nd
+ pool.returnTransport(second);
- TTransport third = null;
- while (null == third) {
- try {
- // Get a non-cached transport
- third = pool.getAnyTransport(servers, false).getSecond();
- } catch (TTransportException e) {
- log.warn("Failed obtain 2nd transport to {}", servers);
+ TTransport third = null;
+ while (null == third) {
+ try {
+ // Get a non-cached transport
+ third = pool.getAnyTransport(servers, false).getSecond();
+ } catch (TTransportException e) {
+ log.warn("Failed obtain 2nd transport to {}", servers);
+ }
}
- }
- assertNotSame("Expected second and third transport to be different instances", second, third);
- pool.returnTransport(third);
+ assertNotSame("Expected second and third transport to be different instances", second, third);
+ pool.returnTransport(third);
+ }
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java b/test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
index 714bb4f..1f7aeef 100644
--- a/test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
@@ -42,6 +42,7 @@ import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
+import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -72,10 +73,9 @@ public class UserCompactionStrategyIT extends AccumuloClusterHarness {
@After
public void checkForDanglingFateLocks() {
- // create an accumulo client even though it's not used in order to enable static stuff
try (AccumuloClient c = createAccumuloClient()) {
assertNotNull(c);
- FunctionalTestUtils.assertNoDanglingFateLocks(getClientContext(), getCluster());
+ FunctionalTestUtils.assertNoDanglingFateLocks((ClientContext) c, getCluster());
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
index e4862a7..a078981 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
@@ -102,7 +102,7 @@ public class BalanceAfterCommsFailureIT extends ConfigurableMacBase {
}
private void checkBalance(AccumuloClient c) throws Exception {
- ClientContext context = getClientContext();
+ ClientContext context = (ClientContext) c;
MasterMonitorInfo stats = null;
int unassignedTablets = 1;
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
index 15d9be0..3e51c0f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.Credentials;
import org.apache.accumulo.core.clientImpl.MasterClient;
import org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException;
@@ -156,7 +157,7 @@ public class BalanceInPresenceOfOfflineTableIT extends AccumuloClusterHarness {
MasterMonitorInfo stats;
while (true) {
try {
- client = MasterClient.getConnectionWithRetry(getClientContext());
+ client = MasterClient.getConnectionWithRetry((ClientContext) accumuloClient);
stats = client.getMasterStats(Tracer.traceInfo(),
creds.toThrift(accumuloClient.getInstanceID()));
break;
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
index ab0ef6f..782a2b5 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
@@ -42,6 +42,7 @@ import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.AccumuloClusterHarness;
@@ -105,7 +106,7 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterHarness {
// expected
}
- FunctionalTestUtils.assertNoDanglingFateLocks(getClientContext(), getCluster());
+ FunctionalTestUtils.assertNoDanglingFateLocks((ClientContext) c, getCluster());
}
es.shutdown();
@@ -253,7 +254,7 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterHarness {
// expected
}
- FunctionalTestUtils.assertNoDanglingFateLocks(getClientContext(), getCluster());
+ FunctionalTestUtils.assertNoDanglingFateLocks((ClientContext) c, getCluster());
}
es.shutdown();
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java b/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java
index f34604d..40b5474 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java
@@ -28,15 +28,11 @@ import java.util.Properties;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientInfo;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.util.MonitorUtil;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.harness.AccumuloITBase;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
@@ -49,7 +45,6 @@ import org.apache.accumulo.test.util.CertUtils;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.experimental.categories.Category;
@@ -199,10 +194,6 @@ public class ConfigurableMacBase extends AccumuloITBase {
return getCluster().createAccumuloClient("root", new PasswordToken(ROOT_PASSWORD));
}
- protected ClientContext getClientContext() {
- return new ClientContext(getClientInfo());
- }
-
protected Properties getClientProperties() {
return getClientInfo().getProperties();
}
@@ -226,9 +217,4 @@ public class ConfigurableMacBase extends AccumuloITBase {
protected Process exec(Class<?> clazz, String... args) throws IOException {
return getCluster().exec(clazz, args);
}
-
- protected String getMonitor()
- throws KeeperException, InterruptedException, AccumuloSecurityException, AccumuloException {
- return MonitorUtil.getLocation(getClientContext());
- }
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java b/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java
index cf6cce2..281dcf3 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.cli.BatchWriterOpts;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.Credentials;
import org.apache.accumulo.core.clientImpl.MasterClient;
import org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException;
@@ -103,7 +104,7 @@ public class DynamicThreadPoolsIT extends AccumuloClusterHarness {
MasterMonitorInfo stats = null;
while (true) {
try {
- client = MasterClient.getConnectionWithRetry(getClientContext());
+ client = MasterClient.getConnectionWithRetry((ClientContext) c);
stats = client.getMasterStats(Tracer.traceInfo(), creds.toThrift(c.getInstanceID()));
break;
} catch (ThriftNotActiveServiceException e) {
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java
index 85db23b..949f598 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java
@@ -23,6 +23,7 @@ import java.util.Random;
import org.apache.accumulo.core.cli.BatchWriterOpts;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.test.TestIngest;
import org.apache.hadoop.io.Text;
@@ -70,7 +71,7 @@ public class FateStarvationIT extends AccumuloClusterHarness {
c.tableOperations().offline(tableName);
- FunctionalTestUtils.assertNoDanglingFateLocks(getClientContext(), getCluster());
+ FunctionalTestUtils.assertNoDanglingFateLocks((ClientContext) c, getCluster());
}
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java
index edc14d7..659e28e 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java
@@ -120,7 +120,10 @@ public class HalfDeadTServerIT extends ConfigurableMacBase {
if (!makeDiskFailureLibrary())
return null;
try (AccumuloClient c = createClient()) {
- assertEquals(1, c.instanceOperations().getTabletServers().size());
+ while (c.instanceOperations().getTabletServers().isEmpty()) {
+ // wait until a tserver is running
+ Thread.sleep(50);
+ }
// create our own tablet server with the special test library
String javaHome = System.getProperty("java.home");
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MasterAssignmentIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MasterAssignmentIT.java
index a5a1c34..d8f1165 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MasterAssignmentIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MasterAssignmentIT.java
@@ -20,8 +20,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import java.io.FileNotFoundException;
-
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -34,7 +32,6 @@ import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.server.master.state.MetaDataTableScanner;
import org.apache.accumulo.server.master.state.TabletLocationState;
-import org.apache.commons.configuration.ConfigurationException;
import org.junit.Test;
public class MasterAssignmentIT extends AccumuloClusterHarness {
@@ -89,10 +86,8 @@ public class MasterAssignmentIT extends AccumuloClusterHarness {
}
}
- private TabletLocationState getTabletLocationState(AccumuloClient c, String tableId)
- throws FileNotFoundException, ConfigurationException {
- ClientContext context = getClientContext();
- try (MetaDataTableScanner s = new MetaDataTableScanner(context,
+ private TabletLocationState getTabletLocationState(AccumuloClient c, String tableId) {
+ try (MetaDataTableScanner s = new MetaDataTableScanner((ClientContext) c,
new Range(TabletsSection.getRow(Table.ID.of(tableId), null)))) {
return s.next();
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MetadataMaxFilesIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MetadataMaxFilesIT.java
index 2eec405..b3fb553 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MetadataMaxFilesIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MetadataMaxFilesIT.java
@@ -86,10 +86,10 @@ public class MetadataMaxFilesIT extends ConfigurableMacBase {
cluster.start();
while (true) {
- MasterMonitorInfo stats = null;
+ MasterMonitorInfo stats;
Client client = null;
try {
- ClientContext context = getClientContext();
+ ClientContext context = (ClientContext) c;
client = MasterClient.getConnectionWithRetry(context);
log.info("Fetching stats");
stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MonitorSslIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MonitorSslIT.java
index b6e97af..ce2b752 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MonitorSslIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MonitorSslIT.java
@@ -35,6 +35,8 @@ import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.util.MonitorUtil;
import org.apache.accumulo.minicluster.ServerType;
@@ -126,15 +128,17 @@ public class MonitorSslIT extends ConfigurableMacBase {
log.debug("Starting Monitor");
cluster.getClusterControl().startAllServers(ServerType.MONITOR);
String monitorLocation = null;
- while (null == monitorLocation) {
- try {
- monitorLocation = MonitorUtil.getLocation(getClientContext());
- } catch (Exception e) {
- // ignored
- }
- if (null == monitorLocation) {
- log.debug("Could not fetch monitor HTTP address from zookeeper");
- Thread.sleep(2000);
+ try (AccumuloClient client = createClient()) {
+ while (null == monitorLocation) {
+ try {
+ monitorLocation = MonitorUtil.getLocation((ClientContext) client);
+ } catch (Exception e) {
+ // ignored
+ }
+ if (null == monitorLocation) {
+ log.debug("Could not fetch monitor HTTP address from zookeeper");
+ Thread.sleep(2000);
+ }
}
}
URL url = new URL("https://" + monitorLocation);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
index 3976e3c..855b174 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
@@ -72,6 +72,7 @@ import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientInfo;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
@@ -127,8 +128,10 @@ public class ReadWriteIT extends AccumuloClusterHarness {
@Test(expected = RuntimeException.class)
public void invalidInstanceName() throws Exception {
- Accumulo.newClient().to("fake_instance_name", cluster.getZooKeepers())
- .as(getAdminPrincipal(), getAdminToken()).build();
+ try (AccumuloClient client = Accumulo.newClient().to("fake_instance_name", cluster.getZooKeepers())
+ .as(getAdminPrincipal(), getAdminToken()).build()) {
+ client.instanceOperations().getTabletServers();
+ }
}
@SuppressFBWarnings(value = {"PATH_TRAVERSAL_IN", "URLCONNECTION_SSRF_FD"},
@@ -145,7 +148,7 @@ public class ReadWriteIT extends AccumuloClusterHarness {
verify(accumuloClient, getClientInfo(), ROWS, COLS, 50, 0, tableName);
String monitorLocation = null;
while (null == monitorLocation) {
- monitorLocation = MonitorUtil.getLocation(getClientContext());
+ monitorLocation = MonitorUtil.getLocation((ClientContext) accumuloClient);
if (null == monitorLocation) {
log.debug("Could not fetch monitor HTTP address from zookeeper");
Thread.sleep(2000);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java b/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java
index d43a12e..2d02c77 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.test.functional;
import org.apache.accumulo.core.cli.BatchWriterOpts;
import org.apache.accumulo.core.cli.ScannerOpts;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.VerifyIngest;
@@ -56,7 +57,7 @@ public class RenameIT extends AccumuloClusterHarness {
vopts.setTableName(name1);
VerifyIngest.verifyIngest(c, vopts, scanOpts);
- FunctionalTestUtils.assertNoDanglingFateLocks(getClientContext(), getCluster());
+ FunctionalTestUtils.assertNoDanglingFateLocks((ClientContext) c, getCluster());
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
index 139dd7c..10035d0 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
@@ -79,7 +79,6 @@ public class SimpleBalancerFairnessIT extends ConfigurableMacBase {
c.tableOperations().flush("test_ingest", null, null, false);
sleepUninterruptibly(45, TimeUnit.SECONDS);
Credentials creds = new Credentials("root", new PasswordToken(ROOT_PASSWORD));
- ClientContext context = getClientContext();
MasterMonitorInfo stats = null;
int unassignedTablets = 1;
@@ -87,7 +86,7 @@ public class SimpleBalancerFairnessIT extends ConfigurableMacBase {
MasterClientService.Iface client = null;
while (true) {
try {
- client = MasterClient.getConnectionWithRetry(context);
+ client = MasterClient.getConnectionWithRetry((ClientContext) c);
stats = client.getMasterStats(Tracer.traceInfo(), creds.toThrift(c.getInstanceID()));
break;
} catch (ThriftNotActiveServiceException e) {
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TableChangeStateIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TableChangeStateIT.java
index a9de9f4..900445b 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/TableChangeStateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TableChangeStateIT.java
@@ -81,7 +81,7 @@ public class TableChangeStateIT extends AccumuloClusterHarness {
@Before
public void setup() {
accumuloClient = createAccumuloClient();
- context = new ClientContext(accumuloClient);
+ context = (ClientContext) accumuloClient;
}
@After
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
index e2bbea6..61a4b6c 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
@@ -256,7 +256,7 @@ public class TabletStateChangeIteratorIT extends AccumuloClusterHarness {
@Override
public Set<Table.ID> onlineTables() {
- ClientContext context = getClientContext();
+ ClientContext context = (ClientContext) client;
Set<Table.ID> onlineTables = Tables.getIdToNameMap(context).keySet();
return Sets.filter(onlineTables,
tableId -> Tables.getTableState(context, tableId) == TableState.ONLINE);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java b/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
index 83ab22e..d882abb 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@ -96,7 +96,7 @@ public class WALSunnyDayIT extends ConfigurableMacBase {
MiniAccumuloClusterControl control = mac.getClusterControl();
control.stop(GARBAGE_COLLECTOR);
ServerContext context = getServerContext();
- try (AccumuloClient c = context.getClient()) {
+ try (AccumuloClient c = createClient()) {
String tableName = getUniqueNames(1)[0];
c.tableOperations().create(tableName);
writeSomeData(c, tableName, 1, 1);
diff --git a/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
index eb601e6..931eb57 100644
--- a/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
@@ -51,7 +51,6 @@ import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletLocationState;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.hadoop.io.Text;
-import org.easymock.EasyMock;
import org.junit.Test;
public class MergeStateIT extends ConfigurableMacBase {
@@ -106,10 +105,8 @@ public class MergeStateIT extends ConfigurableMacBase {
@Test
public void test() throws Exception {
- ServerContext context = EasyMock.createMock(ServerContext.class);
+ ServerContext context = getServerContext();
try (AccumuloClient accumuloClient = createClient()) {
- EasyMock.expect(context.getClient()).andReturn(accumuloClient).anyTimes();
- EasyMock.replay(context);
accumuloClient.securityOperations().grantTablePermission(accumuloClient.whoami(),
MetadataTable.NAME, TablePermission.WRITE);
BatchWriter bw = accumuloClient.createBatchWriter(MetadataTable.NAME,
diff --git a/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java b/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java
index b9af4ae..ed306f5 100644
--- a/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java
@@ -164,94 +164,95 @@ public class SuspendedTabletsIT extends ConfigurableMacBase {
* callback which shuts down some tablet servers.
*/
private void suspensionTestBody(TServerKiller serverStopper) throws Exception {
- ClientContext ctx = getClientContext();
+ try (AccumuloClient client = createClient()) {
+ ClientContext ctx = (ClientContext) client;
- String tableName = getUniqueNames(1)[0];
+ String tableName = getUniqueNames(1)[0];
- AccumuloClient client = ctx.getClient();
+ // Create a table with a bunch of splits
+ log.info("Creating table " + tableName);
+ ctx.tableOperations().create(tableName);
+ SortedSet<Text> splitPoints = new TreeSet<>();
+ for (int i = 1; i < TABLETS; ++i) {
+ splitPoints.add(new Text("" + i));
+ }
+ ctx.tableOperations().addSplits(tableName, splitPoints);
+
+ // Wait for all of the tablets to hosted ...
+ log.info("Waiting on hosting and balance");
+ TabletLocations ds;
+ for (ds = TabletLocations.retrieve(ctx,
+ tableName); ds.hostedCount != TABLETS; ds = TabletLocations.retrieve(ctx, tableName)) {
+ Thread.sleep(1000);
+ }
- // Create a table with a bunch of splits
- log.info("Creating table " + tableName);
- client.tableOperations().create(tableName);
- SortedSet<Text> splitPoints = new TreeSet<>();
- for (int i = 1; i < TABLETS; ++i) {
- splitPoints.add(new Text("" + i));
- }
- client.tableOperations().addSplits(tableName, splitPoints);
-
- // Wait for all of the tablets to hosted ...
- log.info("Waiting on hosting and balance");
- TabletLocations ds;
- for (ds = TabletLocations.retrieve(ctx,
- tableName); ds.hostedCount != TABLETS; ds = TabletLocations.retrieve(ctx, tableName)) {
- Thread.sleep(1000);
- }
+ // ... and balanced.
+ ctx.instanceOperations().waitForBalance();
+ do {
+ // Give at least another 5 seconds for migrations to finish up
+ Thread.sleep(5000);
+ ds = TabletLocations.retrieve(ctx, tableName);
+ } while (ds.hostedCount != TABLETS);
- // ... and balanced.
- client.instanceOperations().waitForBalance();
- do {
- // Give at least another 5 seconds for migrations to finish up
- Thread.sleep(5000);
- ds = TabletLocations.retrieve(ctx, tableName);
- } while (ds.hostedCount != TABLETS);
-
- // Pray all of our tservers have at least 1 tablet.
- assertEquals(TSERVERS, ds.hosted.keySet().size());
-
- // Kill two tablet servers hosting our tablets. This should put tablets into suspended state,
- // and thus halt balancing.
-
- TabletLocations beforeDeathState = ds;
- log.info("Eliminating tablet servers");
- serverStopper.eliminateTabletServers(ctx, beforeDeathState, 2);
-
- // Eventually some tablets will be suspended.
- log.info("Waiting on suspended tablets");
- ds = TabletLocations.retrieve(ctx, tableName);
- // Until we can scan the metadata table, the master probably can't either, so won't have been
- // able to suspend the tablets.
- // So we note the time that we were first able to successfully scan the metadata table.
- long killTime = System.nanoTime();
- while (ds.suspended.keySet().size() != 2) {
- Thread.sleep(1000);
+ // Pray all of our tservers have at least 1 tablet.
+ assertEquals(TSERVERS, ds.hosted.keySet().size());
+
+ // Kill two tablet servers hosting our tablets. This should put tablets into suspended state,
+ // and thus halt balancing.
+
+ TabletLocations beforeDeathState = ds;
+ log.info("Eliminating tablet servers");
+ serverStopper.eliminateTabletServers(ctx, beforeDeathState, 2);
+
+ // Eventually some tablets will be suspended.
+ log.info("Waiting on suspended tablets");
ds = TabletLocations.retrieve(ctx, tableName);
- }
+ // Until we can scan the metadata table, the master probably can't either, so won't have been
+ // able to suspend the tablets.
+ // So we note the time that we were first able to successfully scan the metadata table.
+ long killTime = System.nanoTime();
+ while (ds.suspended.keySet().size() != 2) {
+ Thread.sleep(1000);
+ ds = TabletLocations.retrieve(ctx, tableName);
+ }
- SetMultimap<HostAndPort,KeyExtent> deadTabletsByServer = ds.suspended;
+ SetMultimap<HostAndPort,KeyExtent> deadTabletsByServer = ds.suspended;
- // By this point, all tablets should be either hosted or suspended. All suspended tablets should
- // "belong" to the dead tablet servers, and should be in exactly the same place as before any
- // tserver death.
- for (HostAndPort server : deadTabletsByServer.keySet()) {
- assertEquals(deadTabletsByServer.get(server), beforeDeathState.hosted.get(server));
- }
- assertEquals(TABLETS, ds.hostedCount + ds.suspendedCount);
-
- // Restart the first tablet server, making sure it ends up on the same port
- HostAndPort restartedServer = deadTabletsByServer.keySet().iterator().next();
- log.info("Restarting " + restartedServer);
- getCluster().getClusterControl().start(ServerType.TABLET_SERVER, null,
- ImmutableMap.of(Property.TSERV_CLIENTPORT.getKey(), "" + restartedServer.getPort(),
- Property.TSERV_PORTSEARCH.getKey(), "false"),
- 1);
-
- // Eventually, the suspended tablets should be reassigned to the newly alive tserver.
- log.info("Awaiting tablet unsuspension for tablets belonging to " + restartedServer);
- for (ds = TabletLocations.retrieve(ctx, tableName); ds.suspended.containsKey(restartedServer)
- || ds.assignedCount != 0; ds = TabletLocations.retrieve(ctx, tableName)) {
- Thread.sleep(1000);
- }
- assertEquals(deadTabletsByServer.get(restartedServer), ds.hosted.get(restartedServer));
+ // By this point, all tablets should be either hosted or suspended. All suspended tablets
+ // should
+ // "belong" to the dead tablet servers, and should be in exactly the same place as before any
+ // tserver death.
+ for (HostAndPort server : deadTabletsByServer.keySet()) {
+ assertEquals(deadTabletsByServer.get(server), beforeDeathState.hosted.get(server));
+ }
+ assertEquals(TABLETS, ds.hostedCount + ds.suspendedCount);
+
+ // Restart the first tablet server, making sure it ends up on the same port
+ HostAndPort restartedServer = deadTabletsByServer.keySet().iterator().next();
+ log.info("Restarting " + restartedServer);
+ getCluster().getClusterControl().start(ServerType.TABLET_SERVER, null,
+ ImmutableMap.of(Property.TSERV_CLIENTPORT.getKey(), "" + restartedServer.getPort(),
+ Property.TSERV_PORTSEARCH.getKey(), "false"),
+ 1);
+
+ // Eventually, the suspended tablets should be reassigned to the newly alive tserver.
+ log.info("Awaiting tablet unsuspension for tablets belonging to " + restartedServer);
+ for (ds = TabletLocations.retrieve(ctx, tableName); ds.suspended.containsKey(restartedServer)
+ || ds.assignedCount != 0; ds = TabletLocations.retrieve(ctx, tableName)) {
+ Thread.sleep(1000);
+ }
+ assertEquals(deadTabletsByServer.get(restartedServer), ds.hosted.get(restartedServer));
- // Finally, after much longer, remaining suspended tablets should be reassigned.
- log.info("Awaiting tablet reassignment for remaining tablets");
- for (ds = TabletLocations.retrieve(ctx,
- tableName); ds.hostedCount != TABLETS; ds = TabletLocations.retrieve(ctx, tableName)) {
- Thread.sleep(1000);
- }
+ // Finally, after much longer, remaining suspended tablets should be reassigned.
+ log.info("Awaiting tablet reassignment for remaining tablets");
+ for (ds = TabletLocations.retrieve(ctx,
+ tableName); ds.hostedCount != TABLETS; ds = TabletLocations.retrieve(ctx, tableName)) {
+ Thread.sleep(1000);
+ }
- long recoverTime = System.nanoTime();
- assertTrue(recoverTime - killTime >= NANOSECONDS.convert(SUSPEND_DURATION, MILLISECONDS));
+ long recoverTime = System.nanoTime();
+ assertTrue(recoverTime - killTime >= NANOSECONDS.convert(SUSPEND_DURATION, MILLISECONDS));
+ }
}
private interface TServerKiller {
@@ -313,7 +314,7 @@ public class SuspendedTabletsIT extends ConfigurableMacBase {
}
private void scan(ClientContext ctx, String tableName) throws Exception {
- Map<String,String> idMap = ctx.getClient().tableOperations().tableIdMap();
+ Map<String,String> idMap = ctx.tableOperations().tableIdMap();
String tableId = Objects.requireNonNull(idMap.get(tableName));
try (MetaDataTableScanner scanner = new MetaDataTableScanner(ctx, new Range())) {
while (scanner.hasNext()) {
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
index 3aa09bc..27038c7 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@ -106,8 +106,7 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacB
*/
private Set<String> getWalsForTable(String tableName) throws Exception {
final ServerContext context = getServerContext();
- final AccumuloClient client = context.getClient();
- final String tableId = client.tableOperations().tableIdMap().get(tableName);
+ final String tableId = context.tableOperations().tableIdMap().get(tableName);
assertNotNull("Could not determine table ID for " + tableName, tableId);
@@ -382,7 +381,7 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacB
client.tableOperations().flush(otherTable, null, null, true);
// Get the tservers which the master deems as active
- final ClientContext context = getClientContext();
+ final ClientContext context = (ClientContext) client;
List<String> tservers = MasterClient.execute(context,
cli -> cli.getActiveTservers(Tracer.traceInfo(), context.rpcCreds()));
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java b/test/src/main/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
index fce0887..9a2e6be 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
@@ -54,7 +54,7 @@ public class MultiTserverReplicationIT extends ConfigurableMacBase {
public void tserverReplicationServicePortsAreAdvertised() throws Exception {
// Wait for the cluster to be up
AccumuloClient client = createClient();
- ClientContext context = getClientContext();
+ ClientContext context = (ClientContext) client;
// Wait for a tserver to come up to fulfill this request
client.tableOperations().create("foo");
@@ -91,7 +91,7 @@ public class MultiTserverReplicationIT extends ConfigurableMacBase {
public void masterReplicationServicePortsAreAdvertised() throws Exception {
// Wait for the cluster to be up
AccumuloClient client = createClient();
- ClientContext context = getClientContext();
+ ClientContext context = (ClientContext) client;
// Wait for a tserver to come up to fulfill this request
client.tableOperations().create("foo");
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
index d4e8421..176511d 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -176,8 +176,7 @@ public class ReplicationIT extends ConfigurableMacBase {
private Multimap<String,Table.ID> getAllLogs(AccumuloClient client, ServerContext context)
throws Exception {
Multimap<String,Table.ID> logs = getLogs(client, context);
- try (Scanner scanner = context.getClient().createScanner(ReplicationTable.NAME,
- Authorizations.EMPTY)) {
+ try (Scanner scanner = context.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
StatusSection.limit(scanner);
Text buff = new Text();
for (Entry<Key,Value> entry : scanner) {
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationOperationsImplIT.java b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationOperationsImplIT.java
index cddf30d..59b431e 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationOperationsImplIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationOperationsImplIT.java
@@ -85,7 +85,7 @@ public class ReplicationOperationsImplIT extends ConfigurableMacBase {
private ReplicationOperationsImpl getReplicationOperations() throws Exception {
Master master = EasyMock.createMock(Master.class);
ServerContext serverContext = EasyMock.createMock(ServerContext.class);
- EasyMock.expect(master.getClient()).andReturn(client).anyTimes();
+ EasyMock.expect((AccumuloClient) master.getContext()).andReturn(client).anyTimes();
EasyMock.expect(master.getContext()).andReturn(serverContext).anyTimes();
EasyMock.replay(master);
@@ -101,7 +101,7 @@ public class ReplicationOperationsImplIT extends ConfigurableMacBase {
}
};
- ClientContext context = getClientContext();
+ ClientContext context = (ClientContext) client;
return new ReplicationOperationsImpl(context) {
@Override
protected boolean getMasterDrain(final TInfo tinfo, final TCredentials rpcCreds,
diff --git a/test/src/main/java/org/apache/accumulo/test/server/security/SystemCredentialsIT.java b/test/src/main/java/org/apache/accumulo/test/server/security/SystemCredentialsIT.java
index a9e533b..7d634b6 100644
--- a/test/src/main/java/org/apache/accumulo/test/server/security/SystemCredentialsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/server/security/SystemCredentialsIT.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
import java.util.Map.Entry;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -43,7 +44,7 @@ public class SystemCredentialsIT extends ConfigurableMacBase {
@Override
protected int defaultTimeoutSeconds() {
- return 1 * 60;
+ return 60;
}
@Test
@@ -56,39 +57,40 @@ public class SystemCredentialsIT extends ConfigurableMacBase {
exec(SystemCredentialsIT.class, "bad_password", getCluster().getZooKeepers()).waitFor());
}
- public static void main(final String[] args)
- throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
+ public static void main(final String[] args) throws AccumuloException, TableNotFoundException {
SiteConfiguration siteConfig = new SiteConfiguration();
ServerContext context = new ServerContext(siteConfig);
- Credentials creds = null;
+ Credentials creds;
String badInstanceID = SystemCredentials.class.getName();
if (args.length < 2)
throw new RuntimeException("Incorrect usage; expected to be run by test only");
- if (args[0].equals("bad")) {
- creds = SystemCredentials.get(badInstanceID, siteConfig);
- } else if (args[0].equals("good")) {
- creds = SystemCredentials.get(context.getInstanceID(), siteConfig);
- } else if (args[0].equals("bad_password")) {
- creds = new SystemCredentials(badInstanceID, "!SYSTEM", new PasswordToken("fake"));
- } else {
- throw new RuntimeException("Incorrect usage; expected to be run by test only");
+ switch (args[0]) {
+ case "bad":
+ creds = SystemCredentials.get(badInstanceID, siteConfig);
+ break;
+ case "good":
+ creds = SystemCredentials.get(context.getInstanceID(), siteConfig);
+ break;
+ case "bad_password":
+ creds = new SystemCredentials(badInstanceID, "!SYSTEM", new PasswordToken("fake"));
+ break;
+ default:
+ throw new RuntimeException("Incorrect usage; expected to be run by test only");
}
- AccumuloClient client;
- try {
- client = context.getClient(creds.getPrincipal(), creds.getToken());
+ try (AccumuloClient client = Accumulo.newClient().from(context.getProperties())
+ .as(creds.getPrincipal(), creds.getToken()).build()) {
client.securityOperations().authenticateUser(creds.getPrincipal(), creds.getToken());
+ try (Scanner scan = client.createScanner(RootTable.NAME, Authorizations.EMPTY)) {
+ for (Entry<Key,Value> e : scan) {
+ e.hashCode();
+ }
+ } catch (RuntimeException e) {
+ e.printStackTrace(System.err);
+ System.exit(SCAN_FAILED);
+ }
} catch (AccumuloSecurityException e) {
e.printStackTrace(System.err);
System.exit(AUTHENICATION_FAILED);
- return;
- }
- try (Scanner scan = client.createScanner(RootTable.NAME, Authorizations.EMPTY)) {
- for (Entry<Key,Value> e : scan) {
- e.hashCode();
- }
- } catch (RuntimeException e) {
- e.printStackTrace(System.err);
- System.exit(SCAN_FAILED);
}
}
}