You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mw...@apache.org on 2019/03/05 18:17:30 UTC
[accumulo] branch master updated: Simplified option parsing (#1010)
This is an automated email from the ASF dual-hosted git repository.
mwalch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new c482550 Simplified option parsing (#1010)
c482550 is described below
commit c482550082a4b2386e4270ec4b78a3ee1c402b76
Author: Mike Walch <mw...@apache.org>
AuthorDate: Tue Mar 5 13:17:26 2019 -0500
Simplified option parsing (#1010)
* Removed setters in ClientOpts
* Simplify RowHash options
---
.../org/apache/accumulo/core/cli/ClientOpts.java | 36 +----
.../java/org/apache/accumulo/core/util/Merge.java | 11 +-
.../apache/accumulo/core/cli/TestClientOpts.java | 13 +-
.../lib/MapReduceClientOnDefaultTable.java | 51 ------
.../lib/MapReduceClientOnRequiredTable.java | 47 ------
.../mapreduce/lib/MapReduceClientOpts.java | 31 ++--
.../accumulo/hadoop/its/mapreduce/RowHashIT.java | 13 +-
.../apache/accumulo/server/cli/ServerUtilOpts.java | 5 +-
.../server/util/CheckForMetadataProblems.java | 3 +-
.../apache/accumulo/server/util/LocalityCheck.java | 4 +-
.../apache/accumulo/server/util/RandomWriter.java | 11 +-
.../accumulo/server/util/TableDiskUsage.java | 3 +-
.../server/util/VerifyTabletAssignments.java | 3 +-
.../apache/accumulo/master/state/MergeStats.java | 3 +-
.../java/org/apache/accumulo/tracer/TraceDump.java | 5 +-
.../apache/accumulo/tracer/TraceTableStats.java | 3 +-
.../org/apache/accumulo/test/TestBinaryRows.java | 3 +-
.../java/org/apache/accumulo/test/TestIngest.java | 174 +++++++++++++--------
.../apache/accumulo/test/TestMultiTableIngest.java | 9 +-
.../apache/accumulo/test/TestRandomDeletes.java | 7 +-
.../org/apache/accumulo/test/VerifyIngest.java | 77 ++++++---
.../BalanceInPresenceOfOfflineTableIT.java | 13 +-
.../apache/accumulo/test/functional/BulkIT.java | 48 +++---
.../test/functional/BulkSplitOptimizationIT.java | 24 ++-
.../test/functional/ChaoticBalancerIT.java | 15 +-
.../accumulo/test/functional/CompactionIT.java | 29 ++--
.../apache/accumulo/test/functional/DeleteIT.java | 22 +--
.../test/functional/DynamicThreadPoolsIT.java | 10 +-
.../accumulo/test/functional/FateStarvationIT.java | 16 +-
.../test/functional/FunctionalTestUtils.java | 18 +--
.../test/functional/GarbageCollectorIT.java | 20 ++-
.../test/functional/HalfDeadTServerIT.java | 7 +-
.../accumulo/test/functional/MasterFailoverIT.java | 13 +-
.../apache/accumulo/test/functional/MaxOpenIT.java | 17 +-
.../accumulo/test/functional/ReadWriteIT.java | 37 ++---
.../apache/accumulo/test/functional/RenameIT.java | 23 ++-
.../apache/accumulo/test/functional/RestartIT.java | 59 +++----
.../accumulo/test/functional/RestartStressIT.java | 15 +-
.../test/functional/SimpleBalancerFairnessIT.java | 7 +-
.../apache/accumulo/test/functional/SplitIT.java | 15 +-
.../apache/accumulo/test/functional/TableIT.java | 16 +-
.../accumulo/test/functional/WriteAheadLogIT.java | 16 +-
.../accumulo/test/functional/WriteLotsIT.java | 19 +--
.../apache/accumulo/test/mapreduce/RowHash.java | 105 +++----------
.../test/performance/ContinuousIngest.java | 3 +-
.../test/performance/scan/CollectTabletStats.java | 3 +-
46 files changed, 448 insertions(+), 634 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
index 1e66d90..e821283 100644
--- a/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
+++ b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
@@ -26,12 +26,9 @@ import java.util.Map;
import java.util.Properties;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.clientImpl.ClientInfoImpl;
import org.apache.accumulo.core.conf.ClientProperty;
-import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.htrace.NullScope;
@@ -47,13 +44,6 @@ import com.beust.jcommander.converters.IParameterSplitter;
public class ClientOpts extends Help {
- public static class MemoryConverter implements IStringConverter<Long> {
- @Override
- public Long convert(String value) {
- return ConfigurationTypeHelper.getFixedMemoryAsBytes(value);
- }
- }
-
public static class AuthConverter implements IStringConverter<Authorizations> {
@Override
public Authorizations convert(String value) {
@@ -96,14 +86,14 @@ public class ClientOpts extends Help {
}
@Parameter(names = {"-u", "--user"}, description = "Connection user")
- private String principal = null;
+ public String principal = null;
@Parameter(names = "--password", converter = PasswordConverter.class,
description = "Enter the connection password", password = true)
private Password securePassword = null;
public AuthenticationToken getToken() {
- return ClientProperty.getAuthenticationToken(getClientProperties());
+ return ClientProperty.getAuthenticationToken(getClientProps());
}
@Parameter(names = {"-auths", "--auths"}, converter = AuthConverter.class,
@@ -146,26 +136,6 @@ public class ClientOpts extends Help {
private Properties cachedProps = null;
- public String getPrincipal() {
- return ClientProperty.AUTH_PRINCIPAL.getValue(getClientProperties());
- }
-
- public void setPrincipal(String principal) {
- this.principal = principal;
- }
-
- public void setClientProperties(Properties clientProps) {
- ClientProperty.validate(clientProps);
- this.cachedProps = clientProps;
- }
-
- /**
- * @return {@link AccumuloClient} that must be closed by user
- */
- public AccumuloClient createClient() {
- return Accumulo.newClient().from(getClientProperties()).build();
- }
-
public String getClientConfigFile() {
if (clientConfigFile == null) {
URL clientPropsUrl = ClientOpts.class.getClassLoader()
@@ -177,7 +147,7 @@ public class ClientOpts extends Help {
return clientConfigFile;
}
- public Properties getClientProperties() {
+ public Properties getClientProps() {
if (cachedProps == null) {
cachedProps = new Properties();
if (getClientConfigFile() != null) {
diff --git a/core/src/main/java/org/apache/accumulo/core/util/Merge.java b/core/src/main/java/org/apache/accumulo/core/util/Merge.java
index 1a1c402..d3ff399 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/Merge.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/Merge.java
@@ -21,11 +21,13 @@ import java.util.Iterator;
import java.util.List;
import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.Tables;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -56,6 +58,13 @@ public class Merge {
log.info(String.format(format, args));
}
+ public static class MemoryConverter implements IStringConverter<Long> {
+ @Override
+ public Long convert(String value) {
+ return ConfigurationTypeHelper.getFixedMemoryAsBytes(value);
+ }
+ }
+
static class TextConverter implements IStringConverter<Text> {
@Override
public Text convert(String value) {
@@ -84,7 +93,7 @@ public class Merge {
Opts opts = new Opts();
try (TraceScope clientTrace = opts.parseArgsAndTrace(Merge.class.getName(), args)) {
- try (AccumuloClient client = opts.createClient()) {
+ try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
if (!client.tableOperations().exists(opts.tableName)) {
System.err.println("table " + opts.tableName + " does not exist");
diff --git a/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java b/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
index f1e55ba..90e4ff5 100644
--- a/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
+++ b/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
@@ -19,7 +19,10 @@ package org.apache.accumulo.core.cli;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.util.Properties;
+
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.ClientProperty;
import org.junit.Test;
public class TestClientOpts {
@@ -31,16 +34,18 @@ public class TestClientOpts {
"instance.zookeepers=zoo1,zoo2", "-o", "auth.type=password", "-o", "auth.principal=user123",
"-o", "auth.token=mypass"};
opts.parseArgs("test", args);
- assertEquals("user123", opts.getPrincipal());
+ Properties props = opts.getClientProps();
+ assertEquals("user123", ClientProperty.AUTH_PRINCIPAL.getValue(props));
assertTrue(opts.getToken() instanceof PasswordToken);
- assertEquals("myinst", opts.getClientProperties().getProperty("instance.name"));
+ assertEquals("myinst", props.getProperty("instance.name"));
opts = new ClientOpts();
args = new String[] {"-o", "instance.name=myinst", "-o", "instance.zookeepers=zoo1,zoo2", "-o",
"auth.type=password", "-o", "auth.token=mypass", "-u", "userabc"};
opts.parseArgs("test", args);
- assertEquals("userabc", opts.getPrincipal());
+ props = opts.getClientProps();
+ assertEquals("userabc", ClientProperty.AUTH_PRINCIPAL.getValue(props));
assertTrue(opts.getToken() instanceof PasswordToken);
- assertEquals("myinst", opts.getClientProperties().getProperty("instance.name"));
+ assertEquals("myinst", props.getProperty("instance.name"));
}
}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java
deleted file mode 100644
index 92d5fb7..0000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoopImpl.mapreduce.lib;
-
-import java.util.Properties;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.beust.jcommander.Parameter;
-
-public class MapReduceClientOnDefaultTable extends MapReduceClientOpts {
- @Parameter(names = "--table", description = "table to use")
- public String tableName;
-
- public MapReduceClientOnDefaultTable(String table) {
- this.tableName = table;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- @Override
- public void setAccumuloConfigs(Job job) throws AccumuloException, AccumuloSecurityException {
- final String tableName = getTableName();
- final Properties clientProps = getClientProperties();
- AccumuloInputFormat.configure().clientProperties(clientProps).table(tableName).auths(auths)
- .store(job);
- AccumuloOutputFormat.configure().clientProperties(clientProps).defaultTable(tableName)
- .createTables(true).store(job);
- }
-
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java
deleted file mode 100644
index 18ff939..0000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoopImpl.mapreduce.lib;
-
-import java.util.Properties;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.beust.jcommander.Parameter;
-
-public class MapReduceClientOnRequiredTable extends MapReduceClientOpts {
-
- @Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
- private String tableName;
-
- @Override
- public void setAccumuloConfigs(Job job) throws AccumuloException, AccumuloSecurityException {
- final String tableName = getTableName();
- final Properties clientProps = getClientProperties();
- AccumuloInputFormat.configure().clientProperties(clientProps).table(tableName).auths(auths)
- .store(job);
- AccumuloOutputFormat.configure().clientProperties(clientProps).defaultTable(tableName)
- .createTables(true).store(job);
- }
-
- public String getTableName() {
- return tableName;
- }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOpts.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOpts.java
index e376697..b1cfdf8 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOpts.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOpts.java
@@ -16,16 +16,16 @@
*/
package org.apache.accumulo.hadoopImpl.mapreduce.lib;
+import java.util.Properties;
+
import org.apache.accumulo.core.cli.ClientOpts;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.security.SystemPermission;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,17 +33,15 @@ import org.slf4j.LoggerFactory;
/**
* Adds some MR awareness to the ClientOpts
*/
-public abstract class MapReduceClientOpts extends ClientOpts {
- private static final Logger log = LoggerFactory.getLogger(MapReduceClientOpts.class);
+public class MapReduceClientOpts extends ClientOpts {
- public abstract void setAccumuloConfigs(Job job)
- throws AccumuloException, AccumuloSecurityException;
+ private static final Logger log = LoggerFactory.getLogger(MapReduceClientOpts.class);
- @Override
- public AuthenticationToken getToken() {
- AuthenticationToken authToken = super.getToken();
+ public Properties getClientProps() {
+ Properties props = super.getClientProps();
// For MapReduce, Kerberos credentials don't make it to the Mappers and Reducers,
// so we need to request a delegation token and use that instead.
+ AuthenticationToken authToken = ClientProperty.getAuthenticationToken(props);
if (authToken instanceof KerberosToken) {
log.info("Received KerberosToken, fetching DelegationToken for MapReduce");
final KerberosToken krbToken = (KerberosToken) authToken;
@@ -57,9 +55,8 @@ public abstract class MapReduceClientOpts extends ClientOpts {
String newPrincipal = user.getUserName();
log.info("Obtaining delegation token for {}", newPrincipal);
- setPrincipal(newPrincipal);
- try (AccumuloClient client = Accumulo.newClient().from(getClientProperties())
- .as(newPrincipal, krbToken).build()) {
+ try (AccumuloClient client = Accumulo.newClient().from(props).as(newPrincipal, krbToken)
+ .build()) {
// Do the explicit check to see if the user has the permission to get a delegation token
if (!client.securityOperations().hasSystemPermission(client.whoami(),
@@ -75,7 +72,11 @@ public abstract class MapReduceClientOpts extends ClientOpts {
}
// Get the delegation token from Accumulo
- return client.securityOperations().getDelegationToken(new DelegationTokenConfig());
+ AuthenticationToken token = client.securityOperations()
+ .getDelegationToken(new DelegationTokenConfig());
+
+ props.setProperty(ClientProperty.AUTH_PRINCIPAL.getKey(), newPrincipal);
+ ClientProperty.setAuthenticationToken(props, token);
}
} catch (Exception e) {
final String msg = "Failed to acquire DelegationToken for use with MapReduce";
@@ -83,6 +84,6 @@ public abstract class MapReduceClientOpts extends ClientOpts {
throw new RuntimeException(msg, e);
}
}
- return authToken;
+ return props;
}
}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java
index a8823ba..06ba291 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java
@@ -41,7 +41,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.hadoopImpl.mapreduce.lib.MapReduceClientOnRequiredTable;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.MapReduceClientOpts;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.hadoop.conf.Configuration;
@@ -128,9 +128,11 @@ public class RowHashIT extends ConfigurableMacBase {
public void setup(Context job) {}
}
- public class Opts extends MapReduceClientOnRequiredTable {
+ public class Opts extends MapReduceClientOpts {
@Parameter(names = "--column", required = true)
String column;
+ @Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
+ String tableName;
}
@Override
@@ -141,15 +143,14 @@ public class RowHashIT extends ConfigurableMacBase {
RowHash.Opts opts = new RowHash.Opts();
opts.parseArgs(RowHash.class.getName(), args);
job.setInputFormatClass(AccumuloInputFormat.class);
- opts.setAccumuloConfigs(job);
String col = opts.column;
int idx = col.indexOf(":");
Text cf = new Text(idx < 0 ? col : col.substring(0, idx));
Text cq = idx < 0 ? null : new Text(col.substring(idx + 1));
if (cf.getLength() > 0)
- AccumuloInputFormat.configure().clientProperties(opts.getClientProperties())
- .table(opts.getTableName()).auths(Authorizations.EMPTY)
+ AccumuloInputFormat.configure().clientProperties(opts.getClientProps())
+ .table(opts.tableName).auths(Authorizations.EMPTY)
.fetchColumns(Collections.singleton(new IteratorSetting.Column(cf, cq))).store(job);
job.setMapperClass(RowHash.HashDataMapper.class);
@@ -159,7 +160,7 @@ public class RowHashIT extends ConfigurableMacBase {
job.setNumReduceTasks(0);
job.setOutputFormatClass(AccumuloOutputFormat.class);
- AccumuloOutputFormat.configure().clientProperties(opts.getClientProperties()).store(job);
+ AccumuloOutputFormat.configure().clientProperties(opts.getClientProps()).store(job);
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/cli/ServerUtilOpts.java b/server/base/src/main/java/org/apache/accumulo/server/cli/ServerUtilOpts.java
index 0085093..1578b9a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/cli/ServerUtilOpts.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/cli/ServerUtilOpts.java
@@ -21,9 +21,6 @@ import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.server.ServerContext;
public class ServerUtilOpts extends ClientOpts {
- {
- setPrincipal("root");
- }
private ServerContext context;
@@ -32,7 +29,7 @@ public class ServerUtilOpts extends ClientOpts {
if (getClientConfigFile() == null) {
context = new ServerContext(new SiteConfiguration());
} else {
- context = new ServerContext(new SiteConfiguration(), getClientProperties());
+ context = new ServerContext(new SiteConfiguration(), getClientProps());
}
}
return context;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java b/server/base/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java
index 0fe3fdf..1762364 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
@@ -96,7 +97,7 @@ public class CheckForMetadataProblems {
System.out.println("Checking table: " + tableNameToCheck);
Map<String,TreeSet<KeyExtent>> tables = new HashMap<>();
- try (AccumuloClient client = opts.createClient()) {
+ try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
Scanner scanner = client.createScanner(tableNameToCheck, Authorizations.EMPTY);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java b/server/base/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java
index 8998323..087701e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
@@ -46,7 +47,8 @@ public class LocalityCheck {
try (TraceScope clientSpan = opts.parseArgsAndTrace(LocalityCheck.class.getName(), args)) {
VolumeManager fs = opts.getServerContext().getVolumeManager();
- try (AccumuloClient accumuloClient = opts.createClient()) {
+ try (AccumuloClient accumuloClient = Accumulo.newClient().from(opts.getClientProps())
+ .build()) {
Scanner scanner = accumuloClient.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RandomWriter.java b/server/base/src/main/java/org/apache/accumulo/server/util/RandomWriter.java
index c0d3a84..1c1241b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/RandomWriter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/RandomWriter.java
@@ -18,11 +18,14 @@ package org.apache.accumulo.server.util;
import java.security.SecureRandom;
import java.util.Iterator;
+import java.util.Properties;
import java.util.Random;
import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;
@@ -91,11 +94,13 @@ public class RandomWriter {
public static void main(String[] args) throws Exception {
Opts opts = new Opts();
- opts.setPrincipal("root");
+ opts.principal = "root";
try (TraceScope clientSpan = opts.parseArgsAndTrace(RandomWriter.class.getName(), args)) {
long start = System.currentTimeMillis();
- log.info("starting at {} for user {}", start, opts.getPrincipal());
- try (AccumuloClient accumuloClient = opts.createClient();
+ Properties clientProps = opts.getClientProps();
+ String principal = ClientProperty.AUTH_PRINCIPAL.getValue(clientProps);
+ log.info("starting at {} for user {}", start, principal);
+ try (AccumuloClient accumuloClient = Accumulo.newClient().from(clientProps).build();
BatchWriter bw = accumuloClient.createBatchWriter(opts.tableName)) {
log.info("Writing {} mutations...", opts.count);
bw.addMutations(new RandomMutationGenerator(opts.count));
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java b/server/base/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
index 789e69c..c4085f7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -298,7 +299,7 @@ public class TableDiskUsage {
public static void main(String[] args) throws Exception {
Opts opts = new Opts();
try (TraceScope clientSpan = opts.parseArgsAndTrace(TableDiskUsage.class.getName(), args)) {
- try (AccumuloClient client = opts.createClient()) {
+ try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
VolumeManager fs = opts.getServerContext().getVolumeManager();
org.apache.accumulo.server.util.TableDiskUsage.printDiskUsage(opts.tables, fs, client,
false);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
index 4346eba..dda58f4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -74,7 +75,7 @@ public class VerifyTabletAssignments {
Opts opts = new Opts();
try (TraceScope clientSpan = opts.parseArgsAndTrace(VerifyTabletAssignments.class.getName(),
args)) {
- try (AccumuloClient client = opts.createClient()) {
+ try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
for (String table : client.tableOperations().list())
checkTable((ClientContext) client, opts, table, null);
}
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
index efd9422..ccbe13a 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
@@ -21,6 +21,7 @@ import java.util.Map;
import java.util.Map.Entry;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -263,7 +264,7 @@ public class MergeStats {
ServerUtilOpts opts = new ServerUtilOpts();
try (TraceScope clientSpan = opts.parseArgsAndTrace(MergeStats.class.getName(), args)) {
- try (AccumuloClient client = opts.createClient()) {
+ try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
Map<String,String> tableIdMap = client.tableOperations().tableIdMap();
ZooReaderWriter zooReaderWriter = opts.getServerContext().getZooReaderWriter();
for (Entry<String,String> entry : tableIdMap.entrySet()) {
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceDump.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceDump.java
index 8658eb5..4ebbf60 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceDump.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceDump.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
import java.util.Set;
import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
@@ -77,7 +78,7 @@ public class TraceDump {
PrintStream out = System.out;
long endTime = System.currentTimeMillis();
long startTime = endTime - opts.length;
- try (AccumuloClient client = opts.createClient()) {
+ try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
Scanner scanner = client.createScanner(opts.tableName, opts.auths);
Range range = new Range(new Text("start:" + Long.toHexString(startTime)),
new Text("start:" + Long.toHexString(endTime)));
@@ -100,7 +101,7 @@ public class TraceDump {
private static int dumpTrace(Opts opts) throws Exception {
final PrintStream out = System.out;
int count = 0;
- try (AccumuloClient client = opts.createClient()) {
+ try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
for (String traceId : opts.traceIds) {
Scanner scanner = client.createScanner(opts.tableName, opts.auths);
Range range = new Range(new Text(traceId));
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceTableStats.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceTableStats.java
index 5a7e56d..85de1b6 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceTableStats.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceTableStats.java
@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.TreeMap;
import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -82,7 +83,7 @@ public class TraceTableStats {
double maxSpanLength = 0;
double maxSpanLengthMS = 0;
- try (AccumuloClient client = opts.createClient()) {
+ try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
Scanner scanner = client.createScanner(opts.tableName, Authorizations.EMPTY);
scanner.setRange(new Range(null, true, "idx:", false));
for (Entry<Key,Value> entry : scanner) {
diff --git a/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java b/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java
index fe214b1..b3c3fc3 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
import java.util.Random;
import java.util.TreeSet;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
@@ -228,7 +229,7 @@ public class TestBinaryRows {
Opts opts = new Opts();
opts.parseArgs(TestBinaryRows.class.getName(), args);
- try (AccumuloClient client = opts.createClient()) {
+ try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
runTest(client, opts);
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
index b75889a..e3a5823 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
@@ -20,11 +20,13 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
import java.util.Map.Entry;
+import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -35,6 +37,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.security.SecurityErrorCode;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.TabletServerBatchWriter;
+import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.crypto.CryptoServiceFactory;
import org.apache.accumulo.core.data.ConstraintViolationSummary;
@@ -48,7 +51,6 @@ import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.util.FastFormat;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
@@ -61,71 +63,114 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
public class TestIngest {
public static final Authorizations AUTHS = new Authorizations("L1", "L2", "G1", "GROUP2");
+ public static class IngestParams {
+ public Properties clientProps = new Properties();
+ public String tableName = "test_ingest";
+ public boolean createTable = false;
+ public int numsplits = 1;
+ public int startRow = 0;
+ public int rows = 100000;
+ public int cols = 1;
+ public Integer random = null;
+ public int dataSize = 1000;
+ public boolean delete = false;
+ public long timestamp = -1;
+ public String outputFile = null;
+ public int stride;
+ public String columnFamily = "colf";
+ public ColumnVisibility columnVisibility = new ColumnVisibility();
+
+ public IngestParams(Properties props) {
+ clientProps = props;
+ }
+
+ public IngestParams(Properties props, String table) {
+ this(props);
+ tableName = table;
+ }
+
+ public IngestParams(Properties props, String table, int rows) {
+ this(props, table);
+ this.rows = rows;
+ }
+ }
+
public static class Opts extends ClientOpts {
@Parameter(names = "--table", description = "table to use")
String tableName = "test_ingest";
@Parameter(names = "--createTable")
- public boolean createTable = false;
+ boolean createTable = false;
@Parameter(names = "--splits",
description = "the number of splits to use when creating the table")
- public int numsplits = 1;
+ int numsplits = 1;
@Parameter(names = "--start", description = "the starting row number")
- public int startRow = 0;
+ int startRow = 0;
@Parameter(names = "--rows", description = "the number of rows to ingest")
- public int rows = 100000;
+ int rows = 100000;
@Parameter(names = "--cols", description = "the number of columns to ingest per row")
- public int cols = 1;
+ int cols = 1;
@Parameter(names = "--random", description = "insert random rows and use"
+ " the given number to seed the psuedo-random number generator")
- public Integer random = null;
+ Integer random = null;
@Parameter(names = "--size", description = "the size of the value to ingest")
- public int dataSize = 1000;
+ int dataSize = 1000;
@Parameter(names = "--delete", description = "delete values instead of inserting them")
- public boolean delete = false;
+ boolean delete = false;
@Parameter(names = {"-ts", "--timestamp"}, description = "timestamp to use for all values")
- public long timestamp = -1;
+ long timestamp = -1;
@Parameter(names = "--rfile", description = "generate data into a file that can be imported")
- public String outputFile = null;
+ String outputFile = null;
@Parameter(names = "--stride", description = "the difference between successive row ids")
- public int stride;
+ int stride;
@Parameter(names = {"-cf", "--columnFamily"},
description = "place columns in this column family")
- public String columnFamily = "colf";
+ String columnFamily = "colf";
@Parameter(names = {"-cv", "--columnVisibility"},
description = "place columns in this column family", converter = VisibilityConverter.class)
- public ColumnVisibility columnVisibility = new ColumnVisibility();
-
- public Configuration conf = null;
- public FileSystem fs = null;
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
+ ColumnVisibility columnVisibility = new ColumnVisibility();
+
+ public IngestParams getIngestPrams() {
+ IngestParams params = new IngestParams(getClientProps(), tableName);
+ params.createTable = createTable;
+ params.numsplits = numsplits;
+ params.startRow = startRow;
+ params.rows = rows;
+ params.cols = cols;
+ params.random = random;
+ params.dataSize = dataSize;
+ params.delete = delete;
+ params.timestamp = timestamp;
+ params.outputFile = outputFile;
+ params.stride = stride;
+ params.columnFamily = columnFamily;
+ params.columnVisibility = columnVisibility;
+ return params;
}
}
- public static void createTable(AccumuloClient client, Opts args)
+ public static void createTable(AccumuloClient client, IngestParams params)
throws AccumuloException, AccumuloSecurityException, TableExistsException {
- if (args.createTable) {
- TreeSet<Text> splits = getSplitPoints(args.startRow, args.startRow + args.rows,
- args.numsplits);
+ if (params.createTable) {
+ TreeSet<Text> splits = getSplitPoints(params.startRow, params.startRow + params.rows,
+ params.numsplits);
- if (!client.tableOperations().exists(args.tableName))
- client.tableOperations().create(args.tableName);
+ if (!client.tableOperations().exists(params.tableName))
+ client.tableOperations().create(params.tableName);
try {
- client.tableOperations().addSplits(args.tableName, splits);
+ client.tableOperations().addSplits(params.tableName, splits);
} catch (TableNotFoundException ex) {
// unlikely
throw new RuntimeException(ex);
@@ -190,67 +235,68 @@ public class TestIngest {
if (opts.debug)
Logger.getLogger(TabletServerBatchWriter.class.getName()).setLevel(Level.TRACE);
- try (AccumuloClient client = opts.createClient()) {
- ingest(client, opts);
+ try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
+ ingest(client, opts.getIngestPrams());
}
}
@SuppressFBWarnings(value = "PREDICTABLE_RANDOM",
justification = "predictable random is okay for testing")
- public static void ingest(AccumuloClient accumuloClient, FileSystem fs, Opts opts)
+ public static void ingest(AccumuloClient accumuloClient, FileSystem fs, IngestParams params)
throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException,
MutationsRejectedException, TableExistsException {
long stopTime;
- byte[][] bytevals = generateValues(opts.dataSize);
+ byte[][] bytevals = generateValues(params.dataSize);
- byte[] randomValue = new byte[opts.dataSize];
+ byte[] randomValue = new byte[params.dataSize];
Random random = new Random();
long bytesWritten = 0;
- createTable(accumuloClient, opts);
+ createTable(accumuloClient, params);
BatchWriter bw = null;
FileSKVWriter writer = null;
- if (opts.outputFile != null) {
+ if (params.outputFile != null) {
ClientContext cc = (ClientContext) accumuloClient;
writer = FileOperations.getInstance().newWriterBuilder()
- .forFile(opts.outputFile + "." + RFile.EXTENSION, fs, cc.getHadoopConf(),
+ .forFile(params.outputFile + "." + RFile.EXTENSION, fs, cc.getHadoopConf(),
CryptoServiceFactory.newDefaultInstance())
.withTableConfiguration(DefaultConfiguration.getInstance()).build();
writer.startDefaultLocalityGroup();
} else {
- bw = accumuloClient.createBatchWriter(opts.tableName);
- accumuloClient.securityOperations().changeUserAuthorizations(opts.getPrincipal(), AUTHS);
+ bw = accumuloClient.createBatchWriter(params.tableName);
+ String principal = ClientProperty.AUTH_PRINCIPAL.getValue(params.clientProps);
+ accumuloClient.securityOperations().changeUserAuthorizations(principal, AUTHS);
}
- Text labBA = new Text(opts.columnVisibility.getExpression());
+ Text labBA = new Text(params.columnVisibility.getExpression());
long startTime = System.currentTimeMillis();
- for (int i = 0; i < opts.rows; i++) {
+ for (int i = 0; i < params.rows; i++) {
int rowid;
- if (opts.stride > 0) {
- rowid = ((i % opts.stride) * (opts.rows / opts.stride)) + (i / opts.stride);
+ if (params.stride > 0) {
+ rowid = ((i % params.stride) * (params.rows / params.stride)) + (i / params.stride);
} else {
rowid = i;
}
- Text row = generateRow(rowid, opts.startRow);
+ Text row = generateRow(rowid, params.startRow);
Mutation m = new Mutation(row);
- for (int j = 0; j < opts.cols; j++) {
- Text colf = new Text(opts.columnFamily);
+ for (int j = 0; j < params.cols; j++) {
+ Text colf = new Text(params.columnFamily);
Text colq = new Text(FastFormat.toZeroPaddedString(j, 7, 10, COL_PREFIX));
if (writer != null) {
Key key = new Key(row, colf, colq, labBA);
- if (opts.timestamp >= 0) {
- key.setTimestamp(opts.timestamp);
+ if (params.timestamp >= 0) {
+ key.setTimestamp(params.timestamp);
} else {
key.setTimestamp(startTime);
}
- if (opts.delete) {
+ if (params.delete) {
key.setDeleted(true);
} else {
key.setDeleted(false);
@@ -258,12 +304,13 @@ public class TestIngest {
bytesWritten += key.getSize();
- if (opts.delete) {
+ if (params.delete) {
writer.append(key, new Value(new byte[0]));
} else {
byte[] value;
- if (opts.random != null) {
- value = genRandomValue(random, randomValue, opts.random, rowid + opts.startRow, j);
+ if (params.random != null) {
+ value = genRandomValue(random, randomValue, params.random, rowid + params.startRow,
+ j);
} else {
value = bytevals[j % bytevals.length];
}
@@ -277,24 +324,25 @@ public class TestIngest {
Key key = new Key(row, colf, colq, labBA);
bytesWritten += key.getSize();
- if (opts.delete) {
- if (opts.timestamp >= 0)
- m.putDelete(colf, colq, opts.columnVisibility, opts.timestamp);
+ if (params.delete) {
+ if (params.timestamp >= 0)
+ m.putDelete(colf, colq, params.columnVisibility, params.timestamp);
else
- m.putDelete(colf, colq, opts.columnVisibility);
+ m.putDelete(colf, colq, params.columnVisibility);
} else {
byte[] value;
- if (opts.random != null) {
- value = genRandomValue(random, randomValue, opts.random, rowid + opts.startRow, j);
+ if (params.random != null) {
+ value = genRandomValue(random, randomValue, params.random, rowid + params.startRow,
+ j);
} else {
value = bytevals[j % bytevals.length];
}
bytesWritten += value.length;
- if (opts.timestamp >= 0) {
- m.put(colf, colq, opts.columnVisibility, opts.timestamp, new Value(value, true));
+ if (params.timestamp >= 0) {
+ m.put(colf, colq, params.columnVisibility, params.timestamp, new Value(value, true));
} else {
- m.put(colf, colq, opts.columnVisibility, new Value(value, true));
+ m.put(colf, colq, params.columnVisibility, new Value(value, true));
}
}
@@ -303,7 +351,6 @@ public class TestIngest {
}
if (bw != null)
bw.addMutation(m);
-
}
if (writer != null) {
@@ -325,14 +372,13 @@ public class TestIngest {
System.err.println("ERROR : Constraint violates : " + cvs);
}
}
-
throw e;
}
}
stopTime = System.currentTimeMillis();
- int totalValues = opts.rows * opts.cols;
+ int totalValues = params.rows * params.cols;
double elapsed = (stopTime - startTime) / 1000.0;
System.out.printf(
@@ -342,10 +388,10 @@ public class TestIngest {
elapsed);
}
- public static void ingest(AccumuloClient c, Opts opts)
+ public static void ingest(AccumuloClient c, IngestParams params)
throws MutationsRejectedException, IOException, AccumuloException, AccumuloSecurityException,
TableNotFoundException, TableExistsException {
ClientContext cc = (ClientContext) c;
- ingest(c, FileSystem.get(cc.getHadoopConf()), opts);
+ ingest(c, FileSystem.get(cc.getHadoopConf()), params);
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java b/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java
index b9f982d..f913d7a 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map.Entry;
import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -75,18 +76,18 @@ public class TestMultiTableIngest {
Opts opts = new Opts();
opts.parseArgs(TestMultiTableIngest.class.getName(), args);
// create the test table within accumulo
- try (AccumuloClient accumuloClient = opts.createClient()) {
+ try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
for (int i = 0; i < opts.tables; i++) {
tableNames.add(String.format(opts.prefix + "%04d", i));
}
if (!opts.readonly) {
for (String table : tableNames)
- accumuloClient.tableOperations().create(table);
+ client.tableOperations().create(table);
MultiTableBatchWriter b;
try {
- b = accumuloClient.createMultiTableBatchWriter();
+ b = client.createMultiTableBatchWriter();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -105,7 +106,7 @@ public class TestMultiTableIngest {
}
}
try {
- readBack(opts, accumuloClient, tableNames);
+ readBack(opts, client, tableNames);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java b/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java
index b0e8e72..cef264a 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java
@@ -23,6 +23,7 @@ import java.util.Set;
import java.util.TreeSet;
import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
@@ -81,7 +82,7 @@ public class TestRandomDeletes {
private static TreeSet<RowColumn> scanAll(TestOpts opts) throws Exception {
TreeSet<RowColumn> result = new TreeSet<>();
- try (AccumuloClient client = opts.createClient();
+ try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build();
Scanner scanner = client.createScanner(opts.tableName, auths)) {
for (Entry<Key,Value> entry : scanner) {
Key key = entry.getKey();
@@ -100,8 +101,8 @@ public class TestRandomDeletes {
ArrayList<RowColumn> entries = new ArrayList<>(rows);
java.util.Collections.shuffle(entries);
- try (AccumuloClient accumuloClient = opts.createClient();
- BatchWriter bw = accumuloClient.createBatchWriter(opts.tableName)) {
+ try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build();
+ BatchWriter bw = client.createBatchWriter(opts.tableName)) {
for (int i = 0; i < (entries.size() + 1) / 2; i++) {
RowColumn rc = entries.get(i);
diff --git a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
index 21f0714..ea164c5 100644
--- a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
@@ -22,11 +22,13 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
@@ -57,9 +59,31 @@ public class VerifyIngest {
return Integer.parseInt(k.getColumnQualifier().toString().split("_")[1]);
}
+ public static class VerifyParams extends TestIngest.IngestParams {
+ public boolean useGet = false;
+
+ public VerifyParams(Properties props) {
+ super(props);
+ }
+
+ public VerifyParams(Properties props, String table) {
+ super(props, table);
+ }
+
+ public VerifyParams(Properties props, String table, int rows) {
+ super(props, table, rows);
+ }
+ }
+
public static class Opts extends TestIngest.Opts {
@Parameter(names = "-useGet", description = "fetches values one at a time, instead of scanning")
public boolean useGet = false;
+
+ public VerifyParams getVerifyParams() {
+ VerifyParams params = new VerifyParams(getClientProps(), tableName);
+ params.useGet = useGet;
+ return params;
+ }
}
public static void main(String[] args) throws Exception {
@@ -74,8 +98,8 @@ public class VerifyIngest {
if (span != null)
span.addKVAnnotation("cmdLine", Arrays.asList(args).toString());
- try (AccumuloClient client = opts.createClient()) {
- verifyIngest(client, opts);
+ try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
+ verifyIngest(client, opts.getVerifyParams());
}
} finally {
@@ -85,32 +109,33 @@ public class VerifyIngest {
@SuppressFBWarnings(value = "PREDICTABLE_RANDOM",
justification = "predictable random is okay for testing")
- public static void verifyIngest(AccumuloClient accumuloClient, Opts opts)
+ public static void verifyIngest(AccumuloClient accumuloClient, VerifyParams params)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
- byte[][] bytevals = TestIngest.generateValues(opts.dataSize);
+ byte[][] bytevals = TestIngest.generateValues(params.dataSize);
Authorizations labelAuths = new Authorizations("L1", "L2", "G1", "GROUP2");
- accumuloClient.securityOperations().changeUserAuthorizations(opts.getPrincipal(), labelAuths);
+ String principal = ClientProperty.AUTH_PRINCIPAL.getValue(params.clientProps);
+ accumuloClient.securityOperations().changeUserAuthorizations(principal, labelAuths);
- int expectedRow = opts.startRow;
+ int expectedRow = params.startRow;
int expectedCol = 0;
int recsRead = 0;
long bytesRead = 0;
long t1 = System.currentTimeMillis();
- byte[] randomValue = new byte[opts.dataSize];
+ byte[] randomValue = new byte[params.dataSize];
Random random = new Random();
- Key endKey = new Key(new Text("row_" + String.format("%010d", opts.rows + opts.startRow)));
+ Key endKey = new Key(new Text("row_" + String.format("%010d", params.rows + params.startRow)));
int errors = 0;
- while (expectedRow < (opts.rows + opts.startRow)) {
+ while (expectedRow < (params.rows + params.startRow)) {
- if (opts.useGet) {
- Text rowKey = new Text("row_" + String.format("%010d", expectedRow + opts.startRow));
- Text colf = new Text(opts.columnFamily);
+ if (params.useGet) {
+ Text rowKey = new Text("row_" + String.format("%010d", expectedRow + params.startRow));
+ Text colf = new Text(params.columnFamily);
Text colq = new Text("col_" + String.format("%07d", expectedCol));
try (Scanner scanner = accumuloClient.createScanner("test_ingest", labelAuths)) {
@@ -128,8 +153,8 @@ public class VerifyIngest {
}
byte[] ev;
- if (opts.random != null) {
- ev = TestIngest.genRandomValue(random, randomValue, opts.random, expectedRow,
+ if (params.random != null) {
+ ev = TestIngest.genRandomValue(random, randomValue, params.random, expectedRow,
expectedCol);
} else {
ev = bytevals[expectedCol % bytevals.length];
@@ -150,7 +175,7 @@ public class VerifyIngest {
}
expectedCol++;
- if (expectedCol >= opts.cols) {
+ if (expectedCol >= params.cols) {
expectedCol = 0;
expectedRow++;
}
@@ -159,10 +184,10 @@ public class VerifyIngest {
Key startKey = new Key(new Text("row_" + String.format("%010d", expectedRow)));
- try (Scanner scanner = accumuloClient.createScanner(opts.tableName, labelAuths)) {
+ try (Scanner scanner = accumuloClient.createScanner(params.tableName, labelAuths)) {
scanner.setRange(new Range(startKey, endKey));
- for (int j = 0; j < opts.cols; j++) {
- scanner.fetchColumn(new Text(opts.columnFamily),
+ for (int j = 0; j < params.cols; j++) {
+ scanner.fetchColumn(new Text(params.columnFamily),
new Text("col_" + String.format("%07d", j)));
}
@@ -190,18 +215,18 @@ public class VerifyIngest {
errors++;
}
- if (expectedRow >= (opts.rows + opts.startRow)) {
+ if (expectedRow >= (params.rows + params.startRow)) {
log.error(
"expectedRow ({}) >= (ingestArgs.rows + ingestArgs.startRow) ({}), get"
+ " batch returned data passed end key",
- expectedRow, (opts.rows + opts.startRow));
+ expectedRow, (params.rows + params.startRow));
errors++;
break;
}
byte[] value;
- if (opts.random != null) {
- value = TestIngest.genRandomValue(random, randomValue, opts.random, expectedRow,
+ if (params.random != null) {
+ value = TestIngest.genRandomValue(random, randomValue, params.random, expectedRow,
colNum);
} else {
value = bytevals[colNum % bytevals.length];
@@ -214,14 +239,14 @@ public class VerifyIngest {
errors++;
}
- if (opts.timestamp >= 0 && entry.getKey().getTimestamp() != opts.timestamp) {
+ if (params.timestamp >= 0 && entry.getKey().getTimestamp() != params.timestamp) {
log.error("unexpected timestamp {}, rowNum : {} colNum : {}",
entry.getKey().getTimestamp(), rowNum, colNum);
errors++;
}
expectedCol++;
- if (expectedCol >= opts.cols) {
+ if (expectedCol >= params.cols) {
expectedCol = 0;
expectedRow++;
}
@@ -242,9 +267,9 @@ public class VerifyIngest {
throw new AccumuloException("saw " + errors + " errors ");
}
- if (expectedRow != (opts.rows + opts.startRow)) {
+ if (expectedRow != (params.rows + params.startRow)) {
throw new AccumuloException("Did not read expected number of rows. Saw "
- + (expectedRow - opts.startRow) + " expected " + opts.rows);
+ + (expectedRow - params.startRow) + " expected " + params.rows);
} else {
System.out.printf(
"%,12d records read | %,8d records/sec | %,12d bytes read |"
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
index b7bdc79..a86962f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -128,16 +129,10 @@ public class BalanceInPresenceOfOfflineTableIT extends AccumuloClusterHarness {
log.debug("starting test ingestion");
- TestIngest.Opts opts = new TestIngest.Opts();
- VerifyIngest.Opts vopts = new VerifyIngest.Opts();
- opts.setClientProperties(getClientProperties());
- vopts.setClientProperties(getClientProperties());
- vopts.rows = opts.rows = 200000;
- opts.setTableName(TEST_TABLE);
- TestIngest.ingest(accumuloClient, opts);
+ VerifyParams params = new VerifyParams(getClientProperties(), TEST_TABLE, 200_000);
+ TestIngest.ingest(accumuloClient, params);
accumuloClient.tableOperations().flush(TEST_TABLE, null, null, true);
- vopts.setTableName(TEST_TABLE);
- VerifyIngest.verifyIngest(accumuloClient, vopts);
+ VerifyIngest.verifyIngest(accumuloClient, params);
log.debug("waiting for balancing, up to ~5 minutes to allow for migration cleanup.");
final long startTime = System.currentTimeMillis();
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
index b03db9e..1828c1a 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
@@ -25,9 +25,9 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.ClientInfo;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.test.TestIngest;
-import org.apache.accumulo.test.TestIngest.Opts;
+import org.apache.accumulo.test.TestIngest.IngestParams;
import org.apache.accumulo.test.VerifyIngest;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
@@ -72,40 +72,32 @@ public class BulkIT extends AccumuloClusterHarness {
fs.mkdirs(bulkFailures);
fs.mkdirs(files);
- Opts opts = new Opts();
- opts.timestamp = 1;
- opts.random = 56;
- opts.rows = N;
- opts.cols = 1;
- opts.setTableName(tableName);
- opts.setClientProperties(info.getProperties());
- opts.conf = new Configuration(false);
- opts.fs = fs;
+ IngestParams params = new IngestParams(info.getProperties(), tableName, N);
+ params.timestamp = 1;
+ params.random = 56;
+ params.cols = 1;
String fileFormat = filePrefix + "rf%02d";
for (int i = 0; i < COUNT; i++) {
- opts.outputFile = new Path(files, String.format(fileFormat, i)).toString();
- opts.startRow = N * i;
- TestIngest.ingest(c, fs, opts);
+ params.outputFile = new Path(files, String.format(fileFormat, i)).toString();
+ params.startRow = N * i;
+ TestIngest.ingest(c, fs, params);
}
- opts.outputFile = new Path(files, String.format(fileFormat, N)).toString();
- opts.startRow = N;
- opts.rows = 1;
+ params.outputFile = new Path(files, String.format(fileFormat, N)).toString();
+ params.startRow = N;
+ params.rows = 1;
// create an rfile with one entry, there was a bug with this:
- TestIngest.ingest(c, fs, opts);
+ TestIngest.ingest(c, fs, params);
bulkLoad(c, tableName, bulkFailures, files, useOld);
- VerifyIngest.Opts vopts = new VerifyIngest.Opts();
- vopts.setTableName(tableName);
- vopts.random = 56;
- vopts.setClientProperties(info.getProperties());
+ VerifyParams verifyParams = new VerifyParams(info.getProperties(), tableName, N);
+ verifyParams.random = 56;
for (int i = 0; i < COUNT; i++) {
- vopts.startRow = i * N;
- vopts.rows = N;
- VerifyIngest.verifyIngest(c, vopts);
+ verifyParams.startRow = i * N;
+ VerifyIngest.verifyIngest(c, verifyParams);
}
- vopts.startRow = N;
- vopts.rows = 1;
- VerifyIngest.verifyIngest(c, vopts);
+ verifyParams.startRow = N;
+ verifyParams.rows = 1;
+ VerifyIngest.verifyIngest(c, verifyParams);
}
@SuppressWarnings("deprecation")
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
index 63b41fe..defb91d 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
@@ -26,6 +26,7 @@ import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -76,8 +77,8 @@ public class BulkSplitOptimizationIT extends AccumuloClusterHarness {
}
}
- static final int ROWS = 100000;
- static final int SPLITS = 99;
+ private static final int ROWS = 100000;
+ private static final int SPLITS = 99;
@Test
public void testBulkSplitOptimization() throws Exception {
@@ -108,21 +109,16 @@ public class BulkSplitOptimizationIT extends AccumuloClusterHarness {
}
FunctionalTestUtils.checkSplits(c, tableName, 50, 100);
- VerifyIngest.Opts opts = new VerifyIngest.Opts();
- opts.timestamp = 1;
- opts.dataSize = 50;
- opts.random = 56;
- opts.rows = 100000;
- opts.startRow = 0;
- opts.cols = 1;
- opts.setTableName(tableName);
-
- opts.setClientProperties(getClientProperties());
- VerifyIngest.verifyIngest(c, opts);
+ VerifyParams params = new VerifyParams(getClientProperties(), tableName, ROWS);
+ params.timestamp = 1;
+ params.dataSize = 50;
+ params.random = 56;
+ params.startRow = 0;
+ params.cols = 1;
+ VerifyIngest.verifyIngest(c, params);
// ensure each tablet does not have all map files, should be ~2.5 files per tablet
FunctionalTestUtils.checkRFiles(c, tableName, 50, 100, 1, 4);
}
}
-
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java
index 25eac32..2fbb038 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.server.master.balancer.ChaoticLoadBalancer;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
@@ -58,19 +59,13 @@ public class ChaoticBalancerIT extends AccumuloClusterHarness {
ntc.setProperties(Stream
.of(new Pair<>(Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K"),
new Pair<>(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1K"))
- .collect(Collectors.toMap(k -> k.getFirst(), v -> v.getSecond())));
+ .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond)));
c.tableOperations().create(tableName, ntc);
- TestIngest.Opts opts = new TestIngest.Opts();
- VerifyIngest.Opts vopts = new VerifyIngest.Opts();
- vopts.rows = opts.rows = 20000;
- opts.setTableName(tableName);
- vopts.setTableName(tableName);
- opts.setClientProperties(getClientProperties());
- vopts.setClientProperties(getClientProperties());
- TestIngest.ingest(c, opts);
+ VerifyParams params = new VerifyParams(getClientProperties(), tableName, 20_000);
+ TestIngest.ingest(c, params);
c.tableOperations().flush(tableName, null, null, true);
- VerifyIngest.verifyIngest(c, vopts);
+ VerifyIngest.verifyIngest(c, params);
}
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index ceca72c..33ca650 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -127,23 +128,17 @@ public class CompactionIT extends AccumuloClusterHarness {
final int span = 500000 / 59;
for (int i = 0; i < 500000; i += 500000 / 59) {
final int finalI = i;
- Runnable r = new Runnable() {
- @Override
- public void run() {
- try {
- VerifyIngest.Opts opts = new VerifyIngest.Opts();
- opts.startRow = finalI;
- opts.rows = span;
- opts.random = 56;
- opts.dataSize = 50;
- opts.cols = 1;
- opts.setTableName(tableName);
- opts.setClientProperties(getClientProperties());
- VerifyIngest.verifyIngest(c, opts);
- } catch (Exception ex) {
- log.warn("Got exception verifying data", ex);
- fail.set(true);
- }
+ Runnable r = () -> {
+ try {
+ VerifyParams params = new VerifyParams(getClientProperties(), tableName, span);
+ params.startRow = finalI;
+ params.random = 56;
+ params.dataSize = 50;
+ params.cols = 1;
+ VerifyIngest.verifyIngest(c, params);
+ } catch (Exception ex) {
+ log.warn("Got exception verifying data", ex);
+ fail.set(true);
}
};
executor.execute(r);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/DeleteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/DeleteIT.java
index 483b0de..0f5aa22 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/DeleteIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/DeleteIT.java
@@ -24,6 +24,7 @@ import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.TestRandomDeletes;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.junit.Test;
public class DeleteIT extends AccumuloClusterHarness {
@@ -44,23 +45,14 @@ public class DeleteIT extends AccumuloClusterHarness {
public static void deleteTest(AccumuloClient c, AccumuloCluster cluster, String tableName)
throws Exception {
- VerifyIngest.Opts vopts = new VerifyIngest.Opts();
- TestIngest.Opts opts = new TestIngest.Opts();
- vopts.setTableName(tableName);
- opts.setTableName(tableName);
- vopts.rows = opts.rows = 1000;
- vopts.cols = opts.cols = 1;
- vopts.random = opts.random = 56;
-
- opts.setClientProperties(getClientProperties());
- vopts.setClientProperties(getClientProperties());
-
- TestIngest.ingest(c, opts);
+ VerifyParams params = new VerifyParams(getClientProperties(), tableName, 1000);
+ params.cols = 1;
+ params.random = 56;
+ TestIngest.ingest(c, params);
assertEquals(0, cluster.getClusterControl().exec(TestRandomDeletes.class,
new String[] {"-c", cluster.getClientPropsPath(), "--table", tableName}));
- TestIngest.ingest(c, opts);
- VerifyIngest.verifyIngest(c, vopts);
+ TestIngest.ingest(c, params);
+ VerifyIngest.verifyIngest(c, params);
}
-
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java b/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java
index 72d7504..3de219e 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.TestIngest.IngestParams;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Before;
@@ -84,12 +85,9 @@ public class DynamicThreadPoolsIT extends AccumuloClusterHarness {
String firstTable = tables[0];
try (AccumuloClient c = createAccumuloClient()) {
c.instanceOperations().setProperty(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), "5");
- TestIngest.Opts opts = new TestIngest.Opts();
- opts.rows = 500 * 1000;
- opts.createTable = true;
- opts.setTableName(firstTable);
- opts.setClientProperties(getClientProperties());
- TestIngest.ingest(c, opts);
+ IngestParams params = new IngestParams(getClientProperties(), firstTable, 500_000);
+ params.createTable = true;
+ TestIngest.ingest(c, params);
c.tableOperations().flush(firstTable, null, null, true);
for (int i = 1; i < tables.length; i++)
c.tableOperations().clone(firstTable, tables[i], true, null, null);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java
index 37d9b66..5745ad0 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java
@@ -25,6 +25,7 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.TestIngest.IngestParams;
import org.apache.hadoop.io.Text;
import org.junit.Test;
@@ -46,15 +47,12 @@ public class FateStarvationIT extends AccumuloClusterHarness {
c.tableOperations().addSplits(tableName, TestIngest.getSplitPoints(0, 100000, 50));
- TestIngest.Opts opts = new TestIngest.Opts();
- opts.random = 89;
- opts.timestamp = 7;
- opts.dataSize = 50;
- opts.rows = 100000;
- opts.cols = 1;
- opts.setTableName(tableName);
- opts.setClientProperties(getClientProperties());
- TestIngest.ingest(c, opts);
+ IngestParams params = new IngestParams(getClientProperties(), tableName, 100_000);
+ params.random = 89;
+ params.timestamp = 7;
+ params.dataSize = 50;
+ params.cols = 1;
+ TestIngest.ingest(c, params);
c.tableOperations().flush(tableName, null, null, true);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
index 6a76d48..676b57d 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
@@ -126,17 +126,17 @@ public class FunctionalTestUtils {
ExecutorService threadPool = Executors.newFixedThreadPool(threads);
final AtomicBoolean fail = new AtomicBoolean(false);
for (int i = 0; i < rows; i += rows / splits) {
- final TestIngest.Opts opts = new TestIngest.Opts();
- opts.outputFile = String.format("%s/mf%s", path, i);
- opts.random = 56;
- opts.timestamp = 1;
- opts.dataSize = 50;
- opts.rows = rows / splits;
- opts.startRow = i;
- opts.cols = 1;
+ TestIngest.IngestParams params = new TestIngest.IngestParams(c.properties());
+ params.outputFile = String.format("%s/mf%s", path, i);
+ params.random = 56;
+ params.timestamp = 1;
+ params.dataSize = 50;
+ params.rows = rows / splits;
+ params.startRow = i;
+ params.cols = 1;
threadPool.execute(() -> {
try {
- TestIngest.ingest(c, fs, opts);
+ TestIngest.ingest(c, fs, params);
} catch (Exception e) {
fail.set(true);
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index 05aa813..91b924d 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@ -58,6 +58,7 @@ import org.apache.accumulo.miniclusterImpl.ProcessNotFoundException;
import org.apache.accumulo.miniclusterImpl.ProcessReference;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
@@ -109,17 +110,14 @@ public class GarbageCollectorIT extends ConfigurableMacBase {
@Test
public void gcTest() throws Exception {
killMacGc();
+ final String table = "test_ingest";
try (AccumuloClient c = createClient()) {
- c.tableOperations().create("test_ingest");
- c.tableOperations().setProperty("test_ingest", Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K");
- TestIngest.Opts opts = new TestIngest.Opts();
- VerifyIngest.Opts vopts = new VerifyIngest.Opts();
- vopts.rows = opts.rows = 10000;
- vopts.cols = opts.cols = 1;
- opts.setClientProperties(getClientProperties());
- vopts.setClientProperties(getClientProperties());
- TestIngest.ingest(c, cluster.getFileSystem(), opts);
- c.tableOperations().compact("test_ingest", null, null, true, true);
+ c.tableOperations().create(table);
+ c.tableOperations().setProperty(table, Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K");
+ VerifyParams params = new VerifyParams(getClientProperties(), table, 10_000);
+ params.cols = 1;
+ TestIngest.ingest(c, cluster.getFileSystem(), params);
+ c.tableOperations().compact(table, null, null, true, true);
int before = countFiles();
while (true) {
sleepUninterruptibly(1, TimeUnit.SECONDS);
@@ -133,7 +131,7 @@ public class GarbageCollectorIT extends ConfigurableMacBase {
getCluster().start();
sleepUninterruptibly(15, TimeUnit.SECONDS);
int after = countFiles();
- VerifyIngest.verifyIngest(c, vopts);
+ VerifyIngest.verifyIngest(c, params);
assertTrue(after < before);
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java
index 27be097..9ffbeaf 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java
@@ -174,10 +174,9 @@ public class HalfDeadTServerIT extends ConfigurableMacBase {
if (seconds <= 10) {
assertEquals(0, ingest.waitFor());
- VerifyIngest.Opts vopts = new VerifyIngest.Opts();
- vopts.rows = rows;
- vopts.setClientProperties(getClientProperties());
- VerifyIngest.verifyIngest(c, vopts);
+ VerifyIngest.VerifyParams params = new VerifyIngest.VerifyParams(getClientProperties());
+ params.rows = rows;
+ VerifyIngest.verifyIngest(c, params);
} else {
sleepUninterruptibly(5, TimeUnit.SECONDS);
tserver.waitFor();
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MasterFailoverIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MasterFailoverIT.java
index 9c2bef8..d92bcd1 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MasterFailoverIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MasterFailoverIT.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
@@ -50,10 +51,8 @@ public class MasterFailoverIT extends AccumuloClusterHarness {
try (AccumuloClient c = createAccumuloClient()) {
String[] names = getUniqueNames(2);
c.tableOperations().create(names[0]);
- TestIngest.Opts opts = new TestIngest.Opts();
- opts.setTableName(names[0]);
- opts.setClientProperties(getClientProperties());
- TestIngest.ingest(c, opts);
+ VerifyParams params = new VerifyParams(getClientProperties(), names[0]);
+ TestIngest.ingest(c, params);
ClusterControl control = cluster.getClusterControl();
control.stopAllServers(ServerType.MASTER);
@@ -61,10 +60,8 @@ public class MasterFailoverIT extends AccumuloClusterHarness {
control.startAllServers(ServerType.MASTER);
// talk to it
c.tableOperations().rename(names[0], names[1]);
- VerifyIngest.Opts vopts = new VerifyIngest.Opts();
- vopts.setTableName(names[1]);
- vopts.setClientProperties(getClientProperties());
- VerifyIngest.verifyIngest(c, vopts);
+ params.tableName = names[1];
+ VerifyIngest.verifyIngest(c, params);
}
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MaxOpenIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MaxOpenIT.java
index 22b3bba..5bedf8d 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MaxOpenIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MaxOpenIT.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.TestIngest.IngestParams;
import org.apache.accumulo.test.VerifyIngest;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
@@ -103,15 +104,13 @@ public class MaxOpenIT extends AccumuloClusterHarness {
// the following loop should create three tablets in each map file
for (int i = 0; i < 3; i++) {
- TestIngest.Opts opts = new TestIngest.Opts();
- opts.timestamp = i;
- opts.dataSize = 50;
- opts.rows = NUM_TO_INGEST;
- opts.cols = 1;
- opts.random = i;
- opts.setTableName(tableName);
- opts.setClientProperties(getClientProperties());
- TestIngest.ingest(c, opts);
+ IngestParams params = new IngestParams(getClientProperties(), tableName, NUM_TO_INGEST);
+ params.timestamp = i;
+ params.dataSize = 50;
+ params.rows = NUM_TO_INGEST;
+ params.cols = 1;
+ params.random = i;
+ TestIngest.ingest(c, params);
c.tableOperations().flush(tableName, null, null, true);
FunctionalTestUtils.checkRFiles(c, tableName, NUM_TABLETS, NUM_TABLETS, i + 1, i + 1);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
index dc90fc2..0c58c24 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
@@ -80,8 +80,10 @@ import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.TestIngest.IngestParams;
import org.apache.accumulo.test.TestMultiTableIngest;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.accumulo.test.categories.StandaloneCapableClusterTests;
import org.apache.accumulo.test.categories.SunnyDayTests;
import org.apache.hadoop.conf.Configuration;
@@ -206,17 +208,13 @@ public class ReadWriteIT extends AccumuloClusterHarness {
public static void ingest(AccumuloClient accumuloClient, ClientInfo info, int rows, int cols,
int width, int offset, String colf, String tableName) throws Exception {
- TestIngest.Opts opts = new TestIngest.Opts();
- opts.rows = rows;
- opts.cols = cols;
- opts.dataSize = width;
- opts.startRow = offset;
- opts.columnFamily = colf;
- opts.createTable = true;
- opts.setTableName(tableName);
- opts.setClientProperties(info.getProperties());
-
- TestIngest.ingest(accumuloClient, opts);
+ IngestParams params = new IngestParams(info.getProperties(), tableName, rows);
+ params.cols = cols;
+ params.dataSize = width;
+ params.startRow = offset;
+ params.columnFamily = colf;
+ params.createTable = true;
+ TestIngest.ingest(accumuloClient, params);
}
public static void verify(AccumuloClient accumuloClient, ClientInfo info, int rows, int cols,
@@ -226,16 +224,13 @@ public class ReadWriteIT extends AccumuloClusterHarness {
private static void verify(AccumuloClient accumuloClient, ClientInfo info, int rows, int cols,
int width, int offset, String colf, String tableName) throws Exception {
- VerifyIngest.Opts opts = new VerifyIngest.Opts();
- opts.rows = rows;
- opts.cols = cols;
- opts.dataSize = width;
- opts.startRow = offset;
- opts.columnFamily = colf;
- opts.setTableName(tableName);
- opts.setClientProperties(info.getProperties());
-
- VerifyIngest.verifyIngest(accumuloClient, opts);
+ VerifyParams params = new VerifyParams(info.getProperties(), tableName, rows);
+ params.rows = rows;
+ params.dataSize = width;
+ params.startRow = offset;
+ params.columnFamily = colf;
+ params.cols = cols;
+ VerifyIngest.verifyIngest(accumuloClient, params);
}
public static String[] args(String... args) {
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java b/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java
index 07c48e8..0697070 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java
@@ -21,6 +21,7 @@ import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.junit.Test;
public class RenameIT extends AccumuloClusterHarness {
@@ -35,26 +36,20 @@ public class RenameIT extends AccumuloClusterHarness {
String[] tableNames = getUniqueNames(2);
String name1 = tableNames[0];
String name2 = tableNames[1];
- TestIngest.Opts opts = new TestIngest.Opts();
- opts.createTable = true;
- opts.setTableName(name1);
- opts.setClientProperties(cluster.getClientProperties());
+ VerifyParams params = new VerifyParams(cluster.getClientProperties(), name1);
+ params.createTable = true;
try (AccumuloClient c = createAccumuloClient()) {
- TestIngest.ingest(c, opts);
+ TestIngest.ingest(c, params);
c.tableOperations().rename(name1, name2);
- TestIngest.ingest(c, opts);
- VerifyIngest.Opts vopts = new VerifyIngest.Opts();
- vopts.setClientProperties(cluster.getClientProperties());
- vopts.setTableName(name2);
- VerifyIngest.verifyIngest(c, vopts);
+ TestIngest.ingest(c, params);
+ params.tableName = name2;
+ VerifyIngest.verifyIngest(c, params);
c.tableOperations().delete(name1);
c.tableOperations().rename(name2, name1);
- vopts.setTableName(name1);
- VerifyIngest.verifyIngest(c, vopts);
-
+ params.tableName = name1;
+ VerifyIngest.verifyIngest(c, params);
FunctionalTestUtils.assertNoDanglingFateLocks((ClientContext) c, getCluster());
}
}
-
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java b/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java
index 83204e8..fc778e0 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java
@@ -42,7 +42,9 @@ import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.TestIngest.IngestParams;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.junit.After;
@@ -68,12 +70,6 @@ public class RestartIT extends AccumuloClusterHarness {
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
- private static final VerifyIngest.Opts VOPTS = new VerifyIngest.Opts();
- private static final TestIngest.Opts OPTS = new TestIngest.Opts();
- static {
- OPTS.rows = VOPTS.rows = 10 * 1000;
- }
-
private ExecutorService svc;
@Before
@@ -102,13 +98,13 @@ public class RestartIT extends AccumuloClusterHarness {
final String tableName = getUniqueNames(1)[0];
c.tableOperations().create(tableName);
final ClusterControl control = getCluster().getClusterControl();
- VOPTS.setTableName(tableName);
- VOPTS.setClientProperties(getClientProperties());
+
+ VerifyParams params = new VerifyParams(getClientProperties(), tableName, 10_000);
Future<Integer> ret = svc.submit(() -> {
try {
return control.exec(TestIngest.class, new String[] {"-c", cluster.getClientPropsPath(),
- "--rows", "" + OPTS.rows, "--table", tableName});
+ "--rows", "" + params.rows, "--table", tableName});
} catch (IOException e) {
log.error("Error running TestIngest", e);
return -1;
@@ -118,7 +114,7 @@ public class RestartIT extends AccumuloClusterHarness {
control.stopAllServers(ServerType.MASTER);
control.startAllServers(ServerType.MASTER);
assertEquals(0, ret.get().intValue());
- VerifyIngest.verifyIngest(c, VOPTS);
+ VerifyIngest.verifyIngest(c, params);
}
}
@@ -127,11 +123,8 @@ public class RestartIT extends AccumuloClusterHarness {
try (AccumuloClient c = createAccumuloClient()) {
String tableName = getUniqueNames(1)[0];
c.tableOperations().create(tableName);
- OPTS.setTableName(tableName);
- VOPTS.setTableName(tableName);
- OPTS.setClientProperties(getClientProperties());
- VOPTS.setClientProperties(getClientProperties());
- TestIngest.ingest(c, OPTS);
+ VerifyParams params = new VerifyParams(getClientProperties(), tableName, 10_000);
+ TestIngest.ingest(c, params);
ClusterControl control = getCluster().getClusterControl();
// TODO implement a kill all too?
@@ -169,7 +162,7 @@ public class RestartIT extends AccumuloClusterHarness {
}
} while (masterLockData != null);
cluster.start();
- VerifyIngest.verifyIngest(c, VOPTS);
+ VerifyIngest.verifyIngest(c, params);
}
}
@@ -181,13 +174,12 @@ public class RestartIT extends AccumuloClusterHarness {
c.tableOperations().create(tableName);
c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K");
- VOPTS.setTableName(tableName);
- VOPTS.setClientProperties(getClientProperties());
+ VerifyParams params = new VerifyParams(getClientProperties(), tableName, 10_000);
Future<Integer> ret = svc.submit(() -> {
try {
return control.exec(TestIngest.class, new String[] {"-c", cluster.getClientPropsPath(),
- "--rows", "" + VOPTS.rows, "--table", tableName});
+ "--rows", "" + params.rows, "--table", tableName});
} catch (Exception e) {
log.error("Error running TestIngest", e);
return -1;
@@ -211,7 +203,7 @@ public class RestartIT extends AccumuloClusterHarness {
cluster.start();
assertEquals(0, ret.get().intValue());
- VerifyIngest.verifyIngest(c, VOPTS);
+ VerifyIngest.verifyIngest(c, params);
}
}
@@ -220,15 +212,12 @@ public class RestartIT extends AccumuloClusterHarness {
try (AccumuloClient c = createAccumuloClient()) {
String tableName = getUniqueNames(1)[0];
c.tableOperations().create(tableName);
- OPTS.setTableName(tableName);
- VOPTS.setTableName(tableName);
- OPTS.setClientProperties(getClientProperties());
- VOPTS.setClientProperties(getClientProperties());
- TestIngest.ingest(c, OPTS);
- VerifyIngest.verifyIngest(c, VOPTS);
+ VerifyParams params = new VerifyParams(getClientProperties(), tableName, 10_000);
+ TestIngest.ingest(c, params);
+ VerifyIngest.verifyIngest(c, params);
cluster.getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
cluster.start();
- VerifyIngest.verifyIngest(c, VOPTS);
+ VerifyIngest.verifyIngest(c, params);
}
}
@@ -253,9 +242,8 @@ public class RestartIT extends AccumuloClusterHarness {
try (AccumuloClient c = createAccumuloClient()) {
String tableName = getUniqueNames(1)[0];
c.tableOperations().create(tableName);
- OPTS.setTableName(tableName);
- OPTS.setClientProperties(getClientProperties());
- TestIngest.ingest(c, OPTS);
+ IngestParams params = new IngestParams(getClientProperties(), tableName, 10_000);
+ TestIngest.ingest(c, params);
try {
getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
getCluster().getClusterControl().adminStopAll();
@@ -269,9 +257,7 @@ public class RestartIT extends AccumuloClusterHarness {
public void shutdownDuringCompactingSplitting() throws Exception {
try (AccumuloClient c = createAccumuloClient()) {
String tableName = getUniqueNames(1)[0];
- VOPTS.setTableName(tableName);
- OPTS.setClientProperties(getClientProperties());
- VOPTS.setClientProperties(getClientProperties());
+ VerifyParams params = new VerifyParams(getClientProperties(), tableName, 10_000);
c.tableOperations().create(tableName);
c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
String splitThreshold = null;
@@ -285,12 +271,9 @@ public class RestartIT extends AccumuloClusterHarness {
try {
c.tableOperations().setProperty(MetadataTable.NAME, Property.TABLE_SPLIT_THRESHOLD.getKey(),
"20K");
- TestIngest.Opts opts = new TestIngest.Opts();
- opts.setTableName(tableName);
- opts.setClientProperties(getClientProperties());
- TestIngest.ingest(c, opts);
+ TestIngest.ingest(c, params);
c.tableOperations().flush(tableName, null, null, false);
- VerifyIngest.verifyIngest(c, VOPTS);
+ VerifyIngest.verifyIngest(c, params);
getCluster().stop();
} finally {
if (getClusterType() == ClusterType.STANDALONE) {
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java b/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java
index 41bb33a..9335d8d 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.junit.After;
@@ -83,12 +84,6 @@ public class RestartStressIT extends AccumuloClusterHarness {
}
}
- private static final VerifyIngest.Opts VOPTS;
- static {
- VOPTS = new VerifyIngest.Opts();
- VOPTS.rows = 10 * 1000;
- }
-
@Test
public void test() throws Exception {
try (AccumuloClient c = createAccumuloClient()) {
@@ -97,10 +92,12 @@ public class RestartStressIT extends AccumuloClusterHarness {
c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "500K");
final ClusterControl control = getCluster().getClusterControl();
+ VerifyParams params = new VerifyParams(getClientProperties(), tableName, 10_000);
+
Future<Integer> retCode = svc.submit(() -> {
try {
return control.exec(TestIngest.class, new String[] {"-c", cluster.getClientPropsPath(),
- "--rows", "" + VOPTS.rows, "--table", tableName});
+ "--rows", "" + params.rows, "--table", tableName});
} catch (Exception e) {
log.error("Error running TestIngest", e);
return -1;
@@ -113,9 +110,7 @@ public class RestartStressIT extends AccumuloClusterHarness {
control.startAllServers(ServerType.TABLET_SERVER);
}
assertEquals(0, retCode.get().intValue());
- VOPTS.setTableName(tableName);
- VOPTS.setClientProperties(getClientProperties());
- VerifyIngest.verifyIngest(c, VOPTS);
+ VerifyIngest.verifyIngest(c, params);
}
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
index 95008cb..97f3b37 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
@@ -71,10 +71,9 @@ public class SimpleBalancerFairnessIT extends ConfigurableMacBase {
log.info("Creating {} splits", splits.size());
c.tableOperations().addSplits("unused", splits);
List<String> tservers = c.instanceOperations().getTabletServers();
- TestIngest.Opts opts = new TestIngest.Opts();
- opts.rows = 50000;
- opts.setClientProperties(getClientProperties());
- TestIngest.ingest(c, opts);
+ TestIngest.IngestParams params = new TestIngest.IngestParams(getClientProperties());
+ params.rows = 50000;
+ TestIngest.ingest(c, params);
c.tableOperations().flush("test_ingest", null, null, false);
sleepUninterruptibly(45, TimeUnit.SECONDS);
Credentials creds = new Credentials("root", new PasswordToken(ROOT_PASSWORD));
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
index c23cb9f..002a594 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
@@ -42,6 +42,7 @@ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.server.util.CheckForMetadataProblems;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Assume;
@@ -124,17 +125,9 @@ public class SplitIT extends AccumuloClusterHarness {
c.tableOperations().setProperty(table, Property.TABLE_SPLIT_THRESHOLD.getKey(), "256K");
c.tableOperations().setProperty(table, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(),
"1K");
- TestIngest.Opts opts = new TestIngest.Opts();
- VerifyIngest.Opts vopts = new VerifyIngest.Opts();
- opts.rows = 100000;
- opts.setTableName(table);
- opts.setClientProperties(getClientProperties());
-
- TestIngest.ingest(c, opts);
- vopts.rows = opts.rows;
- vopts.setTableName(table);
- vopts.setClientProperties(getClientProperties());
- VerifyIngest.verifyIngest(c, vopts);
+ VerifyParams params = new VerifyParams(getClientProperties(), table, 100_000);
+ TestIngest.ingest(c, params);
+ VerifyIngest.verifyIngest(c, params);
while (c.tableOperations().listSplits(table).size() < 10) {
sleepUninterruptibly(15, TimeUnit.SECONDS);
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java
index 3f196c8..c385a18 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java
@@ -35,6 +35,7 @@ import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.accumulo.test.categories.MiniClusterOnlyTests;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -66,15 +67,10 @@ public class TableIT extends AccumuloClusterHarness {
String tableName = getUniqueNames(1)[0];
to.create(tableName);
- TestIngest.Opts opts = new TestIngest.Opts();
- VerifyIngest.Opts vopts = new VerifyIngest.Opts();
- opts.setClientProperties(getClientProperties());
- vopts.setClientProperties(getClientProperties());
- opts.setTableName(tableName);
- TestIngest.ingest(c, opts);
+ VerifyParams params = new VerifyParams(getClientProperties(), tableName);
+ TestIngest.ingest(c, params);
to.flush(tableName, null, null, true);
- vopts.setTableName(tableName);
- VerifyIngest.verifyIngest(c, vopts);
+ VerifyIngest.verifyIngest(c, params);
TableId id = TableId.of(to.tableIdMap().get(tableName));
try (Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
s.setRange(new KeyExtent(id, null, null).toMetadataRange());
@@ -93,8 +89,8 @@ public class TableIT extends AccumuloClusterHarness {
}
assertNull(to.tableIdMap().get(tableName));
to.create(tableName);
- TestIngest.ingest(c, opts);
- VerifyIngest.verifyIngest(c, vopts);
+ TestIngest.ingest(c, params);
+ VerifyIngest.verifyIngest(c, params);
to.delete(tableName);
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java b/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
index 76f23fd..f738eb1 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
@@ -23,6 +23,7 @@ import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.junit.Test;
@@ -59,18 +60,11 @@ public class WriteAheadLogIT extends AccumuloClusterHarness {
public static void testWAL(AccumuloClient c, String tableName) throws Exception {
c.tableOperations().create(tableName);
c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "750K");
- TestIngest.Opts opts = new TestIngest.Opts();
- VerifyIngest.Opts vopts = new VerifyIngest.Opts();
- opts.setTableName(tableName);
- opts.setClientProperties(getClientProperties());
- vopts.setClientProperties(getClientProperties());
-
- TestIngest.ingest(c, opts);
- vopts.setTableName(tableName);
- VerifyIngest.verifyIngest(c, vopts);
+ VerifyParams params = new VerifyParams(getClientProperties(), tableName);
+ TestIngest.ingest(c, params);
+ VerifyIngest.verifyIngest(c, params);
getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER);
- VerifyIngest.verifyIngest(c, vopts);
+ VerifyIngest.verifyIngest(c, params);
}
-
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/WriteLotsIT.java b/test/src/main/java/org/apache/accumulo/test/functional/WriteLotsIT.java
index 9bfa724..a10595e 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/WriteLotsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/WriteLotsIT.java
@@ -26,7 +26,9 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.TestIngest.IngestParams;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.junit.Test;
public class WriteLotsIT extends AccumuloClusterHarness {
@@ -53,12 +55,9 @@ public class WriteLotsIT extends AccumuloClusterHarness {
final int index = i;
Runnable r = () -> {
try {
- TestIngest.Opts opts = new TestIngest.Opts();
- opts.startRow = index * 10000;
- opts.rows = 10000;
- opts.setTableName(tableName);
- opts.setClientProperties(getClientProperties());
- TestIngest.ingest(c, opts);
+ IngestParams ingestParams = new IngestParams(getClientProperties(), tableName, 10_000);
+ ingestParams.startRow = index * 10000;
+ TestIngest.ingest(c, ingestParams);
} catch (Exception ex) {
ref.set(ex);
}
@@ -70,12 +69,8 @@ public class WriteLotsIT extends AccumuloClusterHarness {
if (ref.get() != null) {
throw ref.get();
}
- VerifyIngest.Opts vopts = new VerifyIngest.Opts();
- vopts.rows = 10000 * THREADS;
- vopts.setTableName(tableName);
- vopts.setClientProperties(getClientProperties());
- VerifyIngest.verifyIngest(c, vopts);
+ VerifyParams params = new VerifyParams(getClientProperties(), tableName, 10_000 * THREADS);
+ VerifyIngest.verifyIngest(c, params);
}
}
-
}
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/RowHash.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/RowHash.java
index 6282f7a..fc027e1 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapreduce/RowHash.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/RowHash.java
@@ -23,11 +23,11 @@ import java.util.Collections;
import org.apache.accumulo.core.cli.ClientOpts;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.clientImpl.ClientConfConverter;
+import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
@@ -70,90 +70,11 @@ public class RowHash extends Configured implements Tool {
}
private static class Opts extends ClientOpts {
- private static final Logger log = LoggerFactory.getLogger(Opts.class);
-
@Parameter(names = "--column", required = true)
String column;
@Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
- private String tableName;
-
- public String getTableName() {
- return tableName;
- }
-
- public void setAccumuloConfigs(Job job) throws AccumuloSecurityException {
- org.apache.accumulo.core.client.ClientConfiguration clientConf = ClientConfConverter
- .toClientConf(this.getClientProperties());
- org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setZooKeeperInstance(job,
- clientConf);
-
- org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setZooKeeperInstance(job,
- clientConf);
-
- final String principal = getPrincipal();
- getTableName();
-
- AuthenticationToken token = getToken();
- org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setConnectorInfo(job, principal,
- token);
- org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setConnectorInfo(job,
- principal, token);
- org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setInputTableName(job,
- getTableName());
- org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setScanAuthorizations(job,
- auths);
- org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setCreateTables(job, true);
- org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setDefaultTableName(job,
- getTableName());
- }
-
- @Override
- public AuthenticationToken getToken() {
- AuthenticationToken authToken = super.getToken();
- // For MapReduce, Kerberos credentials don't make it to the Mappers and Reducers,
- // so we need to request a delegation token and use that instead.
- if (authToken instanceof KerberosToken) {
- log.info("Received KerberosToken, fetching DelegationToken for MapReduce");
- final KerberosToken krbToken = (KerberosToken) authToken;
-
- try {
- UserGroupInformation user = UserGroupInformation.getCurrentUser();
- if (!user.hasKerberosCredentials()) {
- throw new IllegalStateException("Expected current user to have Kerberos credentials");
- }
-
- String newPrincipal = user.getUserName();
- log.info("Obtaining delegation token for {}", newPrincipal);
-
- setPrincipal(newPrincipal);
- AccumuloClient client = Accumulo.newClient().from(getClientProperties())
- .as(newPrincipal, krbToken).build();
-
- // Do the explicit check to see if the user has the permission to get a delegation token
- if (!client.securityOperations().hasSystemPermission(client.whoami(),
- SystemPermission.OBTAIN_DELEGATION_TOKEN)) {
- log.error(
- "{} doesn't have the {} SystemPermission neccesary to obtain a delegation"
- + " token. MapReduce tasks cannot automatically use the client's"
- + " credentials on remote servers. Delegation tokens provide a means to run"
- + " MapReduce without distributing the user's credentials.",
- user.getUserName(), SystemPermission.OBTAIN_DELEGATION_TOKEN.name());
- throw new IllegalStateException(
- client.whoami() + " does not have permission to obtain a delegation token");
- }
-
- // Get the delegation token from Accumulo
- return client.securityOperations().getDelegationToken(new DelegationTokenConfig());
- } catch (Exception e) {
- final String msg = "Failed to acquire DelegationToken for use with MapReduce";
- log.error(msg, e);
- throw new RuntimeException(msg, e);
- }
- }
- return authToken;
- }
-
+ String tableName;
}
@Override
@@ -163,8 +84,28 @@ public class RowHash extends Configured implements Tool {
job.setJarByClass(this.getClass());
Opts opts = new Opts();
opts.parseArgs(RowHash.class.getName(), args);
+
job.setInputFormatClass(org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.class);
- opts.setAccumuloConfigs(job);
+ org.apache.accumulo.core.client.ClientConfiguration clientConf = ClientConfConverter
+ .toClientConf(opts.getClientProps());
+ org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setZooKeeperInstance(job,
+ clientConf);
+ org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setZooKeeperInstance(job,
+ clientConf);
+
+ final String principal = ClientProperty.AUTH_PRINCIPAL.getValue(opts.getClientProps());
+ AuthenticationToken token = opts.getToken();
+ org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setConnectorInfo(job, principal,
+ token);
+ org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setConnectorInfo(job,
+ principal, token);
+ org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setInputTableName(job,
+ opts.tableName);
+ org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setScanAuthorizations(job,
+ opts.auths);
+ org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setCreateTables(job, true);
+ org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setDefaultTableName(job,
+ opts.tableName);
String col = opts.column;
int idx = col.indexOf(":");
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/ContinuousIngest.java b/test/src/main/java/org/apache/accumulo/test/performance/ContinuousIngest.java
index 028a5b7..631bc26 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/ContinuousIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/ContinuousIngest.java
@@ -30,6 +30,7 @@ import java.util.zip.CRC32;
import java.util.zip.Checksum;
import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -96,7 +97,7 @@ public class ContinuousIngest {
if (opts.min < 0 || opts.max < 0 || opts.max <= opts.min) {
throw new IllegalArgumentException("bad min and max");
}
- try (AccumuloClient client = clientOpts.createClient()) {
+ try (AccumuloClient client = Accumulo.newClient().from(clientOpts.getClientProps()).build()) {
if (!client.tableOperations().exists(clientOpts.tableName)) {
throw new TableNotFoundException(null, clientOpts.tableName,
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
index e00cc0a..3c58f1b 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.clientImpl.ClientContext;
@@ -215,7 +216,7 @@ public class CollectTabletStats {
runTest("read tablet files w/ table iter stack", tests, opts.numThreads, threadPool);
}
- try (AccumuloClient client = opts.createClient()) {
+ try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
for (int i = 0; i < opts.iterations; i++) {
ArrayList<Test> tests = new ArrayList<>();
for (final KeyExtent ke : tabletsToTest) {