You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/12/19 17:25:07 UTC
svn commit: r1423923 [5/8] - in /accumulo/trunk: ./ bin/ core/
core/src/main/java/org/apache/accumulo/core/cli/
core/src/main/java/org/apache/accumulo/core/client/impl/
core/src/main/java/org/apache/accumulo/core/client/mapreduce/
core/src/main/java/or...
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/TestRandomDeletes.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/TestRandomDeletes.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/TestRandomDeletes.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/TestRandomDeletes.java Wed Dec 19 16:25:03 2012
@@ -16,14 +16,13 @@
*/
package org.apache.accumulo.server.test;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
+import org.apache.accumulo.server.cli.ClientOpts;
import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Column;
@@ -32,15 +31,7 @@ import org.apache.accumulo.core.data.Mut
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.core.util.TextUtil;
-import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.Parser;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -48,8 +39,6 @@ public class TestRandomDeletes {
private static final Logger log = Logger.getLogger(TestRandomDeletes.class);
private static Authorizations auths = new Authorizations("L1", "L2", "G1", "GROUP2");
- private static AuthInfo credentials;
-
static private class RowColumn implements Comparable<RowColumn> {
Text row;
Column column;
@@ -73,11 +62,11 @@ public class TestRandomDeletes {
}
}
- private static TreeSet<RowColumn> scanAll(Text t) throws Exception {
+ private static TreeSet<RowColumn> scanAll(ClientOpts opts, Text t) throws Exception {
TreeSet<RowColumn> result = new TreeSet<RowColumn>();
- Connector conn = HdfsZooInstance.getInstance().getConnector(credentials.user, credentials.password);
+ Connector conn = opts.getConnector();
Scanner scanner = conn.createScanner(t.toString(), auths);
- scanner.setBatchSize(20000);
+ scanner.setBatchSize(opts.scanBatchSize);
for (Entry<Key,Value> entry : scanner) {
Key key = entry.getKey();
Column column = new Column(TextUtil.getBytes(key.getColumnFamily()), TextUtil.getBytes(key.getColumnQualifier()), TextUtil.getBytes(key
@@ -87,19 +76,18 @@ public class TestRandomDeletes {
return result;
}
- private static long scrambleDeleteHalfAndCheck(Text t, Set<RowColumn> rows) throws Exception {
+ private static long scrambleDeleteHalfAndCheck(ClientOpts opts, Text t, Set<RowColumn> rows) throws Exception {
int result = 0;
ArrayList<RowColumn> entries = new ArrayList<RowColumn>(rows);
java.util.Collections.shuffle(entries);
- Connector connector = HdfsZooInstance.getInstance().getConnector(credentials.user, credentials.password);
- BatchWriter mutations = connector.createBatchWriter(t.toString(), new BatchWriterConfig());
- ColumnVisibility cv = new ColumnVisibility("L1&L2&G1&GROUP2");
+ Connector connector = opts.getConnector();
+ BatchWriter mutations = connector.createBatchWriter(t.toString(), opts.getBatchWriterConfig());
for (int i = 0; i < (entries.size() + 1) / 2; i++) {
RowColumn rc = entries.get(i);
Mutation m = new Mutation(rc.row);
- m.putDelete(new Text(rc.column.columnFamily), new Text(rc.column.columnQualifier), cv, rc.timestamp + 1);
+ m.putDelete(new Text(rc.column.columnFamily), new Text(rc.column.columnQualifier), new ColumnVisibility(rc.column.getColumnVisibility()), rc.timestamp + 1);
mutations.addMutation(m);
rows.remove(rc);
result++;
@@ -107,7 +95,7 @@ public class TestRandomDeletes {
mutations.close();
- Set<RowColumn> current = scanAll(t);
+ Set<RowColumn> current = scanAll(opts, t);
current.removeAll(rows);
if (current.size() > 0) {
throw new RuntimeException(current.size() + " records not deleted");
@@ -116,36 +104,21 @@ public class TestRandomDeletes {
}
static public void main(String[] args) {
- Option usernameOpt = new Option("username", "username", true, "username");
- Option passwordOpt = new Option("password", "password", true, "password");
-
- Options opts = new Options();
-
- opts.addOption(usernameOpt);
- opts.addOption(passwordOpt);
- Parser p = new BasicParser();
- CommandLine cl = null;
- try {
- cl = p.parse(opts, args);
- } catch (ParseException e1) {
- System.out.println("Parse Exception, exiting.");
- return;
- }
- credentials = new AuthInfo(cl.getOptionValue("username", "root"), ByteBuffer.wrap(cl.getOptionValue("password", "secret").getBytes()), HdfsZooInstance
- .getInstance().getInstanceID());
+ ClientOpts opts = new ClientOpts();
+ opts.parseArgs(TestRandomDeletes.class.getName(), args);
try {
long deleted = 0;
Text t = new Text("test_ingest");
- TreeSet<RowColumn> doomed = scanAll(t);
+ TreeSet<RowColumn> doomed = scanAll(opts, t);
log.info("Got " + doomed.size() + " rows");
long startTime = System.currentTimeMillis();
while (true) {
- long half = scrambleDeleteHalfAndCheck(t, doomed);
+ long half = scrambleDeleteHalfAndCheck(opts, t, doomed);
deleted += half;
if (half == 0)
break;
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/VerifyIngest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/VerifyIngest.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/VerifyIngest.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/VerifyIngest.java Wed Dec 19 16:25:03 2012
@@ -34,15 +34,13 @@ import org.apache.accumulo.core.security
import org.apache.accumulo.core.trace.DistributedTrace;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.fate.zookeeper.ZooReader;
-import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.test.TestIngest.IngestArgs;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
+import com.beust.jcommander.Parameter;
+
public class VerifyIngest {
- private static String username = "root";
- private static byte[] passwd = "secret".getBytes();
private static final Logger log = Logger.getLogger(VerifyIngest.class);
@@ -54,11 +52,17 @@ public class VerifyIngest {
return Integer.parseInt(k.getColumnQualifier().toString().split("_")[1]);
}
+ public static class Opts extends TestIngest.Opts {
+ @Parameter(names="-useGet", description="fetches values one at a time, instead of scanning")
+ boolean useGet = false;
+ }
+
public static void main(String[] args) {
- IngestArgs ingestArgs = TestIngest.parseArgs(args);
- Instance instance = HdfsZooInstance.getInstance();
+ Opts opts = new Opts();
+ opts.parseArgs(VerifyIngest.class.getName(), args);
+ Instance instance = opts.getInstance();
try {
- if (ingestArgs.trace) {
+ if (opts.trace) {
String name = VerifyIngest.class.getSimpleName();
DistributedTrace.enable(instance, new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()), name, null);
Trace.on(name);
@@ -68,36 +72,36 @@ public class VerifyIngest {
Connector connector = null;
while (connector == null) {
try {
- connector = instance.getConnector(username, passwd);
+ connector = opts.getConnector();
} catch (AccumuloException e) {
log.warn("Could not connect to accumulo; will retry: " + e);
UtilWaitThread.sleep(1000);
}
}
- byte[][] bytevals = TestIngest.generateValues(ingestArgs);
+ byte[][] bytevals = TestIngest.generateValues(opts);
Authorizations labelAuths = new Authorizations("L1", "L2", "G1", "GROUP2");
- int expectedRow = ingestArgs.startRow;
+ int expectedRow = opts.startRow;
int expectedCol = 0;
int recsRead = 0;
long bytesRead = 0;
long t1 = System.currentTimeMillis();
- byte randomValue[] = new byte[ingestArgs.dataSize];
+ byte randomValue[] = new byte[opts.dataSize];
Random random = new Random();
- Key endKey = new Key(new Text("row_" + String.format("%010d", ingestArgs.rows + ingestArgs.startRow)));
+ Key endKey = new Key(new Text("row_" + String.format("%010d", opts.rows + opts.startRow)));
int errors = 0;
- while (expectedRow < (ingestArgs.rows + ingestArgs.startRow)) {
+ while (expectedRow < (opts.rows + opts.startRow)) {
- if (ingestArgs.useGet) {
- Text rowKey = new Text("row_" + String.format("%010d", expectedRow + ingestArgs.startRow));
- Text colf = new Text(ingestArgs.columnFamily);
+ if (opts.useGet) {
+ Text rowKey = new Text("row_" + String.format("%010d", expectedRow + opts.startRow));
+ Text colf = new Text(opts.columnFamily);
Text colq = new Text("col_" + String.format("%05d", expectedCol));
Scanner scanner = connector.createScanner("test_ingest", labelAuths);
@@ -115,8 +119,8 @@ public class VerifyIngest {
}
byte ev[];
- if (ingestArgs.random) {
- ev = TestIngest.genRandomValue(random, randomValue, ingestArgs.seed, expectedRow, expectedCol);
+ if (opts.random != null) {
+ ev = TestIngest.genRandomValue(random, randomValue, opts.random.intValue(), expectedRow, expectedCol);
} else {
ev = bytevals[expectedCol % bytevals.length];
}
@@ -135,22 +139,20 @@ public class VerifyIngest {
}
expectedCol++;
- if (expectedCol >= ingestArgs.cols) {
+ if (expectedCol >= opts.cols) {
expectedCol = 0;
expectedRow++;
}
} else {
- int batchSize = 10000;
-
Key startKey = new Key(new Text("row_" + String.format("%010d", expectedRow)));
Scanner scanner = connector.createScanner("test_ingest", labelAuths);
- scanner.setBatchSize(batchSize);
+ scanner.setBatchSize(opts.scanBatchSize);
scanner.setRange(new Range(startKey, endKey));
- for (int j = 0; j < ingestArgs.cols; j++) {
- scanner.fetchColumn(new Text(ingestArgs.columnFamily), new Text("col_" + String.format("%05d", j)));
+ for (int j = 0; j < opts.cols; j++) {
+ scanner.fetchColumn(new Text(opts.columnFamily), new Text("col_" + String.format("%05d", j)));
}
int recsReadBefore = recsRead;
@@ -176,16 +178,16 @@ public class VerifyIngest {
errors++;
}
- if (expectedRow >= (ingestArgs.rows + ingestArgs.startRow)) {
- log.error("expectedRow (" + expectedRow + ") >= (ingestArgs.rows + ingestArgs.startRow) (" + (ingestArgs.rows + ingestArgs.startRow)
+ if (expectedRow >= (opts.rows + opts.startRow)) {
+ log.error("expectedRow (" + expectedRow + ") >= (ingestArgs.rows + ingestArgs.startRow) (" + (opts.rows + opts.startRow)
+ "), get batch returned data passed end key");
errors++;
break;
}
byte value[];
- if (ingestArgs.random) {
- value = TestIngest.genRandomValue(random, randomValue, ingestArgs.seed, expectedRow, colNum);
+ if (opts.random != null) {
+ value = TestIngest.genRandomValue(random, randomValue, opts.random.intValue(), expectedRow, colNum);
} else {
value = bytevals[colNum % bytevals.length];
}
@@ -196,13 +198,13 @@ public class VerifyIngest {
errors++;
}
- if (ingestArgs.hasTimestamp && entry.getKey().getTimestamp() != ingestArgs.timestamp) {
+ if (opts.timestamp >= 0 && entry.getKey().getTimestamp() != opts.timestamp) {
log.error("unexpected timestamp " + entry.getKey().getTimestamp() + ", rowNum : " + rowNum + " colNum : " + colNum);
errors++;
}
expectedCol++;
- if (expectedCol >= ingestArgs.cols) {
+ if (expectedCol >= opts.cols) {
expectedCol = 0;
expectedRow++;
}
@@ -224,8 +226,8 @@ public class VerifyIngest {
System.exit(1);
}
- if (expectedRow != (ingestArgs.rows + ingestArgs.startRow)) {
- log.error("Did not read expected number of rows. Saw " + (expectedRow - ingestArgs.startRow) + " expected " + ingestArgs.rows);
+ if (expectedRow != (opts.rows + opts.startRow)) {
+ log.error("Did not read expected number of rows. Saw " + (expectedRow - opts.startRow) + " expected " + opts.rows);
System.exit(1);
} else {
System.out.printf("%,12d records read | %,8d records/sec | %,12d bytes read | %,8d bytes/sec | %6.3f secs %n", recsRead,
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/WrongTabletTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/WrongTabletTest.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/WrongTabletTest.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/WrongTabletTest.java Wed Dec 19 16:25:03 2012
@@ -16,31 +16,35 @@
*/
package org.apache.accumulo.server.test;
-import java.nio.ByteBuffer;
-
import org.apache.accumulo.cloudtrace.instrument.Tracer;
+import org.apache.accumulo.server.cli.ClientOpts;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.util.ThriftUtil;
-import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.hadoop.io.Text;
+import com.beust.jcommander.Parameter;
+
public class WrongTabletTest {
- private static AuthInfo rootCredentials = new AuthInfo("root", ByteBuffer.wrap("secret".getBytes()), HdfsZooInstance.getInstance().getInstanceID());
+
+ static class Opts extends ClientOpts {
+ @Parameter(names="--location", required=true)
+ String location;
+ }
public static void main(String[] args) {
- String location = args[0];
- ServerConfiguration conf = new ServerConfiguration(HdfsZooInstance.getInstance());
+ Opts opts = new Opts();
+ opts.parseArgs(WrongTabletTest.class.getName(), args);
+
+ ServerConfiguration conf = new ServerConfiguration(opts.getInstance());
try {
- TabletClientService.Iface client = ThriftUtil.getTServerClient(location, conf.getConfiguration());
+ TabletClientService.Iface client = ThriftUtil.getTServerClient(opts.location, conf.getConfiguration());
Mutation mutation = new Mutation(new Text("row_0003750001"));
- // mutation.set(new Text("colf:colq"), new Value("val".getBytes()));
mutation.putDelete(new Text("colf"), new Text("colq"));
- client.update(Tracer.traceInfo(), rootCredentials, new KeyExtent(new Text("test_ingest"), null, new Text("row_0003750000")).toThrift(), mutation.toThrift());
+ client.update(Tracer.traceInfo(), opts.getAuthInfo(), new KeyExtent(new Text("!!"), null, new Text("row_0003750000")).toThrift(), mutation.toThrift());
} catch (Exception e) {
throw new RuntimeException(e);
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousBatchWalker.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousBatchWalker.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousBatchWalker.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousBatchWalker.java Wed Dec 19 16:25:03 2012
@@ -24,95 +24,51 @@ import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.test.continuous.ContinuousWalk.RandomAuths;
import org.apache.hadoop.io.Text;
-import org.apache.log4j.FileAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.validators.PositiveInteger;
public class ContinuousBatchWalker {
- private static String debugLog = null;
- private static String authsFile = null;
-
- private static String[] processOptions(String[] args) {
- ArrayList<String> al = new ArrayList<String>();
-
- for (int i = 0; i < args.length; i++) {
- if (args[i].equals("--debug")) {
- debugLog = args[++i];
- } else if (args[i].equals("--auths")) {
- authsFile = args[++i];
- } else {
- al.add(args[i]);
- }
- }
-
- return al.toArray(new String[al.size()]);
+
+ static class Opts extends ContinuousWalk.Opts {
+ @Parameter(names="--numToScan", description="Number rows to scan between sleeps", required=true, validateWith=PositiveInteger.class)
+ long numToScan = 0;
}
-
+
public static void main(String[] args) throws Exception {
- args = processOptions(args);
-
- if (args.length != 10) {
- throw new IllegalArgumentException("usage : " + ContinuousBatchWalker.class.getName()
- + " [--debug <debug log>] [--auths <file>] <instance name> <zookeepers> <user> <pass> <table> <min> <max> <sleep time> <batch size> <query threads>");
- }
-
- if (debugLog != null) {
- Logger logger = Logger.getLogger(Constants.CORE_PACKAGE_NAME);
- logger.setLevel(Level.TRACE);
- logger.setAdditivity(false);
- logger.addAppender(new FileAppender(new PatternLayout("%d{dd HH:mm:ss,SSS} [%-8c{2}] %-5p: %m%n"), debugLog, true));
- }
-
- String instanceName = args[0];
- String zooKeepers = args[1];
-
- String user = args[2];
- String password = args[3];
-
- String table = args[4];
-
- long min = Long.parseLong(args[5]);
- long max = Long.parseLong(args[6]);
-
- long sleepTime = Long.parseLong(args[7]);
-
- int batchSize = Integer.parseInt(args[8]);
- int numQueryThreads = Integer.parseInt(args[9]);
+ Opts opts = new Opts();
+ opts.parseArgs(ContinuousBatchWalker.class.getName(), args);
Random r = new Random();
- RandomAuths randomAuths = new RandomAuths(authsFile);
- Authorizations auths = randomAuths.getAuths(r);
+ Authorizations auths = opts.randomAuths.getAuths(r);
- Connector conn = new ZooKeeperInstance(instanceName, zooKeepers).getConnector(user, password.getBytes());
- Scanner scanner = conn.createScanner(table, auths);
- BatchScanner bs = conn.createBatchScanner(table, auths, numQueryThreads);
+ Connector conn = opts.getConnector();
+ Scanner scanner = conn.createScanner(opts.tableName, auths);
+ scanner.setBatchSize(opts.scanBatchSize);
+
+ BatchScanner bs = conn.createBatchScanner(opts.tableName, auths, opts.scanThreads);
while (true) {
- Set<Text> batch = getBatch(scanner, min, max, batchSize, r);
-
+ Set<Text> batch = getBatch(scanner, opts.min, opts.max, opts.scanBatchSize, r);
List<Range> ranges = new ArrayList<Range>(batch.size());
for (Text row : batch) {
ranges.add(new Range(row));
}
- runBatchScan(batchSize, bs, batch, ranges);
+ runBatchScan(opts.scanBatchSize, bs, batch, ranges);
- UtilWaitThread.sleep(sleepTime);
+ UtilWaitThread.sleep(opts.sleepTime);
}
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java Wed Dec 19 16:25:03 2012
@@ -17,33 +17,27 @@
package org.apache.accumulo.server.test.continuous;
import java.io.BufferedReader;
+import java.io.IOException;
import java.io.InputStreamReader;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
import org.apache.accumulo.cloudtrace.instrument.CountSampler;
import org.apache.accumulo.cloudtrace.instrument.Trace;
-import org.apache.accumulo.cloudtrace.instrument.Tracer;
-import org.apache.accumulo.cloudtrace.instrument.receivers.ZooSpanClient;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.server.cli.ClientOpts;
import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.test.FastFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -54,17 +48,64 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.Parameter;
+
public class ContinuousIngest {
- private static String visFile = null;
- private static String debugLog = null;
+ static public class BaseOpts extends ClientOpts {
+ public class DebugConverter implements IStringConverter<String> {
+ @Override
+ public String convert(String debugLog) {
+ Logger logger = Logger.getLogger(Constants.CORE_PACKAGE_NAME);
+ logger.setLevel(Level.TRACE);
+ logger.setAdditivity(false);
+ try {
+ logger.addAppender(new FileAppender(new PatternLayout("%d{dd HH:mm:ss,SSS} [%-8c{2}] %-5p: %m%n"), debugLog, true));
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ return debugLog;
+ }
+ }
+
+ @Parameter(names="--min", description="lowest random row number to use")
+ long min = 0;
+
+ @Parameter(names="--max", description="maximum random row number to use")
+ long max = Long.MAX_VALUE;
+
+ @Parameter(names="--debugLog", description="file to write debugging output", converter=DebugConverter.class)
+ String debugLog = null;
+
+ @Parameter(names="--table", description="table to use")
+ String tableName="ci";
+ }
+
+ static public class Opts extends BaseOpts {
+ @Parameter(names="--num", description="the number of entries to ingest")
+ long num = Long.MAX_VALUE;
+
+ @Parameter(names="--maxColF", description="maximum column family value to use")
+ short maxColF = Short.MAX_VALUE;
+
+ @Parameter(names="--maxColQ", description="maximum column qualifier value to use")
+ short maxColQ = Short.MAX_VALUE;
+
+ @Parameter(names="--addCheckSum", description="turn on checksums")
+ boolean checksum = false;
+
+ @Parameter(names="--visibilities", description="read the visibilities to ingest with from a file")
+ String visFile = null;
+ }
+
private static final byte[] EMPTY_BYTES = new byte[0];
private static List<ColumnVisibility> visibilities;
- private static void initVisibilities() throws Exception {
- if (visFile == null) {
+ private static void initVisibilities(Opts opts) throws Exception {
+ if (opts.visFile == null) {
visibilities = Collections.singletonList(new ColumnVisibility());
return;
}
@@ -72,7 +113,7 @@ public class ContinuousIngest {
visibilities = new ArrayList<ColumnVisibility>();
FileSystem fs = FileSystem.get(new Configuration());
- BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(visFile))));
+ BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(opts.visFile))));
String line;
@@ -87,78 +128,24 @@ public class ContinuousIngest {
return visibilities.get(rand.nextInt(visibilities.size()));
}
- private static String[] processOptions(String[] args) {
- ArrayList<String> al = new ArrayList<String>();
-
- for (int i = 0; i < args.length; i++) {
- if (args[i].equals("--debug")) {
- debugLog = args[++i];
- } else if (args[i].equals("--visibilities")) {
- visFile = args[++i];
- } else {
- al.add(args[i]);
- }
- }
-
- return al.toArray(new String[al.size()]);
- }
-
public static void main(String[] args) throws Exception {
- args = processOptions(args);
-
- if (args.length != 14) {
- throw new IllegalArgumentException(
- "usage : "
- + ContinuousIngest.class.getName()
- + " [--debug <debug log>] [--visibilities <file>] <instance name> <zookeepers> <user> <pass> <table> <num> <min> <max> <max colf> <max colq> <max mem> <max latency> <max threads> <enable checksum>");
- }
-
- if (debugLog != null) {
- Logger logger = Logger.getLogger(Constants.CORE_PACKAGE_NAME);
- logger.setLevel(Level.TRACE);
- logger.setAdditivity(false);
- logger.addAppender(new FileAppender(new PatternLayout("%d{dd HH:mm:ss,SSS} [%-8c{2}] %-5p: %m%n"), debugLog, true));
- }
+ Opts opts = new Opts();
+ opts.parseArgs(ContinuousIngest.class.getName(), args);
- initVisibilities();
+ initVisibilities(opts);
- String instanceName = args[0];
- String zooKeepers = args[1];
-
- String user = args[2];
- String password = args[3];
-
- String table = args[4];
-
- long num = Long.parseLong(args[5]);
- long min = Long.parseLong(args[6]);
- long max = Long.parseLong(args[7]);
- short maxColF = Short.parseShort(args[8]);
- short maxColQ = Short.parseShort(args[9]);
-
- long maxMemory = Long.parseLong(args[10]);
- long maxLatency = Integer.parseInt(args[11]);
- int maxWriteThreads = Integer.parseInt(args[12]);
-
- boolean checksum = Boolean.parseBoolean(args[13]);
-
- if (min < 0 || max < 0 || max <= min) {
+ if (opts.min < 0 || opts.max < 0 || opts.max <= opts.min) {
throw new IllegalArgumentException("bad min and max");
}
- Instance instance = new ZooKeeperInstance(instanceName, zooKeepers);
- Connector conn = instance.getConnector(user, password);
- String localhost = InetAddress.getLocalHost().getHostName();
- String path = ZooUtil.getRoot(instance) + Constants.ZTRACERS;
- Tracer.getInstance().addReceiver(new ZooSpanClient(zooKeepers, path, localhost, "cingest", 1000));
+ Connector conn = opts.getConnector();
- if (!conn.tableOperations().exists(table))
+ if (!conn.tableOperations().exists(opts.tableName))
try {
- conn.tableOperations().create(table);
+ conn.tableOperations().create(opts.tableName);
} catch (TableExistsException tee) {}
- BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig().setMaxMemory(maxMemory).setMaxLatency(maxLatency, TimeUnit.MILLISECONDS)
- .setMaxWriteThreads(maxWriteThreads));
+ BatchWriter bw = conn.createBatchWriter(opts.tableName, opts.getBatchWriterConfig());
bw = Trace.wrapAll(bw, new CountSampler(1024));
Random r = new Random();
@@ -187,38 +174,38 @@ public class ContinuousIngest {
ColumnVisibility cv = getVisibility(r);
for (int index = 0; index < flushInterval; index++) {
- long rowLong = genLong(min, max, r);
+ long rowLong = genLong(opts.min, opts.max, r);
prevRows[index] = rowLong;
firstRows[index] = rowLong;
- int cf = r.nextInt(maxColF);
- int cq = r.nextInt(maxColQ);
+ int cf = r.nextInt(opts.maxColF);
+ int cq = r.nextInt(opts.maxColQ);
firstColFams[index] = cf;
firstColQuals[index] = cq;
- Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, r, checksum);
+ Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, r, opts.checksum);
count++;
bw.addMutation(m);
}
lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
- if (count >= num)
+ if (count >= opts.num)
break out;
// generate subsequent sets of nodes that link to previous set of nodes
for (int depth = 1; depth < maxDepth; depth++) {
for (int index = 0; index < flushInterval; index++) {
- long rowLong = genLong(min, max, r);
+ long rowLong = genLong(opts.min, opts.max, r);
byte[] prevRow = genRow(prevRows[index]);
prevRows[index] = rowLong;
- Mutation m = genMutation(rowLong, r.nextInt(maxColF), r.nextInt(maxColQ), cv, ingestInstanceId, count, prevRow, r, checksum);
+ Mutation m = genMutation(rowLong, r.nextInt(opts.maxColF), r.nextInt(opts.maxColQ), cv, ingestInstanceId, count, prevRow, r, opts.checksum);
count++;
bw.addMutation(m);
}
lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
- if (count >= num)
+ if (count >= opts.num)
break out;
}
@@ -226,16 +213,17 @@ public class ContinuousIngest {
// point to something
for (int index = 0; index < flushInterval - 1; index++) {
Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv, ingestInstanceId, count, genRow(prevRows[index + 1]), r,
- checksum);
+ opts.checksum);
count++;
bw.addMutation(m);
}
lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
- if (count >= num)
+ if (count >= opts.num)
break out;
}
bw.close();
+ opts.stopTracing();
}
private static long flush(BatchWriter bw, long count, final int flushInterval, long lastFlushTime) throws MutationsRejectedException {
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java Wed Dec 19 16:25:03 2012
@@ -18,18 +18,18 @@ package org.apache.accumulo.server.test.
import java.io.IOException;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.test.continuous.ContinuousIngest.BaseOpts;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
@@ -39,8 +39,11 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.validators.PositiveInteger;
+
/**
- * A map only job that reads a table created by continuous ingest and creates doubly linked list. This map reduce job test ability of a map only job to read and
+ * A map only job that reads a table created by continuous ingest and creates a doubly linked list. This map reduce job tests the ability of a map only job to read and
* write to accumulo at the same time. This map reduce job mutates the table in such a way that it should not create any undefined nodes.
*
*/
@@ -107,38 +110,32 @@ public class ContinuousMoru extends Conf
}
}
+ static class Opts extends BaseOpts {
+ @Parameter(names="--maxColF", description="maximum column family value to use")
+ short maxColF = Short.MAX_VALUE;
+
+ @Parameter(names="--maxColQ", description="maximum column qualifier value to use")
+ short maxColQ = Short.MAX_VALUE;
+
+ @Parameter(names="--maxMappers", description="the maximum number of mappers to use", required=true, validateWith=PositiveInteger.class)
+ int maxMaps = 0;
+ }
+
@Override
public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- if (args.length != 13) {
- throw new IllegalArgumentException("Usage : " + ContinuousMoru.class.getName()
- + " <instance name> <zookeepers> <user> <pass> <table> <min> <max> <max cf> <max cq> <max mem> <max latency> <num threads> <max maps>");
- }
-
- String instance = args[0];
- String zookeepers = args[1];
- String user = args[2];
- String pass = args[3];
- String table = args[4];
- String min = args[5];
- String max = args[6];
- String max_cf = args[7];
- String max_cq = args[8];
- String maxMem = args[9];
- String maxLatency = args[10];
- String numThreads = args[11];
- String maxMaps = args[12];
+ Opts opts = new Opts();
+ opts.parseArgs(ContinuousMoru.class.getName(), args);
Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
job.setInputFormatClass(AccumuloInputFormat.class);
- AccumuloInputFormat.setInputInfo(job.getConfiguration(), user, pass.getBytes(), table, new Authorizations());
- AccumuloInputFormat.setZooKeeperInstance(job.getConfiguration(), instance, zookeepers);
+ opts.setAccumuloConfigs(job);
// set up ranges
try {
- AccumuloInputFormat.setRanges(job.getConfiguration(), new ZooKeeperInstance(instance, zookeepers).getConnector(user, pass.getBytes()).tableOperations()
- .splitRangeByTablets(table, new Range(), Integer.parseInt(maxMaps)));
+ Set<Range> ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.tableName, new Range(), opts.maxMaps);
+ AccumuloInputFormat.setRanges(job.getConfiguration(), ranges);
AccumuloInputFormat.disableAutoAdjustRanges(job.getConfiguration());
} catch (Exception e) {
throw new IOException(e);
@@ -149,20 +146,19 @@ public class ContinuousMoru extends Conf
job.setNumReduceTasks(0);
job.setOutputFormatClass(AccumuloOutputFormat.class);
- AccumuloOutputFormat.setOutputInfo(job.getConfiguration(), user, pass.getBytes(), false, table);
- AccumuloOutputFormat.setZooKeeperInstance(job.getConfiguration(), instance, zookeepers);
- AccumuloOutputFormat.setMaxLatency(job.getConfiguration(), (int) (Integer.parseInt(maxLatency) / 1000.0));
- AccumuloOutputFormat.setMaxMutationBufferSize(job.getConfiguration(), Long.parseLong(maxMem));
- AccumuloOutputFormat.setMaxWriteThreads(job.getConfiguration(), Integer.parseInt(numThreads));
+ AccumuloOutputFormat.setMaxLatency(job.getConfiguration(), (int) (opts.batchLatency / 1000.0));
+ AccumuloOutputFormat.setMaxMutationBufferSize(job.getConfiguration(), opts.batchMemory);
+ AccumuloOutputFormat.setMaxWriteThreads(job.getConfiguration(), opts.batchThreads);
Configuration conf = job.getConfiguration();
- conf.setLong(MIN, Long.parseLong(min));
- conf.setLong(MAX, Long.parseLong(max));
- conf.setInt(MAX_CF, Integer.parseInt(max_cf));
- conf.setInt(MAX_CQ, Integer.parseInt(max_cq));
+ conf.setLong(MIN, opts.min);
+ conf.setLong(MAX, opts.max);
+ conf.setInt(MAX_CF, opts.maxColF);
+ conf.setInt(MAX_CQ, opts.maxColQ);
conf.set(CI_ID, UUID.randomUUID().toString());
job.waitForCompletion(true);
+ opts.stopTracing();
return job.isSuccessful() ? 0 : 1;
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousQuery.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousQuery.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousQuery.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousQuery.java Wed Dec 19 16:25:03 2012
@@ -21,41 +21,33 @@ import java.util.Random;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.server.test.continuous.ContinuousIngest.BaseOpts;
import org.apache.hadoop.io.Text;
+import com.beust.jcommander.Parameter;
+
public class ContinuousQuery {
+ public static class Opts extends BaseOpts {
+ @Parameter(names="--sleep", description="the time to wait between queries", converter=TimeConverter.class)
+ long sleepTime = 100;
+ }
+
public static void main(String[] args) throws Exception {
- if (args.length != 8) {
- throw new IllegalArgumentException("usage : " + ContinuousIngest.class.getName()
- + " <instance name> <zookeepers> <user> <pass> <table> <min> <max> <sleep time>");
- }
-
- String instanceName = args[0];
- String zooKeepers = args[1];
-
- String user = args[2];
- String password = args[3];
+ Opts opts = new Opts();
+ opts.parseArgs(ContinuousQuery.class.getName(), args);
- String table = args[4];
-
- long min = Long.parseLong(args[5]);
- long max = Long.parseLong(args[6]);
-
- long sleepTime = Long.parseLong(args[7]);
-
- Connector conn = new ZooKeeperInstance(instanceName, zooKeepers).getConnector(user, password.getBytes());
- Scanner scanner = conn.createScanner(table, new Authorizations());
+ Connector conn = opts.getConnector();
+ Scanner scanner = conn.createScanner(opts.tableName, opts.auths);
+ scanner.setBatchSize(opts.scanBatchSize);
Random r = new Random();
while (true) {
- byte[] row = ContinuousIngest.genRow(min, max, r);
+ byte[] row = ContinuousIngest.genRow(opts.min, opts.max, r);
int count = 0;
@@ -69,11 +61,8 @@ public class ContinuousQuery {
System.out.printf("SRQ %d %s %d %d%n", t1, new String(row), (t2 - t1), count);
- if (sleepTime > 0)
- Thread.sleep(sleepTime);
-
+ if (opts.sleepTime > 0)
+ Thread.sleep(opts.sleepTime);
}
-
}
-
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousScanner.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousScanner.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousScanner.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousScanner.java Wed Dec 19 16:25:03 2012
@@ -16,94 +16,46 @@
*/
package org.apache.accumulo.server.test.continuous;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Random;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.test.continuous.ContinuousWalk.RandomAuths;
import org.apache.hadoop.io.Text;
-import org.apache.log4j.FileAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.validators.PositiveInteger;
public class ContinuousScanner {
- private static String debugLog = null;
- private static String authsFile = null;
- private static String[] processOptions(String[] args) {
- ArrayList<String> al = new ArrayList<String>();
-
- for (int i = 0; i < args.length; i++) {
- if (args[i].equals("--debug")) {
- debugLog = args[++i];
- } else if (args[i].equals("--auths")) {
- authsFile = args[++i];
- } else {
- al.add(args[i]);
- }
- }
-
- return al.toArray(new String[al.size()]);
+ static class Opts extends ContinuousWalk.Opts {
+ @Parameter(names="--numToScan", description="Number rows to scan between sleeps", required=true, validateWith=PositiveInteger.class)
+ long numToScan = 0;
}
public static void main(String[] args) throws Exception {
-
- args = processOptions(args);
-
- if (args.length != 9) {
- throw new IllegalArgumentException("usage : " + ContinuousScanner.class.getName()
- + " [--debug <debug log>] [--auths <file>] <instance name> <zookeepers> <user> <pass> <table> <min> <max> <sleep time> <num to scan>");
- }
-
- if (debugLog != null) {
- Logger logger = Logger.getLogger(Constants.CORE_PACKAGE_NAME);
- logger.setLevel(Level.TRACE);
- logger.setAdditivity(false);
- logger.addAppender(new FileAppender(new PatternLayout("%d{dd HH:mm:ss,SSS} [%-8c{2}] %-5p: %m%n"), debugLog, true));
- }
+ Opts opts = new Opts();
+ opts.parseArgs(ContinuousScanner.class.getName(), args);
Random r = new Random();
- String instanceName = args[0];
- String zooKeepers = args[1];
-
- String user = args[2];
- String password = args[3];
-
- String table = args[4];
-
- long min = Long.parseLong(args[5]);
- long max = Long.parseLong(args[6]);
long distance = 1000000000000l;
- long sleepTime = Long.parseLong(args[7]);
-
- int numToScan = Integer.parseInt(args[8]);
-
- RandomAuths randomAuths = new RandomAuths(authsFile);
-
- Instance instance = new ZooKeeperInstance(instanceName, zooKeepers);
- Connector conn = instance.getConnector(user, password.getBytes());
- Authorizations auths = randomAuths.getAuths(r);
- Scanner scanner = conn.createScanner(table, auths);
+ Connector conn = opts.getConnector();
+ Authorizations auths = opts.randomAuths.getAuths(r);
+ Scanner scanner = conn.createScanner(opts.tableName, auths);
+ scanner.setBatchSize(opts.scanBatchSize);
- double delta = Math.min(.05, .05 / (numToScan / 1000.0));
- // System.out.println("Delta "+delta);
+ double delta = Math.min(.05, .05 / (opts.numToScan / 1000.0));
while (true) {
- long startRow = ContinuousIngest.genLong(min, max - distance, r);
+ long startRow = ContinuousIngest.genLong(opts.min, opts.max - distance, r);
byte[] scanStart = ContinuousIngest.genRow(startRow);
byte[] scanStop = ContinuousIngest.genRow(startRow + distance);
@@ -124,13 +76,13 @@ public class ContinuousScanner {
// System.out.println("P1 " +count +" "+((1-delta) * numToScan)+" "+((1+delta) * numToScan)+" "+numToScan);
- if (count < (1 - delta) * numToScan || count > (1 + delta) * numToScan) {
+ if (count < (1 - delta) * opts.numToScan || count > (1 + delta) * opts.numToScan) {
if (count == 0) {
distance = distance * 10;
if (distance < 0)
distance = 1000000000000l;
} else {
- double ratio = (double) numToScan / count;
+ double ratio = (double) opts.numToScan / count;
// move ratio closer to 1 to make change slower
ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
distance = (long) (ratio * distance);
@@ -141,8 +93,8 @@ public class ContinuousScanner {
System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart), (t2 - t1), count);
- if (sleepTime > 0)
- UtilWaitThread.sleep(sleepTime);
+ if (opts.sleepTime > 0)
+ UtilWaitThread.sleep(opts.sleepTime);
}
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousStatsCollector.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousStatsCollector.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousStatsCollector.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousStatsCollector.java Wed Dec 19 16:25:03 2012
@@ -24,10 +24,10 @@ import java.util.TimerTask;
import org.apache.accumulo.cloudtrace.instrument.Tracer;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.server.cli.ClientOnRequiredTable;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.impl.MasterClient;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.data.Key;
@@ -41,7 +41,6 @@ import org.apache.accumulo.core.master.t
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.Stat;
import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.monitor.Monitor;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.hadoop.conf.Configuration;
@@ -57,15 +56,11 @@ public class ContinuousStatsCollector {
static class StatsCollectionTask extends TimerTask {
private final String tableId;
- private ZooKeeperInstance instance;
- private String user;
- private String pass;
-
- public StatsCollectionTask(String tableName, String instanceName, String zooHosts, String user, String pass) {
- this.instance = new ZooKeeperInstance(instanceName, zooHosts);
- this.tableId = Tables.getNameToIdMap(instance).get(tableName);
- this.user = user;
- this.pass = pass;
+ private Opts opts;
+
+ public StatsCollectionTask(Opts opts) {
+ this.opts = opts;
+ this.tableId = Tables.getNameToIdMap(opts.getInstance()).get(opts.tableName);
System.out
.println("TIME TABLET_SERVERS TOTAL_ENTRIES TOTAL_INGEST TOTAL_QUERY TABLE_RECS TABLE_RECS_IN_MEM TABLE_INGEST TABLE_QUERY TABLE_TABLETS TABLE_TABLETS_ONLINE"
+ " ACCUMULO_DU ACCUMULO_DIRS ACCUMULO_FILES TABLE_DU TABLE_DIRS TABLE_FILES"
@@ -88,9 +83,10 @@ public class ContinuousStatsCollector {
}
private String getTabletStats() throws Exception {
- Connector conn = instance.getConnector(user, pass);
- Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+ Connector conn = opts.getConnector();
+ Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, opts.auths);
+ scanner.setBatchSize(opts.scanBatchSize);
scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
scanner.addScanIterator(new IteratorSetting(1000, "cfc", ColumnFamilyCounter.class.getName()));
scanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
@@ -126,7 +122,7 @@ public class ContinuousStatsCollector {
MasterClientService.Iface client = null;
try {
- client = MasterClient.getConnectionWithRetry(HdfsZooInstance.getInstance());
+ client = MasterClient.getConnectionWithRetry(opts.getInstance());
MasterMonitorInfo stats = client.getMasterStats(Tracer.traceInfo(), SecurityConstants.getSystemCredentials());
TableInfo all = new TableInfo();
@@ -170,10 +166,15 @@ public class ContinuousStatsCollector {
}
+ static class Opts extends ClientOnRequiredTable {
+ }
+
public static void main(String[] args) {
+ Opts opts = new Opts();
+ opts.parseArgs(ContinuousStatsCollector.class.getName(), args);
Timer jtimer = new Timer();
- jtimer.schedule(new StatsCollectionTask(args[0], args[1], args[2], args[3], args[4]), 0, 30000);
+ jtimer.schedule(new StatsCollectionTask(opts), 0, 30000);
}
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java Wed Dec 19 16:25:03 2012
@@ -23,14 +23,12 @@ import java.util.HashSet;
import java.util.Random;
import java.util.Set;
-import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.server.cli.ClientOpts;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.server.test.continuous.ContinuousWalk.BadChecksumException;
import org.apache.hadoop.conf.Configured;
@@ -45,6 +43,9 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.validators.PositiveInteger;
+
/**
* A map reduce job that verifies a table created by continuous ingest. It verifies that all referenced nodes are defined.
*/
@@ -134,66 +135,49 @@ public class ContinuousVerify extends Co
}
}
- @Override
- public int run(String[] args) throws Exception {
+ static class Opts extends ClientOpts {
+ @Parameter(names="--table", description="table to verify")
+ String tableName = "ci";
- String auths = "";
- ArrayList<String> argsList = new ArrayList<String>();
+ @Parameter(names="--output", description="location in HDFS to store the results; must not exist", required=true)
+ String outputDir = "/tmp/continuousVerify";
- for (int i = 0; i < args.length; i++) {
- if (args[i].equals("--auths")) {
- auths = args[++i];
- } else {
- argsList.add(args[i]);
- }
- }
+ @Parameter(names="--maxMappers", description="the maximum number of mappers to use", required=true, validateWith=PositiveInteger.class)
+ int maxMaps = 0;
- args = argsList.toArray(new String[0]);
-
- if (args.length != 9) {
- throw new IllegalArgumentException("Usage : " + ContinuousVerify.class.getName()
- + " <instance name> <zookeepers> <user> <pass> <table> <output dir> <max mappers> <num reducers> <scan offline>");
- }
+ @Parameter(names="--reducers", description="the number of reducers to use", required=true, validateWith=PositiveInteger.class)
+ int reducers = 0;
- String instance = args[0];
- String zookeepers = args[1];
- String user = args[2];
- String pass = args[3];
- String table = args[4];
- String outputdir = args[5];
- String maxMaps = args[6];
- String reducers = args[7];
- boolean scanOffline = Boolean.parseBoolean(args[8]);
+ @Parameter(names="--offline", description="perform the verification directly on the files while the table is offline")
+ boolean scanOffline = false;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Opts opts = new Opts();
+ opts.parseArgs(this.getClass().getName(), args);
Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
- String clone = table;
+ String clone = opts.tableName;
Connector conn = null;
- if (scanOffline) {
+ if (opts.scanOffline) {
Random random = new Random();
- clone = table + "_" + String.format("%016x", Math.abs(random.nextLong()));
- ZooKeeperInstance zki = new ZooKeeperInstance(instance, zookeepers);
- conn = zki.getConnector(user, pass.getBytes());
- conn.tableOperations().clone(table, clone, true, new HashMap<String,String>(), new HashSet<String>());
+ clone = opts.tableName + "_" + String.format("%016x", Math.abs(random.nextLong()));
+ conn = opts.getConnector();
+ conn.tableOperations().clone(opts.tableName, clone, true, new HashMap<String,String>(), new HashSet<String>());
conn.tableOperations().offline(clone);
}
job.setInputFormatClass(AccumuloInputFormat.class);
- Authorizations authorizations;
- if (auths == null || auths.trim().equals(""))
- authorizations = Constants.NO_AUTHS;
- else
- authorizations = new Authorizations(auths.split(","));
-
- AccumuloInputFormat.setInputInfo(job.getConfiguration(), user, pass.getBytes(), clone, authorizations);
- AccumuloInputFormat.setZooKeeperInstance(job.getConfiguration(), instance, zookeepers);
- AccumuloInputFormat.setScanOffline(job.getConfiguration(), scanOffline);
+
+ opts.setAccumuloConfigs(job);
+ AccumuloInputFormat.setScanOffline(job.getConfiguration(), opts.scanOffline);
// set up ranges
try {
- Set<Range> ranges = new ZooKeeperInstance(instance, zookeepers).getConnector(user, pass.getBytes()).tableOperations()
- .splitRangeByTablets(table, new Range(), Integer.parseInt(maxMaps));
+ Set<Range> ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.tableName, new Range(), opts.maxMaps);
AccumuloInputFormat.setRanges(job.getConfiguration(), ranges);
AccumuloInputFormat.disableAutoAdjustRanges(job.getConfiguration());
} catch (Exception e) {
@@ -205,20 +189,20 @@ public class ContinuousVerify extends Co
job.setMapOutputValueClass(VLongWritable.class);
job.setReducerClass(CReducer.class);
- job.setNumReduceTasks(Integer.parseInt(reducers));
+ job.setNumReduceTasks(opts.reducers);
job.setOutputFormatClass(TextOutputFormat.class);
- job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", scanOffline);
+ job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", opts.scanOffline);
- TextOutputFormat.setOutputPath(job, new Path(outputdir));
+ TextOutputFormat.setOutputPath(job, new Path(opts.outputDir));
job.waitForCompletion(true);
- if (scanOffline) {
+ if (opts.scanOffline) {
conn.tableOperations().delete(clone);
}
-
+ opts.stopTracing();
return job.isSuccessful() ? 0 : 1;
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousWalk.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousWalk.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousWalk.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousWalk.java Wed Dec 19 16:25:03 2012
@@ -19,7 +19,6 @@ package org.apache.accumulo.server.test.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -29,36 +28,40 @@ import java.util.zip.CRC32;
import org.apache.accumulo.cloudtrace.instrument.Span;
import org.apache.accumulo.cloudtrace.instrument.Trace;
-import org.apache.accumulo.cloudtrace.instrument.Tracer;
-import org.apache.accumulo.cloudtrace.instrument.receivers.ZooSpanClient;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.server.Accumulo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.log4j.FileAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
+
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.Parameter;
public class ContinuousWalk {
- private static String debugLog = null;
- private static String authsFile = null;
+ static public class Opts extends ContinuousQuery.Opts {
+ class RandomAuthsConverter implements IStringConverter<RandomAuths> {
+ @Override
+ public RandomAuths convert(String value) {
+ try {
+ return new RandomAuths(value);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ @Parameter(names="--authsFile", description="read the authorities to use from a file")
+ RandomAuths randomAuths = new RandomAuths();
+ }
static class BadChecksumException extends RuntimeException {
-
private static final long serialVersionUID = 1L;
public BadChecksumException(String msg) {
@@ -67,25 +70,13 @@ public class ContinuousWalk {
}
- private static String[] processOptions(String[] args) {
- ArrayList<String> al = new ArrayList<String>();
-
- for (int i = 0; i < args.length; i++) {
- if (args[i].equals("--debug")) {
- debugLog = args[++i];
- } else if (args[i].equals("--auths")) {
- authsFile = args[++i];
- } else {
- al.add(args[i]);
- }
- }
-
- return al.toArray(new String[al.size()]);
- }
-
static class RandomAuths {
private List<Authorizations> auths;
+ RandomAuths() {
+ auths = Collections.singletonList(Constants.NO_AUTHS);
+ }
+
RandomAuths(String file) throws IOException {
if (file == null) {
auths = Collections.singletonList(Constants.NO_AUTHS);
@@ -96,14 +87,14 @@ public class ContinuousWalk {
FileSystem fs = FileSystem.get(new Configuration());
BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(file))));
-
- String line;
-
- while ((line = in.readLine()) != null) {
- auths.add(new Authorizations(line.split(",")));
+ try {
+ String line;
+ while ((line = in.readLine()) != null) {
+ auths.add(new Authorizations(line.split(",")));
+ }
+ } finally {
+ in.close();
}
-
- in.close();
}
Authorizations getAuths(Random r) {
@@ -112,50 +103,18 @@ public class ContinuousWalk {
}
public static void main(String[] args) throws Exception {
+ Opts opts = new Opts();
+ opts.parseArgs(ContinuousWalk.class.getName(), args);
- args = processOptions(args);
-
- if (args.length != 8) {
- throw new IllegalArgumentException("usage : " + ContinuousWalk.class.getName()
- + " [--debug <debug log>] [--auths <file>] <instance name> <zookeepers> <user> <pass> <table> <min> <max> <sleep time>");
- }
-
- if (debugLog != null) {
- Logger logger = Logger.getLogger(Constants.CORE_PACKAGE_NAME);
- logger.setLevel(Level.TRACE);
- logger.setAdditivity(false);
- logger.addAppender(new FileAppender(new PatternLayout("%d{dd HH:mm:ss,SSS} [%-8c{2}] %-5p: %m%n"), debugLog, true));
- }
-
- String instanceName = args[0];
- String zooKeepers = args[1];
-
- String user = args[2];
- String password = args[3];
-
- String table = args[4];
-
- long min = Long.parseLong(args[5]);
- long max = Long.parseLong(args[6]);
-
- long sleepTime = Long.parseLong(args[7]);
-
- Instance instance = new ZooKeeperInstance(instanceName, zooKeepers);
-
- String localhost = InetAddress.getLocalHost().getHostName();
- String path = ZooUtil.getRoot(instance) + Constants.ZTRACERS;
- Tracer.getInstance().addReceiver(new ZooSpanClient(zooKeepers, path, localhost, "cwalk", 1000));
- Accumulo.enableTracing(localhost, "ContinuousWalk");
- Connector conn = instance.getConnector(user, password.getBytes());
+ Connector conn = opts.getConnector();
Random r = new Random();
- RandomAuths randomAuths = new RandomAuths(authsFile);
ArrayList<Value> values = new ArrayList<Value>();
while (true) {
- Scanner scanner = conn.createScanner(table, randomAuths.getAuths(r));
- String row = findAStartRow(min, max, scanner, r);
+ Scanner scanner = conn.createScanner(opts.tableName, opts.randomAuths.getAuths(r));
+ String row = findAStartRow(opts.min, opts.max, scanner, r);
while (row != null) {
@@ -184,12 +143,12 @@ public class ContinuousWalk {
row = null;
}
- if (sleepTime > 0)
- Thread.sleep(sleepTime);
+ if (opts.sleepTime > 0)
+ Thread.sleep(opts.sleepTime);
}
- if (sleepTime > 0)
- Thread.sleep(sleepTime);
+ if (opts.sleepTime > 0)
+ Thread.sleep(opts.sleepTime);
}
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/TimeBinner.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/TimeBinner.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/TimeBinner.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/TimeBinner.java Wed Dec 19 16:25:03 2012
@@ -21,9 +21,14 @@ import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
-import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.ClientOpts.TimeConverter;
+import org.apache.accumulo.core.cli.Help;
+
+import com.beust.jcommander.Parameter;
public class TimeBinner {
@@ -46,18 +51,25 @@ public class TimeBinner {
return dw;
}
+ static class Opts extends Help {
+ @Parameter(names="--period", description="period", converter=TimeConverter.class, required=true)
+ long period = 0;
+ @Parameter(names="--timeColumn", description="time column", required=true)
+ int timeColumn = 0;
+ @Parameter(names="--dataColumn", description="data column", required=true)
+ int dataColumn = 0;
+ @Parameter(names="--operation", description="one of: AVG, SUM, MIN, MAX, COUNT", required=true)
+ String operation;
+ @Parameter(names="--dateFormat", description="a SimpleDataFormat string that describes the data format")
+ String dateFormat = "MM/dd/yy-HH:mm:ss";
+ }
+
public static void main(String[] args) throws Exception {
+ Opts opts = new Opts();
+ opts.parseArgs(TimeBinner.class.getName(), args);
- if (args.length != 5) {
- System.out.println("usage : " + TimeBinner.class.getName() + " <period (seconds)> <time column> <data column> AVG|SUM|MIN|MAX|COUNT <date format>");
- System.exit(-1);
- }
-
- long period = Long.parseLong(args[0]) * 1000;
- int timeColumn = Integer.parseInt(args[1]);
- int dataColumn = Integer.parseInt(args[2]);
- Operation operation = Operation.valueOf(args[3]);
- SimpleDateFormat sdf = new SimpleDateFormat(args[4]);
+ Operation operation = Operation.valueOf(opts.operation);
+ SimpleDateFormat sdf = new SimpleDateFormat(opts.dateFormat);
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
@@ -73,18 +85,18 @@ public class TimeBinner {
try {
String tokens[] = line.split("\\s+");
- long time = (long) Double.parseDouble(tokens[timeColumn]);
- double data = Double.parseDouble(tokens[dataColumn]);
+ long time = (long) Double.parseDouble(tokens[opts.timeColumn]);
+ double data = Double.parseDouble(tokens[opts.dataColumn]);
- time = (time / period) * period;
+ time = (time / opts.period) * opts.period;
double data_min = data;
double data_max = data;
switch (operation) {
case AMM_HACK1: {
- data_min = Double.parseDouble(tokens[dataColumn - 2]);
- data_max = Double.parseDouble(tokens[dataColumn - 1]);
+ data_min = Double.parseDouble(tokens[opts.dataColumn - 2]);
+ data_max = Double.parseDouble(tokens[opts.dataColumn - 1]);
// fall through to AMM
}
case AMM: {
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java Wed Dec 19 16:25:03 2012
@@ -32,17 +32,19 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
-import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.server.cli.ClientOnDefaultTable;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;
+import com.beust.jcommander.Parameter;
+
/**
* BUGS This code does not handle the fact that these files could include log events from previous months. It therefore assumes all dates are in the current
* month. One solution might be to skip log files that haven't been touched in the last month, but that doesn't prevent newer files that have old dates in them.
@@ -235,22 +237,18 @@ public class UndefinedAnalyzer {
}
}
+ static class Opts extends ClientOnDefaultTable {
+ @Parameter(names="--logdir", description="directory containing the log files", required=true)
+ String logDir;
+ Opts() { super("ci"); }
+ }
+
+ /**
+ * Class to analyze undefined references and accumulo logs to isolate the time/tablet where data was lost.
+ */
public static void main(String[] args) throws Exception {
-
- if (args.length != 7) {
- System.err.println("Usage : " + UndefinedAnalyzer.class.getName() + " <instance> <zoo> <user> <pass> <table> <ci log dir> <acu log dir>");
- return;
- }
-
- String instanceName = args[0];
- String zooKeepers = args[1];
-
- String user = args[2];
- String password = args[3];
-
- String table = args[4];
- String logDir = args[5];
- String acuLogDir = args[6];
+ Opts opts = new Opts();
+ opts.parseArgs(UndefinedAnalyzer.class.getName(), args);
List<UndefinedNode> undefs = new ArrayList<UndefinedNode>();
@@ -264,10 +262,9 @@ public class UndefinedAnalyzer {
undefs.add(new UndefinedNode(undef, ref));
}
- ZooKeeperInstance zki = new ZooKeeperInstance(instanceName, zooKeepers);
- Connector conn = zki.getConnector(user, password.getBytes());
- BatchScanner bscanner = conn.createBatchScanner(table, Constants.NO_AUTHS, 20);
-
+ Connector conn = opts.getConnector();
+ BatchScanner bscanner = conn.createBatchScanner(opts.getTableName(), opts.auths, opts.batchThreads);
+ bscanner.setTimeout(opts.batchTimeout, TimeUnit.MILLISECONDS);
List<Range> refs = new ArrayList<Range>();
for (UndefinedNode undefinedNode : undefs)
@@ -290,8 +287,8 @@ public class UndefinedAnalyzer {
bscanner.close();
- IngestInfo ingestInfo = new IngestInfo(logDir);
- TabletHistory tabletHistory = new TabletHistory(Tables.getTableId(zki, table), acuLogDir);
+ IngestInfo ingestInfo = new IngestInfo(opts.logDir);
+ TabletHistory tabletHistory = new TabletHistory(Tables.getTableId(conn.getInstance(), opts.getTableName()), opts.logDir);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/BatchWriterFlushTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/BatchWriterFlushTest.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/BatchWriterFlushTest.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/BatchWriterFlushTest.java Wed Dec 19 16:25:03 2012
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
import java.util.Random;
import org.apache.accumulo.core.Constants;
@@ -71,7 +72,7 @@ public class BatchWriterFlushTest extend
private void runLatencyTest() throws Exception {
// should automatically flush after 3 seconds
- BatchWriter bw = getConnector().createBatchWriter("bwlt", new BatchWriterConfig());
+ BatchWriter bw = getConnector().createBatchWriter("bwlt", new BatchWriterConfig().setMaxLatency(2000, TimeUnit.MILLISECONDS));
Scanner scanner = getConnector().createScanner("bwlt", Constants.NO_AUTHS);
Mutation m = new Mutation(new Text(String.format("r_%10d", 1)));
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestClean.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestClean.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestClean.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestClean.java Wed Dec 19 16:25:03 2012
@@ -17,6 +17,7 @@
package org.apache.accumulo.server.test.functional;
import java.io.File;
+import java.util.Arrays;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
@@ -42,7 +43,7 @@ public class CacheTestClean {
} else {
File[] files = reportDir.listFiles();
if (files.length != 0)
- throw new Exception("dir " + reportDir + " is not empty");
+ throw new Exception("dir " + reportDir + " is not empty: " + Arrays.asList(files));
}
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/FateStarvationTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/FateStarvationTest.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/FateStarvationTest.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/FateStarvationTest.java Wed Dec 19 16:25:03 2012
@@ -48,13 +48,13 @@ public class FateStarvationTest extends
public void run() throws Exception {
getConnector().tableOperations().create("test_ingest");
- getConnector().tableOperations().addSplits("test_ingest", TestIngest.CreateTable.getSplitPoints(0, 100000, 50));
+ getConnector().tableOperations().addSplits("test_ingest", TestIngest.getSplitPoints(0, 100000, 50));
TestIngest.main(new String[] {"-random", "89", "-timestamp", "7", "-size", "" + 50, "100000", "0", "1"});
getConnector().tableOperations().flush("test_ingest", null, null, true);
- List<Text> splits = new ArrayList<Text>(TestIngest.CreateTable.getSplitPoints(0, 100000, 67));
+ List<Text> splits = new ArrayList<Text>(TestIngest.getSplitPoints(0, 100000, 67));
Random rand = new Random();
for (int i = 0; i < 100; i++) {
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java Wed Dec 19 16:25:03 2012
@@ -28,6 +28,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.server.cli.ClientOpts;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
@@ -43,37 +44,15 @@ import org.apache.accumulo.core.data.Val
import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+
public abstract class FunctionalTest {
- private static Options opts;
- private static Option masterOpt;
- private static Option passwordOpt;
- private static Option usernameOpt;
- private static Option instanceNameOpt;
-
- static {
- usernameOpt = new Option("u", "username", true, "username");
- passwordOpt = new Option("p", "password", true, "password");
- masterOpt = new Option("m", "master", true, "master");
- instanceNameOpt = new Option("i", "instanceName", true, "instance name");
-
- opts = new Options();
-
- opts.addOption(usernameOpt);
- opts.addOption(passwordOpt);
- opts.addOption(masterOpt);
- opts.addOption(instanceNameOpt);
- }
-
+
public static Map<String,String> parseConfig(String... perTableConfigs) {
TreeMap<String,String> config = new TreeMap<String,String>();
@@ -126,19 +105,10 @@ public abstract class FunctionalTest {
}
- private String master = "";
private String username = "";
private String password = "";
private String instanceName = "";
- protected void setMaster(String master) {
- this.master = master;
- }
-
- protected String getMaster() {
- return master;
- }
-
protected void setUsername(String username) {
this.username = username;
}
@@ -270,57 +240,49 @@ public abstract class FunctionalTest {
}
+ static class Opts extends ClientOpts {
+ @Parameter(names="--classname", required=true, description="name of the class under test")
+ String classname = null;
+
+ @Parameter(names="--opt", required=true, description="the options for test")
+ String opt = null;
+ }
+
+
public static void main(String[] args) throws Exception {
- CommandLine cl = null;
- try {
- cl = new BasicParser().parse(opts, args);
- } catch (ParseException e) {
- printHelpAndExit(e.toString());
- }
-
- String master = cl.getOptionValue(masterOpt.getOpt(), "localhost");
- String username = cl.getOptionValue(usernameOpt.getOpt(), "root");
- String password = cl.getOptionValue(passwordOpt.getOpt(), "secret");
- String instanceName = cl.getOptionValue(instanceNameOpt.getOpt(), "FuncTest");
-
- String remainingArgs[] = cl.getArgs();
- if (remainingArgs.length < 2) {
- printHelpAndExit("Missing java classname to test and/or options.");
- }
- String clazz = remainingArgs[0];
- String opt = remainingArgs[1];
+ Opts opts = new Opts();
+ opts.parseArgs(FunctionalTest.class.getName(), args);
- Class<? extends FunctionalTest> testClass = AccumuloVFSClassLoader.loadClass(clazz, FunctionalTest.class);
+ Class<? extends FunctionalTest> testClass = AccumuloVFSClassLoader.loadClass(opts.classname, FunctionalTest.class);
FunctionalTest fTest = testClass.newInstance();
- fTest.setMaster(master);
- fTest.setUsername(username);
- fTest.setPassword(password);
- fTest.setInstanceName(instanceName);
+ //fTest.setMaster(master);
+ fTest.setUsername(opts.user);
+ fTest.setPassword(new String(opts.getPassword()));
+ fTest.setInstanceName(opts.instance);
- if (opt.equals("getConfig")) {
+ if (opts.opt.equals("getConfig")) {
Map<String,String> iconfig = fTest.getInitialConfig();
System.out.println("{");
for (Entry<String,String> entry : iconfig.entrySet()) {
System.out.println("'" + entry.getKey() + "':'" + entry.getValue() + "',");
}
System.out.println("}");
- } else if (opt.equals("setup")) {
+ } else if (opts.opt.equals("setup")) {
fTest.setup();
- } else if (opt.equals("run")) {
+ } else if (opts.opt.equals("run")) {
fTest.run();
- } else if (opt.equals("cleanup")) {
+ } else if (opts.opt.equals("cleanup")) {
fTest.cleanup();
} else {
- printHelpAndExit("Unknown option: " + opt);
+ printHelpAndExit("Unknown option: " + opts.opt);
}
}
static void printHelpAndExit(String message) {
System.out.println(message);
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp( "FunctionalTest {options} java_class [getconfig|setup|run|cleanup]", opts );
+ new JCommander(new Opts()).usage();
System.exit(1);
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/LargeRowTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/LargeRowTest.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/LargeRowTest.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/LargeRowTest.java Wed Dec 19 16:25:03 2012
@@ -35,6 +35,7 @@ import org.apache.accumulo.core.data.Val
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.test.TestIngest;
import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
public class LargeRowTest extends FunctionalTest {
@@ -51,7 +52,7 @@ public class LargeRowTest extends Functi
@Override
public Map<String,String> getInitialConfig() {
- return parseConfig(Property.TSERV_MAJC_DELAY + "=1");
+ return parseConfig(Property.TSERV_MAJC_DELAY + "=10ms");
}
@Override
@@ -94,8 +95,8 @@ public class LargeRowTest extends Functi
getConnector().tableOperations().setProperty(REG_TABLE_NAME, Property.TABLE_SPLIT_THRESHOLD.getKey(), "" + SPLIT_THRESH);
- UtilWaitThread.sleep(5000);
-
+ UtilWaitThread.sleep(12000);
+ Logger.getLogger(LargeRowTest.class).warn("checking splits");
checkSplits(REG_TABLE_NAME, 1, 9);
verify(REG_TABLE_NAME);
@@ -136,8 +137,8 @@ public class LargeRowTest extends Functi
// verify while table flush is running
verify(table);
- // give flush time to complete
- UtilWaitThread.sleep(4000);
+ // give split time to complete
+ getConnector().tableOperations().flush(table, null, null, true);
checkSplits(table, expectedSplits, expectedSplits);
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java Wed Dec 19 16:25:03 2012
@@ -53,7 +53,7 @@ public class MaxOpenTest extends Functio
@Override
public List<TableSetup> getTablesToCreate() {
Map<String,String> config = parseConfig(Property.TABLE_MAJC_RATIO + "=10");
- TableSetup ts = new TableSetup("test_ingest", config, TestIngest.CreateTable.getSplitPoints(0, NUM_TO_INGEST, NUM_TABLETS));
+ TableSetup ts = new TableSetup("test_ingest", config, TestIngest.getSplitPoints(0, NUM_TO_INGEST, NUM_TABLETS));
return Collections.singletonList(ts);
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java?rev=1423923&r1=1423922&r2=1423923&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java Wed Dec 19 16:25:03 2012
@@ -22,6 +22,7 @@ import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
+import org.apache.accumulo.core.cli.Help;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +37,8 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
+import com.beust.jcommander.Parameter;
+
/**
* Runs the functional tests via map-reduce.
*
@@ -56,7 +59,7 @@ import org.apache.log4j.Logger;
* Run the map-reduce job:
*
* <pre>
- * $ ./bin/accumulo accumulo.server.test.functional.RunTests /user/hadoop/tests /user/hadoop/results
+ * $ ./bin/accumulo accumulo.server.test.functional.RunTests --tests /user/hadoop/tests --output /user/hadoop/results
* </pre>
*
* Note that you will need to have some configuration in conf/accumulo-site.xml (to locate zookeeper). The map-reduce jobs will not use your local accumulo
@@ -70,6 +73,13 @@ public class RunTests extends Configured
private Job job = null;
+ static class Opts extends Help {
+ @Parameter(names="--tests", description="newline separated list of tests to run", required=true)
+ String testFile;
+ @Parameter(names="--output", description="destination for the results of tests in HDFS", required=true)
+ String outputPath;
+ }
+
static public class TestMapper extends Mapper<LongWritable,Text,Text,Text> {
@Override
@@ -103,6 +113,8 @@ public class RunTests extends Configured
public int run(String[] args) throws Exception {
job = new Job(getConf(), JOB_NAME);
job.setJarByClass(this.getClass());
+ Opts opts = new Opts();
+ opts.parseArgs(RunTests.class.getName(), args);
// this is like 1-2 tests per mapper
Configuration conf = job.getConfiguration();
@@ -113,14 +125,14 @@ public class RunTests extends Configured
// set input
job.setInputFormatClass(TextInputFormat.class);
- TextInputFormat.setInputPaths(job, new Path(args[0]));
+ TextInputFormat.setInputPaths(job, new Path(opts.testFile));
// set output
job.setOutputFormatClass(TextOutputFormat.class);
FileSystem fs = FileSystem.get(conf);
- Path destination = new Path(args[1]);
+ Path destination = new Path(opts.outputPath);
if (fs.exists(destination)) {
- log.info("Deleting existing output directory " + args[1]);
+ log.info("Deleting existing output directory " + opts.outputPath);
fs.delete(destination, true);
}
TextOutputFormat.setOutputPath(job, destination);