You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2015/01/09 03:44:11 UTC
[07/66] [abbrv] accumulo git commit: ACCUMULO-3451 Format master
branch (1.7.0-SNAPSHOT)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/TestIngest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
index 47033f3..76bedf2 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
@@ -57,61 +57,62 @@ import org.apache.log4j.Logger;
import com.beust.jcommander.Parameter;
-
public class TestIngest {
public static final Authorizations AUTHS = new Authorizations("L1", "L2", "G1", "GROUP2");
-
+
public static class Opts extends ClientOnDefaultTable {
-
- @Parameter(names="--createTable")
+
+ @Parameter(names = "--createTable")
public boolean createTable = false;
-
- @Parameter(names="--splits", description="the number of splits to use when creating the table")
+
+ @Parameter(names = "--splits", description = "the number of splits to use when creating the table")
public int numsplits = 1;
-
- @Parameter(names="--start", description="the starting row number")
+
+ @Parameter(names = "--start", description = "the starting row number")
public int startRow = 0;
-
- @Parameter(names="--rows", description="the number of rows to ingest")
+
+ @Parameter(names = "--rows", description = "the number of rows to ingest")
public int rows = 100000;
-
- @Parameter(names="--cols", description="the number of columns to ingest per row")
+
+ @Parameter(names = "--cols", description = "the number of columns to ingest per row")
public int cols = 1;
-
- @Parameter(names="--random", description="insert random rows and use the given number to seed the psuedo-random number generator")
+
+ @Parameter(names = "--random", description = "insert random rows and use the given number to seed the psuedo-random number generator")
public Integer random = null;
-
- @Parameter(names="--size", description="the size of the value to ingest")
+
+ @Parameter(names = "--size", description = "the size of the value to ingest")
public int dataSize = 1000;
-
- @Parameter(names="--delete", description="delete values instead of inserting them")
+
+ @Parameter(names = "--delete", description = "delete values instead of inserting them")
public boolean delete = false;
-
- @Parameter(names={"-ts", "--timestamp"}, description="timestamp to use for all values")
+
+ @Parameter(names = {"-ts", "--timestamp"}, description = "timestamp to use for all values")
public long timestamp = -1;
-
- @Parameter(names="--rfile", description="generate data into a file that can be imported")
+
+ @Parameter(names = "--rfile", description = "generate data into a file that can be imported")
public String outputFile = null;
-
- @Parameter(names="--stride", description="the difference between successive row ids")
+
+ @Parameter(names = "--stride", description = "the difference between successive row ids")
public int stride;
- @Parameter(names={"-cf","--columnFamily"}, description="place columns in this column family")
+ @Parameter(names = {"-cf", "--columnFamily"}, description = "place columns in this column family")
public String columnFamily = "colf";
- @Parameter(names={"-cv","--columnVisibility"}, description="place columns in this column family", converter=VisibilityConverter.class)
+ @Parameter(names = {"-cv", "--columnVisibility"}, description = "place columns in this column family", converter = VisibilityConverter.class)
public ColumnVisibility columnVisibility = new ColumnVisibility();
-
- public Opts() { super("test_ingest"); }
+
+ public Opts() {
+ super("test_ingest");
+ }
}
-
+
@SuppressWarnings("unused")
private static final Logger log = Logger.getLogger(TestIngest.class);
-
+
public static void createTable(Connector conn, Opts args) throws AccumuloException, AccumuloSecurityException, TableExistsException {
if (args.createTable) {
TreeSet<Text> splits = getSplitPoints(args.startRow, args.startRow + args.rows, args.numsplits);
-
+
if (!conn.tableOperations().exists(args.getTableName()))
conn.tableOperations().create(args.getTableName());
try {
@@ -122,27 +123,27 @@ public class TestIngest {
}
}
}
-
+
public static TreeSet<Text> getSplitPoints(long start, long end, long numsplits) {
long splitSize = (end - start) / numsplits;
-
+
long pos = start + splitSize;
-
+
TreeSet<Text> splits = new TreeSet<Text>();
-
+
while (pos < end) {
splits.add(new Text(String.format("row_%010d", pos)));
pos += splitSize;
}
return splits;
}
-
+
public static byte[][] generateValues(int dataSize) {
-
+
byte[][] bytevals = new byte[10][];
-
+
byte[] letters = {'1', '2', '3', '4', '5', '6', '7', '8', '9', '0'};
-
+
for (int i = 0; i < 10; i++) {
bytevals[i] = new byte[dataSize];
for (int j = 0; j < dataSize; j++)
@@ -150,46 +151,46 @@ public class TestIngest {
}
return bytevals;
}
-
+
private static byte ROW_PREFIX[] = "row_".getBytes(UTF_8);
private static byte COL_PREFIX[] = "col_".getBytes(UTF_8);
-
+
public static Text generateRow(int rowid, int startRow) {
return new Text(FastFormat.toZeroPaddedString(rowid + startRow, 10, 10, ROW_PREFIX));
}
-
+
public static byte[] genRandomValue(Random random, byte dest[], int seed, int row, int col) {
random.setSeed((row ^ seed) ^ col);
random.nextBytes(dest);
toPrintableChars(dest);
-
+
return dest;
}
-
+
public static void toPrintableChars(byte[] dest) {
// transform to printable chars
for (int i = 0; i < dest.length; i++) {
dest[i] = (byte) (((0xff & dest[i]) % 92) + ' ');
}
}
-
+
public static void main(String[] args) throws Exception {
-
+
Opts opts = new Opts();
BatchWriterOpts bwOpts = new BatchWriterOpts();
opts.parseArgs(TestIngest.class.getName(), args, bwOpts);
String name = TestIngest.class.getSimpleName();
DistributedTrace.enable(name);
-
+
try {
opts.startTracing(name);
-
+
if (opts.debug)
Logger.getLogger(TabletServerBatchWriter.class.getName()).setLevel(Level.TRACE);
-
+
// test batch update
-
+
ingest(opts.getConnector(), opts, bwOpts);
} catch (Exception e) {
throw new RuntimeException(e);
@@ -199,34 +200,33 @@ public class TestIngest {
}
}
- public static void ingest(Connector connector, Opts opts, BatchWriterOpts bwOpts) throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException,
- MutationsRejectedException, TableExistsException {
+ public static void ingest(Connector connector, Opts opts, BatchWriterOpts bwOpts) throws IOException, AccumuloException, AccumuloSecurityException,
+ TableNotFoundException, MutationsRejectedException, TableExistsException {
long stopTime;
-
+
byte[][] bytevals = generateValues(opts.dataSize);
-
+
byte randomValue[] = new byte[opts.dataSize];
Random random = new Random();
-
+
long bytesWritten = 0;
createTable(connector, opts);
-
+
BatchWriter bw = null;
FileSKVWriter writer = null;
-
+
if (opts.outputFile != null) {
Configuration conf = CachedConfiguration.getInstance();
FileSystem fs = FileSystem.get(conf);
- writer = FileOperations.getInstance().openWriter(opts.outputFile + "." + RFile.EXTENSION, fs, conf,
- AccumuloConfiguration.getDefaultConfiguration());
+ writer = FileOperations.getInstance().openWriter(opts.outputFile + "." + RFile.EXTENSION, fs, conf, AccumuloConfiguration.getDefaultConfiguration());
writer.startDefaultLocalityGroup();
} else {
bw = connector.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
connector.securityOperations().changeUserAuthorizations(opts.principal, AUTHS);
}
Text labBA = new Text(opts.columnVisibility.getExpression());
-
+
long startTime = System.currentTimeMillis();
for (int i = 0; i < opts.rows; i++) {
int rowid;
@@ -235,13 +235,13 @@ public class TestIngest {
} else {
rowid = i;
}
-
+
Text row = generateRow(rowid, opts.startRow);
Mutation m = new Mutation(row);
for (int j = 0; j < opts.cols; j++) {
Text colf = new Text(opts.columnFamily);
Text colq = new Text(FastFormat.toZeroPaddedString(j, 7, 10, COL_PREFIX));
-
+
if (writer != null) {
Key key = new Key(row, colf, colq, labBA);
if (opts.timestamp >= 0) {
@@ -249,15 +249,15 @@ public class TestIngest {
} else {
key.setTimestamp(startTime);
}
-
+
if (opts.delete) {
key.setDeleted(true);
} else {
key.setDeleted(false);
}
-
+
bytesWritten += key.getSize();
-
+
if (opts.delete) {
writer.append(key, new Value(new byte[0]));
} else {
@@ -267,16 +267,16 @@ public class TestIngest {
} else {
value = bytevals[j % bytevals.length];
}
-
+
Value v = new Value(value);
writer.append(key, v);
bytesWritten += v.getSize();
}
-
+
} else {
Key key = new Key(row, colf, colq, labBA);
bytesWritten += key.getSize();
-
+
if (opts.delete) {
if (opts.timestamp >= 0)
m.putDelete(colf, colq, opts.columnVisibility, opts.timestamp);
@@ -290,22 +290,22 @@ public class TestIngest {
value = bytevals[j % bytevals.length];
}
bytesWritten += value.length;
-
+
if (opts.timestamp >= 0) {
m.put(colf, colq, opts.columnVisibility, opts.timestamp, new Value(value, true));
} else {
m.put(colf, colq, opts.columnVisibility, new Value(value, true));
-
+
}
}
}
-
+
}
if (bw != null)
bw.addMutation(m);
-
+
}
-
+
if (writer != null) {
writer.close();
} else if (bw != null) {
@@ -317,22 +317,22 @@ public class TestIngest {
System.err.println("ERROR : Not authorized to write to : " + entry.getKey() + " due to " + entry.getValue());
}
}
-
+
if (e.getConstraintViolationSummaries().size() > 0) {
for (ConstraintViolationSummary cvs : e.getConstraintViolationSummaries()) {
System.err.println("ERROR : Constraint violates : " + cvs);
}
}
-
+
throw e;
}
}
-
+
stopTime = System.currentTimeMillis();
-
+
int totalValues = opts.rows * opts.cols;
double elapsed = (stopTime - startTime) / 1000.0;
-
+
System.out.printf("%,12d records written | %,8d records/sec | %,12d bytes written | %,8d bytes/sec | %6.3f secs %n", totalValues,
(int) (totalValues / elapsed), bytesWritten, (int) (bytesWritten / elapsed), elapsed);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java b/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java
index 1b553f4..ba5874d 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java
@@ -40,30 +40,30 @@ import org.apache.log4j.Logger;
public class TestRandomDeletes {
private static final Logger log = Logger.getLogger(TestRandomDeletes.class);
private static Authorizations auths = new Authorizations("L1", "L2", "G1", "GROUP2");
-
+
static private class RowColumn implements Comparable<RowColumn> {
Text row;
Column column;
long timestamp;
-
+
public RowColumn(Text row, Column column, long timestamp) {
this.row = row;
this.column = column;
this.timestamp = timestamp;
}
-
+
public int compareTo(RowColumn other) {
int result = row.compareTo(other.row);
if (result != 0)
return result;
return column.compareTo(other.column);
}
-
+
public String toString() {
return row.toString() + ":" + column.toString();
}
}
-
+
private static TreeSet<RowColumn> scanAll(ClientOnDefaultTable opts, ScannerOpts scanOpts, String tableName) throws Exception {
TreeSet<RowColumn> result = new TreeSet<RowColumn>();
Connector conn = opts.getConnector();
@@ -77,26 +77,28 @@ public class TestRandomDeletes {
}
return result;
}
-
- private static long scrambleDeleteHalfAndCheck(ClientOnDefaultTable opts, ScannerOpts scanOpts, BatchWriterOpts bwOpts, String tableName, Set<RowColumn> rows) throws Exception {
+
+ private static long scrambleDeleteHalfAndCheck(ClientOnDefaultTable opts, ScannerOpts scanOpts, BatchWriterOpts bwOpts, String tableName, Set<RowColumn> rows)
+ throws Exception {
int result = 0;
ArrayList<RowColumn> entries = new ArrayList<RowColumn>(rows);
java.util.Collections.shuffle(entries);
-
+
Connector connector = opts.getConnector();
BatchWriter mutations = connector.createBatchWriter(tableName, bwOpts.getBatchWriterConfig());
-
+
for (int i = 0; i < (entries.size() + 1) / 2; i++) {
RowColumn rc = entries.get(i);
Mutation m = new Mutation(rc.row);
- m.putDelete(new Text(rc.column.columnFamily), new Text(rc.column.columnQualifier), new ColumnVisibility(rc.column.getColumnVisibility()), rc.timestamp + 1);
+ m.putDelete(new Text(rc.column.columnFamily), new Text(rc.column.columnQualifier), new ColumnVisibility(rc.column.getColumnVisibility()),
+ rc.timestamp + 1);
mutations.addMutation(m);
rows.remove(rc);
result++;
}
-
+
mutations.close();
-
+
Set<RowColumn> current = scanAll(opts, scanOpts, tableName);
current.removeAll(rows);
if (current.size() > 0) {
@@ -104,25 +106,24 @@ public class TestRandomDeletes {
}
return result;
}
-
+
static public void main(String[] args) {
-
+
ClientOnDefaultTable opts = new ClientOnDefaultTable("test_ingest");
ScannerOpts scanOpts = new ScannerOpts();
BatchWriterOpts bwOpts = new BatchWriterOpts();
opts.parseArgs(TestRandomDeletes.class.getName(), args, scanOpts, bwOpts);
-
+
log.info("starting random delete test");
-
try {
long deleted = 0;
-
+
String tableName = opts.getTableName();
-
+
TreeSet<RowColumn> doomed = scanAll(opts, scanOpts, tableName);
log.info("Got " + doomed.size() + " rows");
-
+
long startTime = System.currentTimeMillis();
while (true) {
long half = scrambleDeleteHalfAndCheck(opts, scanOpts, bwOpts, tableName, doomed);
@@ -131,7 +132,7 @@ public class TestRandomDeletes {
break;
}
long stopTime = System.currentTimeMillis();
-
+
long elapsed = (stopTime - startTime) / 1000;
log.info("deleted " + deleted + " values in " + elapsed + " seconds");
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
index 02eead2..8717c26 100644
--- a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
@@ -34,7 +34,6 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.trace.DistributedTrace;
import org.apache.accumulo.core.trace.Trace;
-
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java b/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java
index e4abed3..31e7b06 100644
--- a/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java
@@ -58,8 +58,8 @@ public class WrongTabletTest {
Mutation mutation = new Mutation(new Text("row_0003750001"));
mutation.putDelete(new Text("colf"), new Text("colq"));
- client.update(Tracer.traceInfo(), context.rpcCreds(), new KeyExtent(new Text("!!"), null,
- new Text("row_0003750000")).toThrift(), mutation.toThrift(), TDurability.DEFAULT);
+ client.update(Tracer.traceInfo(), context.rpcCreds(), new KeyExtent(new Text("!!"), null, new Text("row_0003750000")).toThrift(), mutation.toThrift(),
+ TDurability.DEFAULT);
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
index 1a74962..a2687bb 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
@@ -43,81 +43,81 @@ import com.beust.jcommander.validators.PositiveInteger;
public class ContinuousBatchWalker {
static class Opts extends ContinuousWalk.Opts {
- @Parameter(names="--numToScan", description="Number rows to scan between sleeps", required=true, validateWith=PositiveInteger.class)
+ @Parameter(names = "--numToScan", description = "Number rows to scan between sleeps", required = true, validateWith = PositiveInteger.class)
long numToScan = 0;
}
public static void main(String[] args) throws Exception {
-
+
Opts opts = new Opts();
ScannerOpts scanOpts = new ScannerOpts();
BatchScannerOpts bsOpts = new BatchScannerOpts();
opts.parseArgs(ContinuousBatchWalker.class.getName(), args, scanOpts, bsOpts);
-
+
Random r = new Random();
Authorizations auths = opts.randomAuths.getAuths(r);
Connector conn = opts.getConnector();
Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), auths);
scanner.setBatchSize(scanOpts.scanBatchSize);
-
+
BatchScanner bs = conn.createBatchScanner(opts.getTableName(), auths, bsOpts.scanThreads);
bs.setTimeout(bsOpts.scanTimeout, TimeUnit.MILLISECONDS);
while (true) {
Set<Text> batch = getBatch(scanner, opts.min, opts.max, scanOpts.scanBatchSize, r);
List<Range> ranges = new ArrayList<Range>(batch.size());
-
+
for (Text row : batch) {
ranges.add(new Range(row));
}
-
+
runBatchScan(scanOpts.scanBatchSize, bs, batch, ranges);
-
+
UtilWaitThread.sleep(opts.sleepTime);
}
-
+
}
-
+
/*
* private static void runSequentialScan(Scanner scanner, List<Range> ranges) { Set<Text> srowsSeen = new HashSet<Text>(); long st1 =
* System.currentTimeMillis(); int scount = 0; for (Range range : ranges) { scanner.setRange(range);
- *
+ *
* for (Entry<Key,Value> entry : scanner) { srowsSeen.add(entry.getKey().getRow()); scount++; } }
- *
- *
+ *
+ *
* long st2 = System.currentTimeMillis(); System.out.println("SRQ "+(st2 - st1)+" "+srowsSeen.size() +" "+scount); }
*/
-
+
private static void runBatchScan(int batchSize, BatchScanner bs, Set<Text> batch, List<Range> ranges) {
bs.setRanges(ranges);
-
+
Set<Text> rowsSeen = new HashSet<Text>();
-
+
int count = 0;
-
+
long t1 = System.currentTimeMillis();
-
+
for (Entry<Key,Value> entry : bs) {
ContinuousWalk.validate(entry.getKey(), entry.getValue());
-
+
rowsSeen.add(entry.getKey().getRow());
-
+
addRow(batchSize, entry.getValue());
-
+
count++;
}
bs.close();
-
+
long t2 = System.currentTimeMillis();
-
+
if (!rowsSeen.equals(batch)) {
HashSet<Text> copy1 = new HashSet<Text>(rowsSeen);
HashSet<Text> copy2 = new HashSet<Text>(batch);
-
+
copy1.removeAll(batch);
copy2.removeAll(rowsSeen);
-
+
System.out.printf("DIF %d %d %d%n", t1, copy1.size(), copy2.size());
System.err.printf("DIF %d %d %d%n", t1, copy1.size(), copy2.size());
System.err.println("Extra seen : " + copy1);
@@ -125,12 +125,12 @@ public class ContinuousBatchWalker {
} else {
System.out.printf("BRQ %d %d %d %d %d%n", t1, (t2 - t1), rowsSeen.size(), count, (int) (rowsSeen.size() / ((t2 - t1) / 1000.0)));
}
-
+
}
-
+
private static void addRow(int batchSize, Value v) {
byte[] val = v.get();
-
+
int offset = ContinuousWalk.getPrevRowOffset(val);
if (offset > 1) {
Text prevRow = new Text();
@@ -140,19 +140,19 @@ public class ContinuousBatchWalker {
}
}
}
-
+
private static HashSet<Text> rowsToQuery = new HashSet<Text>();
-
+
private static Set<Text> getBatch(Scanner scanner, long min, long max, int batchSize, Random r) {
-
+
while (rowsToQuery.size() < batchSize) {
byte[] scanStart = ContinuousIngest.genRow(min, max, r);
scanner.setRange(new Range(new Text(scanStart), null));
-
+
int count = 0;
-
+
long t1 = System.currentTimeMillis();
-
+
Iterator<Entry<Key,Value>> iter = scanner.iterator();
while (iter.hasNext() && rowsToQuery.size() < 3 * batchSize) {
Entry<Key,Value> entry = iter.next();
@@ -160,24 +160,24 @@ public class ContinuousBatchWalker {
addRow(batchSize, entry.getValue());
count++;
}
-
+
long t2 = System.currentTimeMillis();
-
+
System.out.println("FSB " + t1 + " " + (t2 - t1) + " " + count);
-
+
UtilWaitThread.sleep(100);
}
-
+
HashSet<Text> ret = new HashSet<Text>();
-
+
Iterator<Text> iter = rowsToQuery.iterator();
-
+
for (int i = 0; i < batchSize; i++) {
ret.add(iter.next());
iter.remove();
}
-
+
return ret;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
index f54b8db..dba6ac9 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
@@ -54,9 +54,8 @@ import org.apache.log4j.PatternLayout;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
-
public class ContinuousIngest {
-
+
static public class BaseOpts extends MapReduceClientOnDefaultTable {
public class DebugConverter implements IStringConverter<String> {
@Override
@@ -72,64 +71,66 @@ public class ContinuousIngest {
return debugLog;
}
}
-
- @Parameter(names="--min", description="lowest random row number to use")
+
+ @Parameter(names = "--min", description = "lowest random row number to use")
long min = 0;
-
- @Parameter(names="--max", description="maximum random row number to use")
+
+ @Parameter(names = "--max", description = "maximum random row number to use")
long max = Long.MAX_VALUE;
-
- @Parameter(names="--debugLog", description="file to write debugging output", converter=DebugConverter.class)
+
+ @Parameter(names = "--debugLog", description = "file to write debugging output", converter = DebugConverter.class)
String debugLog = null;
- BaseOpts() { super("ci"); }
+ BaseOpts() {
+ super("ci");
+ }
}
-
+
public static class ShortConverter implements IStringConverter<Short> {
@Override
public Short convert(String value) {
return Short.valueOf(value);
}
}
-
+
static public class Opts extends BaseOpts {
- @Parameter(names="--num", description="the number of entries to ingest")
+ @Parameter(names = "--num", description = "the number of entries to ingest")
long num = Long.MAX_VALUE;
-
- @Parameter(names="--maxColF", description="maximum column family value to use", converter=ShortConverter.class)
+
+ @Parameter(names = "--maxColF", description = "maximum column family value to use", converter = ShortConverter.class)
short maxColF = Short.MAX_VALUE;
-
- @Parameter(names="--maxColQ", description="maximum column qualifier value to use", converter=ShortConverter.class)
+
+ @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter = ShortConverter.class)
short maxColQ = Short.MAX_VALUE;
-
- @Parameter(names="--addCheckSum", description="turn on checksums")
+
+ @Parameter(names = "--addCheckSum", description = "turn on checksums")
boolean checksum = false;
-
- @Parameter(names="--visibilities", description="read the visibilities to ingest with from a file")
+
+ @Parameter(names = "--visibilities", description = "read the visibilities to ingest with from a file")
String visFile = null;
}
-
+
private static final byte[] EMPTY_BYTES = new byte[0];
-
+
private static List<ColumnVisibility> visibilities;
-
+
private static void initVisibilities(Opts opts) throws Exception {
if (opts.visFile == null) {
visibilities = Collections.singletonList(new ColumnVisibility());
return;
}
-
+
visibilities = new ArrayList<ColumnVisibility>();
-
+
FileSystem fs = FileSystem.get(new Configuration());
BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(opts.visFile)), UTF_8));
-
+
String line;
-
+
while ((line = in.readLine()) != null) {
visibilities.add(new ColumnVisibility(line));
}
-
+
in.close();
}
@@ -138,35 +139,35 @@ public class ContinuousIngest {
}
public static void main(String[] args) throws Exception {
-
+
Opts opts = new Opts();
BatchWriterOpts bwOpts = new BatchWriterOpts();
opts.parseArgs(ContinuousIngest.class.getName(), args, bwOpts);
-
+
initVisibilities(opts);
if (opts.min < 0 || opts.max < 0 || opts.max <= opts.min) {
throw new IllegalArgumentException("bad min and max");
}
Connector conn = opts.getConnector();
-
+
if (!conn.tableOperations().exists(opts.getTableName())) {
throw new TableNotFoundException(null, opts.getTableName(), "Consult the README and create the table before starting ingest.");
}
BatchWriter bw = conn.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
bw = Trace.wrapAll(bw, new CountSampler(1024));
-
+
Random r = new Random();
-
+
byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
-
+
System.out.printf("UUID %d %s%n", System.currentTimeMillis(), new String(ingestInstanceId, UTF_8));
-
+
long count = 0;
final int flushInterval = 1000000;
final int maxDepth = 25;
-
+
// always want to point back to flushed data. This way the previous item should
// always exist in accumulo when verifying data. To do this make insert N point
// back to the row from insert (N - flushInterval). The array below is used to keep
@@ -175,9 +176,9 @@ public class ContinuousIngest {
long firstRows[] = new long[flushInterval];
int firstColFams[] = new int[flushInterval];
int firstColQuals[] = new int[flushInterval];
-
+
long lastFlushTime = System.currentTimeMillis();
-
+
out: while (true) {
// generate first set of nodes
ColumnVisibility cv = getVisibility(r);
@@ -186,22 +187,22 @@ public class ContinuousIngest {
long rowLong = genLong(opts.min, opts.max, r);
prevRows[index] = rowLong;
firstRows[index] = rowLong;
-
+
int cf = r.nextInt(opts.maxColF);
int cq = r.nextInt(opts.maxColQ);
-
+
firstColFams[index] = cf;
firstColQuals[index] = cq;
-
+
Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, r, opts.checksum);
count++;
bw.addMutation(m);
}
-
+
lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
if (count >= opts.num)
break out;
-
+
// generate subsequent sets of nodes that link to previous set of nodes
for (int depth = 1; depth < maxDepth; depth++) {
for (int index = 0; index < flushInterval; index++) {
@@ -212,12 +213,12 @@ public class ContinuousIngest {
count++;
bw.addMutation(m);
}
-
+
lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
if (count >= opts.num)
break out;
}
-
+
// create one big linked list, this makes all of the first inserts
// point to something
for (int index = 0; index < flushInterval - 1; index++) {
@@ -230,7 +231,7 @@ public class ContinuousIngest {
if (count >= opts.num)
break out;
}
-
+
bw.close();
opts.stopTracing();
}
@@ -243,17 +244,17 @@ public class ContinuousIngest {
lastFlushTime = t2;
return lastFlushTime;
}
-
+
public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv, byte[] ingestInstanceId, long count, byte[] prevRow, Random r,
boolean checksum) {
// Adler32 is supposed to be faster, but according to wikipedia is not good for small data.... so used CRC32 instead
CRC32 cksum = null;
-
+
byte[] rowString = genRow(rowLong);
-
+
byte[] cfString = FastFormat.toZeroPaddedString(cfInt, 4, 16, EMPTY_BYTES);
byte[] cqString = FastFormat.toZeroPaddedString(cqInt, 4, 16, EMPTY_BYTES);
-
+
if (checksum) {
cksum = new CRC32();
cksum.update(rowString);
@@ -261,25 +262,25 @@ public class ContinuousIngest {
cksum.update(cqString);
cksum.update(cv.getExpression());
}
-
+
Mutation m = new Mutation(new Text(rowString));
-
+
m.put(new Text(cfString), new Text(cqString), cv, createValue(ingestInstanceId, count, prevRow, cksum));
return m;
}
-
+
public static final long genLong(long min, long max, Random r) {
return ((r.nextLong() & 0x7fffffffffffffffl) % (max - min)) + min;
}
-
+
static final byte[] genRow(long min, long max, Random r) {
return genRow(genLong(min, max, r));
}
-
+
static final byte[] genRow(long rowLong) {
return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES);
}
-
+
private static Value createValue(byte[] ingestInstanceId, long count, byte[] prevRow, Checksum cksum) {
int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3;
if (cksum != null)
@@ -297,17 +298,17 @@ public class ContinuousIngest {
System.arraycopy(prevRow, 0, val, index, prevRow.length);
index += prevRow.length;
}
-
+
val[index++] = ':';
-
+
if (cksum != null) {
cksum.update(val, 0, index);
cksum.getValue();
FastFormat.toZeroPaddedString(val, index, cksum.getValue(), 8, 16, EMPTY_BYTES);
}
-
+
// System.out.println("val "+new String(val));
-
+
return new Value(val);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
index 797413f..89ff515 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
@@ -50,7 +50,7 @@ import com.beust.jcommander.validators.PositiveInteger;
/**
* A map only job that reads a table created by continuous ingest and creates doubly linked list. This map reduce job tests the ability of a map only job to
* read and write to accumulo at the same time. This map reduce job mutates the table in such a way that it should not create any undefined nodes.
- *
+ *
*/
public class ContinuousMoru extends Configured implements Tool {
private static final String PREFIX = ContinuousMoru.class.getSimpleName() + ".";
@@ -59,49 +59,49 @@ public class ContinuousMoru extends Configured implements Tool {
private static final String MAX = PREFIX + "MAX";
private static final String MIN = PREFIX + "MIN";
private static final String CI_ID = PREFIX + "CI_ID";
-
+
static enum Counts {
SELF_READ;
}
-
+
public static class CMapper extends Mapper<Key,Value,Text,Mutation> {
-
+
private short max_cf;
private short max_cq;
private Random random;
private String ingestInstanceId;
private byte[] iiId;
private long count;
-
+
private static final ColumnVisibility EMPTY_VIS = new ColumnVisibility();
-
+
@Override
public void setup(Context context) throws IOException, InterruptedException {
int max_cf = context.getConfiguration().getInt(MAX_CF, -1);
int max_cq = context.getConfiguration().getInt(MAX_CQ, -1);
-
+
if (max_cf > Short.MAX_VALUE || max_cq > Short.MAX_VALUE)
throw new IllegalArgumentException();
-
+
this.max_cf = (short) max_cf;
this.max_cq = (short) max_cq;
-
+
random = new Random();
ingestInstanceId = context.getConfiguration().get(CI_ID);
iiId = ingestInstanceId.getBytes(UTF_8);
-
+
count = 0;
}
-
+
@Override
public void map(Key key, Value data, Context context) throws IOException, InterruptedException {
-
+
ContinuousWalk.validate(key, data);
-
+
if (WritableComparator.compareBytes(iiId, 0, iiId.length, data.get(), 0, iiId.length) != 0) {
// only rewrite data not written by this M/R job
byte[] val = data.get();
-
+
int offset = ContinuousWalk.getPrevRowOffset(val);
if (offset > 0) {
long rowLong = Long.parseLong(new String(val, offset, 16, UTF_8), 16);
@@ -109,24 +109,24 @@ public class ContinuousMoru extends Configured implements Tool {
.toArray(), random, true);
context.write(null, m);
}
-
+
} else {
ContinuousVerify.increment(context.getCounter(Counts.SELF_READ));
}
}
}
-
+
static class Opts extends BaseOpts {
- @Parameter(names = "--maxColF", description = "maximum column family value to use", converter=ShortConverter.class)
+ @Parameter(names = "--maxColF", description = "maximum column family value to use", converter = ShortConverter.class)
short maxColF = Short.MAX_VALUE;
-
- @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter=ShortConverter.class)
+
+ @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter = ShortConverter.class)
short maxColQ = Short.MAX_VALUE;
-
+
@Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class)
int maxMaps = 0;
}
-
+
@Override
public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, AccumuloSecurityException {
Opts opts = new Opts();
@@ -136,10 +136,10 @@ public class ContinuousMoru extends Configured implements Tool {
@SuppressWarnings("deprecation")
Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
-
+
job.setInputFormatClass(AccumuloInputFormat.class);
opts.setAccumuloConfigs(job);
-
+
// set up ranges
try {
Set<Range> ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
@@ -148,28 +148,28 @@ public class ContinuousMoru extends Configured implements Tool {
} catch (Exception e) {
throw new IOException(e);
}
-
+
job.setMapperClass(CMapper.class);
-
+
job.setNumReduceTasks(0);
-
+
job.setOutputFormatClass(AccumuloOutputFormat.class);
AccumuloOutputFormat.setBatchWriterOptions(job, bwOpts.getBatchWriterConfig());
-
+
Configuration conf = job.getConfiguration();
conf.setLong(MIN, opts.min);
conf.setLong(MAX, opts.max);
conf.setInt(MAX_CF, opts.maxColF);
conf.setInt(MAX_CQ, opts.maxColQ);
conf.set(CI_ID, UUID.randomUUID().toString());
-
+
job.waitForCompletion(true);
opts.stopTracing();
return job.isSuccessful() ? 0 : 1;
}
-
+
/**
- *
+ *
* @param args
* instanceName zookeepers username password table columns outputpath
*/
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java
index dcc3e49..73048f6 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java
@@ -33,28 +33,28 @@ import org.apache.hadoop.io.Text;
import com.beust.jcommander.Parameter;
public class ContinuousQuery {
-
+
public static class Opts extends BaseOpts {
- @Parameter(names="--sleep", description="the time to wait between queries", converter=TimeConverter.class)
+ @Parameter(names = "--sleep", description = "the time to wait between queries", converter = TimeConverter.class)
long sleepTime = 100;
}
-
+
public static void main(String[] args) throws Exception {
Opts opts = new Opts();
ScannerOpts scanOpts = new ScannerOpts();
opts.parseArgs(ContinuousQuery.class.getName(), args, scanOpts);
-
+
Connector conn = opts.getConnector();
Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), opts.auths);
scanner.setBatchSize(scanOpts.scanBatchSize);
-
+
Random r = new Random();
-
+
while (true) {
byte[] row = ContinuousIngest.genRow(opts.min, opts.max, r);
-
+
int count = 0;
-
+
long t1 = System.currentTimeMillis();
scanner.setRange(new Range(new Text(row)));
for (Entry<Key,Value> entry : scanner) {
@@ -62,9 +62,9 @@ public class ContinuousQuery {
count++;
}
long t2 = System.currentTimeMillis();
-
+
System.out.printf("SRQ %d %s %d %d%n", t1, new String(row, UTF_8), (t2 - t1), count);
-
+
if (opts.sleepTime > 0)
Thread.sleep(opts.sleepTime);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
index 60154df..f68377a 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
@@ -36,50 +36,50 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.validators.PositiveInteger;
public class ContinuousScanner {
-
+
static class Opts extends ContinuousWalk.Opts {
- @Parameter(names="--numToScan", description="Number rows to scan between sleeps", required=true, validateWith=PositiveInteger.class)
+ @Parameter(names = "--numToScan", description = "Number rows to scan between sleeps", required = true, validateWith = PositiveInteger.class)
long numToScan = 0;
}
-
+
public static void main(String[] args) throws Exception {
Opts opts = new Opts();
ScannerOpts scanOpts = new ScannerOpts();
opts.parseArgs(ContinuousScanner.class.getName(), args, scanOpts);
-
+
Random r = new Random();
long distance = 1000000000000l;
-
+
Connector conn = opts.getConnector();
Authorizations auths = opts.randomAuths.getAuths(r);
Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), auths);
scanner.setBatchSize(scanOpts.scanBatchSize);
-
+
double delta = Math.min(.05, .05 / (opts.numToScan / 1000.0));
-
+
while (true) {
long startRow = ContinuousIngest.genLong(opts.min, opts.max - distance, r);
byte[] scanStart = ContinuousIngest.genRow(startRow);
byte[] scanStop = ContinuousIngest.genRow(startRow + distance);
-
+
scanner.setRange(new Range(new Text(scanStart), new Text(scanStop)));
-
+
int count = 0;
Iterator<Entry<Key,Value>> iter = scanner.iterator();
-
+
long t1 = System.currentTimeMillis();
-
+
while (iter.hasNext()) {
Entry<Key,Value> entry = iter.next();
ContinuousWalk.validate(entry.getKey(), entry.getValue());
count++;
}
-
+
long t2 = System.currentTimeMillis();
-
+
// System.out.println("P1 " +count +" "+((1-delta) * numToScan)+" "+((1+delta) * numToScan)+" "+numToScan);
-
+
if (count < (1 - delta) * opts.numToScan || count > (1 + delta) * opts.numToScan) {
if (count == 0) {
distance = distance * 10;
@@ -91,15 +91,15 @@ public class ContinuousScanner {
ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
distance = (long) (ratio * distance);
}
-
+
// System.out.println("P2 "+delta +" "+numToScan+" "+distance+" "+((double)numToScan/count ));
}
-
+
System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart, UTF_8), (t2 - t1), count);
-
+
if (opts.sleepTime > 0)
UtilWaitThread.sleep(opts.sleepTime);
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java
index 1e3b636..7c2f93b 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java
@@ -58,15 +58,15 @@ import org.apache.hadoop.mapred.JobClient;
import org.apache.log4j.Logger;
public class ContinuousStatsCollector {
-
+
private static final Logger log = Logger.getLogger(ContinuousStatsCollector.class);
-
+
static class StatsCollectionTask extends TimerTask {
-
+
private final String tableId;
private final Opts opts;
private final int scanBatchSize;
-
+
public StatsCollectionTask(Opts opts, int scanBatchSize) {
this.opts = opts;
this.scanBatchSize = scanBatchSize;
@@ -76,7 +76,7 @@ public class ContinuousStatsCollector {
+ " ACCUMULO_DU ACCUMULO_DIRS ACCUMULO_FILES TABLE_DU TABLE_DIRS TABLE_FILES"
+ " MAP_TASK MAX_MAP_TASK REDUCE_TASK MAX_REDUCE_TASK TASK_TRACKERS BLACK_LISTED MIN_FILES/TABLET MAX_FILES/TABLET AVG_FILES/TABLET STDDEV_FILES/TABLET");
}
-
+
@Override
public void run() {
try {
@@ -84,37 +84,37 @@ public class ContinuousStatsCollector {
String fsStats = getFSStats();
String mrStats = getMRStats();
String tabletStats = getTabletStats();
-
+
System.out.println(System.currentTimeMillis() + " " + acuStats + " " + fsStats + " " + mrStats + " " + tabletStats);
} catch (Exception e) {
- log.error(System.currentTimeMillis()+" - Failed to collect stats", e);
+ log.error(System.currentTimeMillis() + " - Failed to collect stats", e);
}
}
-
+
private String getTabletStats() throws Exception {
-
+
Connector conn = opts.getConnector();
Scanner scanner = conn.createScanner(MetadataTable.NAME, opts.auths);
scanner.setBatchSize(scanBatchSize);
scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
scanner.addScanIterator(new IteratorSetting(1000, "cfc", ColumnFamilyCounter.class.getName()));
scanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
-
+
Stat s = new Stat();
-
+
int count = 0;
for (Entry<Key,Value> entry : scanner) {
count++;
s.addStat(Long.parseLong(entry.getValue().toString()));
}
-
+
if (count > 0)
return String.format("%d %d %.3f %.3f", s.getMin(), s.getMax(), s.getAverage(), s.getStdDev());
else
return "0 0 0 0";
-
+
}
-
+
private String getFSStats() throws Exception {
VolumeManager fs = VolumeManagerImpl.get();
long length1 = 0, dcount1 = 0, fcount1 = 0;
@@ -129,22 +129,22 @@ public class ContinuousStatsCollector {
dcount2 += contentSummary.getDirectoryCount();
fcount2 += contentSummary.getFileCount();
}
-
+
return "" + length1 + " " + dcount1 + " " + fcount1 + " " + length2 + " " + dcount2 + " " + fcount2;
}
-
+
private String getACUStats() throws Exception {
-
+
MasterClientService.Iface client = null;
try {
ClientContext context = new ClientContext(opts.getInstance(), new Credentials(opts.principal, opts.getToken()), new ServerConfigurationFactory(
opts.getInstance()).getConfiguration());
client = MasterClient.getConnectionWithRetry(context);
MasterMonitorInfo stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
-
+
TableInfo all = new TableInfo();
Map<String,TableInfo> tableSummaries = new HashMap<String,TableInfo>();
-
+
for (TabletServerStatus server : stats.tServerInfo) {
for (Entry<String,TableInfo> info : server.tableMap.entrySet()) {
TableInfo tableSummary = tableSummaries.get(info.getKey());
@@ -156,42 +156,42 @@ public class ContinuousStatsCollector {
TableInfoUtil.add(all, info.getValue());
}
}
-
+
TableInfo ti = tableSummaries.get(tableId);
-
+
return "" + stats.tServerInfo.size() + " " + all.recs + " " + (long) all.ingestRate + " " + (long) all.queryRate + " " + ti.recs + " "
+ ti.recsInMemory + " " + (long) ti.ingestRate + " " + (long) ti.queryRate + " " + ti.tablets + " " + ti.onlineTablets;
-
+
} finally {
if (client != null)
MasterClient.close(client);
}
-
+
}
-
+
}
-
+
private static String getMRStats() throws Exception {
Configuration conf = CachedConfiguration.getInstance();
// No alternatives for hadoop 20
JobClient jc = new JobClient(new org.apache.hadoop.mapred.JobConf(conf));
-
+
ClusterStatus cs = jc.getClusterStatus(false);
-
+
return "" + cs.getMapTasks() + " " + cs.getMaxMapTasks() + " " + cs.getReduceTasks() + " " + cs.getMaxReduceTasks() + " " + cs.getTaskTrackers() + " "
+ cs.getBlacklistedTrackers();
-
+
}
-
+
static class Opts extends ClientOnRequiredTable {}
-
+
public static void main(String[] args) {
Opts opts = new Opts();
ScannerOpts scanOpts = new ScannerOpts();
opts.parseArgs(ContinuousStatsCollector.class.getName(), args, scanOpts);
Timer jtimer = new Timer();
-
+
jtimer.schedule(new StatsCollectionTask(opts, scanOpts.scanBatchSize), 0, 30000);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
index 049f9b8..461d226 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
@@ -234,7 +234,7 @@ public class ContinuousVerify extends Configured implements Tool {
}
/**
- *
+ *
* @param args
* instanceName zookeepers username password table columns outputpath
*/
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
index 9253093..60f8ec2 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
@@ -45,7 +45,7 @@ import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
public class ContinuousWalk {
-
+
static public class Opts extends ContinuousQuery.Opts {
class RandomAuthsConverter implements IStringConverter<RandomAuths> {
@Override
@@ -57,35 +57,35 @@ public class ContinuousWalk {
}
}
}
-
+
@Parameter(names = "--authsFile", description = "read the authorities to use from a file")
RandomAuths randomAuths = new RandomAuths();
}
-
+
static class BadChecksumException extends RuntimeException {
private static final long serialVersionUID = 1L;
-
+
public BadChecksumException(String msg) {
super(msg);
}
-
+
}
-
+
static class RandomAuths {
private List<Authorizations> auths;
-
+
RandomAuths() {
auths = Collections.singletonList(Authorizations.EMPTY);
}
-
+
RandomAuths(String file) throws IOException {
if (file == null) {
auths = Collections.singletonList(Authorizations.EMPTY);
return;
}
-
+
auths = new ArrayList<Authorizations>();
-
+
FileSystem fs = FileSystem.get(new Configuration());
BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(file)), UTF_8));
try {
@@ -97,30 +97,30 @@ public class ContinuousWalk {
in.close();
}
}
-
+
Authorizations getAuths(Random r) {
return auths.get(r.nextInt(auths.size()));
}
}
-
+
public static void main(String[] args) throws Exception {
Opts opts = new Opts();
opts.parseArgs(ContinuousWalk.class.getName(), args);
-
+
Connector conn = opts.getConnector();
-
+
Random r = new Random();
-
+
ArrayList<Value> values = new ArrayList<Value>();
-
+
while (true) {
Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), opts.randomAuths.getAuths(r));
String row = findAStartRow(opts.min, opts.max, scanner, r);
-
+
while (row != null) {
-
+
values.clear();
-
+
long t1 = System.currentTimeMillis();
Span span = Trace.on("walk");
try {
@@ -133,9 +133,9 @@ public class ContinuousWalk {
span.stop();
}
long t2 = System.currentTimeMillis();
-
+
System.out.printf("SRQ %d %s %d %d%n", t1, row, (t2 - t1), values.size());
-
+
if (values.size() > 0) {
row = getPrevRow(values.get(r.nextInt(values.size())));
} else {
@@ -143,27 +143,27 @@ public class ContinuousWalk {
System.err.printf("MIS %d %s%n", t1, row);
row = null;
}
-
+
if (opts.sleepTime > 0)
Thread.sleep(opts.sleepTime);
}
-
+
if (opts.sleepTime > 0)
Thread.sleep(opts.sleepTime);
}
}
-
+
private static String findAStartRow(long min, long max, Scanner scanner, Random r) {
-
+
byte[] scanStart = ContinuousIngest.genRow(min, max, r);
scanner.setRange(new Range(new Text(scanStart), null));
scanner.setBatchSize(100);
-
+
int count = 0;
String pr = null;
-
+
long t1 = System.currentTimeMillis();
-
+
for (Entry<Key,Value> entry : scanner) {
validate(entry.getKey(), entry.getValue());
pr = getPrevRow(entry.getValue());
@@ -171,66 +171,66 @@ public class ContinuousWalk {
if (pr != null)
break;
}
-
+
long t2 = System.currentTimeMillis();
-
+
System.out.printf("FSR %d %s %d %d%n", t1, new String(scanStart, UTF_8), (t2 - t1), count);
-
+
return pr;
}
-
+
static int getPrevRowOffset(byte val[]) {
if (val.length == 0)
throw new IllegalArgumentException();
if (val[53] != ':')
throw new IllegalArgumentException(new String(val, UTF_8));
-
+
// prev row starts at 54
if (val[54] != ':') {
if (val[54 + 16] != ':')
throw new IllegalArgumentException(new String(val, UTF_8));
return 54;
}
-
+
return -1;
}
-
+
static String getPrevRow(Value value) {
-
+
byte[] val = value.get();
int offset = getPrevRowOffset(val);
if (offset > 0) {
return new String(val, offset, 16, UTF_8);
}
-
+
return null;
}
-
+
static int getChecksumOffset(byte val[]) {
if (val[val.length - 1] != ':') {
if (val[val.length - 9] != ':')
throw new IllegalArgumentException(new String(val, UTF_8));
return val.length - 8;
}
-
+
return -1;
}
-
+
static void validate(Key key, Value value) throws BadChecksumException {
int ckOff = getChecksumOffset(value.get());
if (ckOff < 0)
return;
-
+
long storedCksum = Long.parseLong(new String(value.get(), ckOff, 8, UTF_8), 16);
-
+
CRC32 cksum = new CRC32();
-
+
cksum.update(key.getRowData().toArray());
cksum.update(key.getColumnFamilyData().toArray());
cksum.update(key.getColumnQualifierData().toArray());
cksum.update(key.getColumnVisibilityData().toArray());
cksum.update(value.get(), 0, ckOff);
-
+
if (cksum.getValue() != storedCksum) {
throw new BadChecksumException("Checksum invalid " + key + " " + value);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/GenSplits.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/GenSplits.java b/test/src/main/java/org/apache/accumulo/test/continuous/GenSplits.java
index 1320ed5..ba39f1c 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/GenSplits.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/GenSplits.java
@@ -23,27 +23,27 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
/**
- *
+ *
*/
public class GenSplits {
-
+
static class Opts {
@Parameter(names = "--min", description = "minimum row")
long minRow = 0;
-
+
@Parameter(names = "--max", description = "maximum row")
long maxRow = Long.MAX_VALUE;
-
+
@Parameter(description = "<num tablets>")
List<String> args = null;
}
public static void main(String[] args) {
-
+
Opts opts = new Opts();
JCommander jcommander = new JCommander(opts);
jcommander.setProgramName(GenSplits.class.getSimpleName());
-
+
try {
jcommander.parse(args);
} catch (ParameterException pe) {
@@ -56,14 +56,14 @@ public class GenSplits {
jcommander.usage();
System.exit(-1);
}
-
+
int numTablets = Integer.parseInt(opts.args.get(0));
-
+
if (numTablets < 1) {
System.err.println("ERROR: numTablets < 1");
System.exit(-1);
}
-
+
if (opts.minRow >= opts.maxRow) {
System.err.println("ERROR: min >= max");
System.exit(-1);
@@ -73,13 +73,13 @@ public class GenSplits {
long distance = ((opts.maxRow - opts.minRow) / numTablets) + 1;
long split = distance;
for (int i = 0; i < numSplits; i++) {
-
+
String s = String.format("%016x", split + opts.minRow);
-
+
while (s.charAt(s.length() - 1) == '0') {
s = s.substring(0, s.length() - 1);
}
-
+
System.out.println(s);
split += distance;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java b/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java
index b3aae46..6362afd 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java
@@ -34,15 +34,15 @@ import java.util.TreeSet;
class HistData<T> implements Comparable<HistData<T>>, Serializable {
private static final long serialVersionUID = 1L;
-
+
T bin;
long count;
-
+
HistData(T bin) {
this.bin = bin;
count = 0;
}
-
+
@SuppressWarnings("unchecked")
public int compareTo(HistData<T> o) {
return ((Comparable<T>) bin).compareTo(o.bin);
@@ -50,55 +50,55 @@ class HistData<T> implements Comparable<HistData<T>>, Serializable {
}
public class Histogram<T> implements Serializable {
-
+
private static final long serialVersionUID = 1L;
-
+
protected long sum;
protected HashMap<T,HistData<T>> counts;
-
+
public Histogram() {
sum = 0;
counts = new HashMap<T,HistData<T>>();
}
-
+
public void addPoint(T x) {
addPoint(x, 1);
}
-
+
public void addPoint(T x, long y) {
-
+
HistData<T> hd = counts.get(x);
if (hd == null) {
hd = new HistData<T>(x);
counts.put(x, hd);
}
-
+
hd.count += y;
sum += y;
}
-
+
public long getCount(T x) {
HistData<T> hd = counts.get(x);
if (hd == null)
return 0;
return hd.count;
}
-
+
public double getPercentage(T x) {
if (getSum() == 0) {
return 0;
}
return (double) getCount(x) / (double) getSum() * 100.0;
}
-
+
public long getSum() {
return sum;
}
-
+
public List<T> getKeysInCountSortedOrder() {
-
+
ArrayList<HistData<T>> sortedCounts = new ArrayList<HistData<T>>(counts.values());
-
+
Collections.sort(sortedCounts, new Comparator<HistData<T>>() {
public int compare(HistData<T> o1, HistData<T> o2) {
if (o1.count < o2.count)
@@ -108,60 +108,60 @@ public class Histogram<T> implements Serializable {
return 0;
}
});
-
+
ArrayList<T> sortedKeys = new ArrayList<T>();
-
+
for (Iterator<HistData<T>> iter = sortedCounts.iterator(); iter.hasNext();) {
HistData<T> hd = iter.next();
sortedKeys.add(hd.bin);
}
-
+
return sortedKeys;
}
-
+
public void print(StringBuilder out) {
TreeSet<HistData<T>> sortedCounts = new TreeSet<HistData<T>>(counts.values());
-
+
int maxValueLen = 0;
-
+
for (Iterator<HistData<T>> iter = sortedCounts.iterator(); iter.hasNext();) {
HistData<T> hd = iter.next();
if (("" + hd.bin).length() > maxValueLen) {
maxValueLen = ("" + hd.bin).length();
}
}
-
+
double psum = 0;
-
+
for (Iterator<HistData<T>> iter = sortedCounts.iterator(); iter.hasNext();) {
HistData<T> hd = iter.next();
-
+
psum += getPercentage(hd.bin);
-
+
out.append(String.format(" %" + (maxValueLen + 1) + "s %,16d %6.2f%s %6.2f%s%n", hd.bin + "", hd.count, getPercentage(hd.bin), "%", psum, "%"));
}
out.append(String.format("%n %" + (maxValueLen + 1) + "s %,16d %n", "TOTAL", sum));
}
-
+
public void save(String file) throws IOException {
-
+
FileOutputStream fos = new FileOutputStream(file);
BufferedOutputStream bos = new BufferedOutputStream(fos);
PrintStream ps = new PrintStream(bos, false, UTF_8.name());
-
+
TreeSet<HistData<T>> sortedCounts = new TreeSet<HistData<T>>(counts.values());
for (Iterator<HistData<T>> iter = sortedCounts.iterator(); iter.hasNext();) {
HistData<T> hd = iter.next();
ps.println(" " + hd.bin + " " + hd.count);
}
-
+
ps.close();
}
-
+
public Set<T> getKeys() {
return counts.keySet();
}
-
+
public void clear() {
counts.clear();
sum = 0;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/PrintScanTimeHistogram.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/PrintScanTimeHistogram.java b/test/src/main/java/org/apache/accumulo/test/continuous/PrintScanTimeHistogram.java
index cab3126..d77f427 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/PrintScanTimeHistogram.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/PrintScanTimeHistogram.java
@@ -23,38 +23,39 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+
import org.apache.log4j.Logger;
public class PrintScanTimeHistogram {
-
+
private static final Logger log = Logger.getLogger(PrintScanTimeHistogram.class);
public static void main(String[] args) throws Exception {
Histogram<String> srqHist = new Histogram<String>();
Histogram<String> fsrHist = new Histogram<String>();
-
+
processFile(System.in, srqHist, fsrHist);
-
+
StringBuilder report = new StringBuilder();
report.append(String.format("%n *** Single row queries histogram *** %n"));
srqHist.print(report);
log.info(report);
-
+
report = new StringBuilder();
report.append(String.format("%n *** Find start rows histogram *** %n"));
fsrHist.print(report);
log.info(report);
}
-
+
private static void processFile(InputStream ins, Histogram<String> srqHist, Histogram<String> fsrHist) throws FileNotFoundException, IOException {
String line;
BufferedReader in = new BufferedReader(new InputStreamReader(ins, UTF_8));
-
+
while ((line = in.readLine()) != null) {
-
+
try {
String[] tokens = line.split(" ");
-
+
String type = tokens[0];
if (type.equals("SRQ")) {
long delta = Long.parseLong(tokens[3]);
@@ -66,16 +67,16 @@ public class PrintScanTimeHistogram {
fsrHist.addPoint(point);
}
} catch (Exception e) {
- log.error("Failed to process line '"+line+"'.", e);
+ log.error("Failed to process line '" + line + "'.", e);
}
}
-
+
in.close();
}
-
+
private static String generateHistPoint(long delta) {
String point;
-
+
if (delta / 1000.0 < .1) {
point = String.format("%07.2f", delta / 1000.0);
if (point.equals("0000.10"))
@@ -89,5 +90,5 @@ public class PrintScanTimeHistogram {
}
return point;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/TimeBinner.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/TimeBinner.java b/test/src/main/java/org/apache/accumulo/test/continuous/TimeBinner.java
index 0824948..186e8d0 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/TimeBinner.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/TimeBinner.java
@@ -33,16 +33,16 @@ import org.apache.accumulo.core.cli.Help;
import com.beust.jcommander.Parameter;
public class TimeBinner {
-
+
enum Operation {
AVG, SUM, MIN, MAX, COUNT, CUMULATIVE, AMM, // avg,min,max
AMM_HACK1 // special case
}
-
+
private static class DoubleWrapper {
double d;
}
-
+
private static DoubleWrapper get(long l, HashMap<Long,DoubleWrapper> m, double init) {
DoubleWrapper dw = m.get(l);
if (dw == null) {
@@ -52,49 +52,49 @@ public class TimeBinner {
}
return dw;
}
-
+
static class Opts extends Help {
- @Parameter(names="--period", description="period", converter=TimeConverter.class, required=true)
+ @Parameter(names = "--period", description = "period", converter = TimeConverter.class, required = true)
long period = 0;
- @Parameter(names="--timeColumn", description="time column", required=true)
+ @Parameter(names = "--timeColumn", description = "time column", required = true)
int timeColumn = 0;
- @Parameter(names="--dataColumn", description="data column", required=true)
+ @Parameter(names = "--dataColumn", description = "data column", required = true)
int dataColumn = 0;
- @Parameter(names="--operation", description="one of: AVG, SUM, MIN, MAX, COUNT", required=true)
+ @Parameter(names = "--operation", description = "one of: AVG, SUM, MIN, MAX, COUNT", required = true)
String operation;
- @Parameter(names="--dateFormat", description="a SimpleDataFormat string that describes the data format")
+ @Parameter(names = "--dateFormat", description = "a SimpleDataFormat string that describes the data format")
String dateFormat = "MM/dd/yy-HH:mm:ss";
}
-
+
public static void main(String[] args) throws Exception {
Opts opts = new Opts();
opts.parseArgs(TimeBinner.class.getName(), args);
-
+
Operation operation = Operation.valueOf(opts.operation);
SimpleDateFormat sdf = new SimpleDateFormat(opts.dateFormat);
-
+
BufferedReader in = new BufferedReader(new InputStreamReader(System.in, UTF_8));
-
+
String line = null;
-
+
HashMap<Long,DoubleWrapper> aggregation1 = new HashMap<Long,DoubleWrapper>();
HashMap<Long,DoubleWrapper> aggregation2 = new HashMap<Long,DoubleWrapper>();
HashMap<Long,DoubleWrapper> aggregation3 = new HashMap<Long,DoubleWrapper>();
HashMap<Long,DoubleWrapper> aggregation4 = new HashMap<Long,DoubleWrapper>();
-
+
while ((line = in.readLine()) != null) {
-
+
try {
String tokens[] = line.split("\\s+");
-
+
long time = (long) Double.parseDouble(tokens[opts.timeColumn]);
double data = Double.parseDouble(tokens[opts.dataColumn]);
-
+
time = (time / opts.period) * opts.period;
-
+
double data_min = data;
double data_max = data;
-
+
switch (operation) {
case AMM_HACK1: {
data_min = Double.parseDouble(tokens[opts.dataColumn - 2]);
@@ -105,20 +105,20 @@ public class TimeBinner {
DoubleWrapper mindw = get(time, aggregation3, Double.POSITIVE_INFINITY);
if (data < mindw.d)
mindw.d = data_min;
-
+
DoubleWrapper maxdw = get(time, aggregation4, Double.NEGATIVE_INFINITY);
if (data > maxdw.d)
maxdw.d = data_max;
-
+
// fall through to AVG
}
case AVG: {
DoubleWrapper sumdw = get(time, aggregation1, 0);
DoubleWrapper countdw = get(time, aggregation2, 0);
-
+
sumdw.d += data;
countdw.d++;
-
+
break;
}
case MAX: {
@@ -147,20 +147,20 @@ public class TimeBinner {
break;
}
}
-
+
} catch (Exception e) {
System.err.println("Failed to process line : " + line + " " + e.getMessage());
}
}
-
+
TreeMap<Long,DoubleWrapper> sorted = new TreeMap<Long,DoubleWrapper>(aggregation1);
-
+
Set<Entry<Long,DoubleWrapper>> es = sorted.entrySet();
-
+
double cumulative = 0;
for (Entry<Long,DoubleWrapper> entry : es) {
String value;
-
+
switch (operation) {
case AMM_HACK1:
case AMM: {
@@ -181,9 +181,9 @@ public class TimeBinner {
default:
value = "" + entry.getValue().d;
}
-
+
System.out.println(sdf.format(new Date(entry.getKey())) + " " + value);
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java b/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java
index dffd6c3..7d2c65b 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java
@@ -51,7 +51,7 @@ import com.beust.jcommander.Parameter;
/**
* BUGS This code does not handle the fact that these files could include log events from previous months. It therefore it assumes all dates are in the current
* month. One solution might be to skip log files that haven't been touched in the last month, but that doesn't prevent newer files that have old dates in them.
- *
+ *
*/
public class UndefinedAnalyzer {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/BadCombiner.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BadCombiner.java b/test/src/main/java/org/apache/accumulo/test/functional/BadCombiner.java
index b7ee6fc..c589137 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BadCombiner.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BadCombiner.java
@@ -23,10 +23,10 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.Combiner;
public class BadCombiner extends Combiner {
-
+
@Override
public Value reduce(Key key, Iterator<Value> iter) {
throw new IllegalStateException();
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java
index 1c62720..5d13d1d 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java
@@ -23,17 +23,17 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
public class BadIterator extends WrappingIterator {
-
+
@Override
public Key getTopKey() {
throw new NullPointerException();
}
-
+
@Override
public boolean hasTop() {
throw new NullPointerException();
}
-
+
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java
index d112b5b..62afb32 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java
@@ -24,17 +24,17 @@ import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.commons.io.FileUtils;
public class CacheTestClean {
-
+
public static void main(String[] args) throws Exception {
String rootDir = args[0];
File reportDir = new File(args[1]);
-
+
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-
+
if (zoo.exists(rootDir)) {
zoo.recursiveDelete(rootDir, NodeMissingPolicy.FAIL);
}
-
+
if (reportDir.exists()) {
FileUtils.deleteDirectory(reportDir);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java
index fdc704d..82eef6c 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java
@@ -35,48 +35,48 @@ public class CacheTestReader {
String reportDir = args[1];
String keepers = args[2];
int numData = CacheTestWriter.NUM_DATA;
-
+
File myfile = new File(reportDir + "/" + UUID.randomUUID());
myfile.deleteOnExit();
-
+
ZooCache zc = new ZooCache(keepers, 30000);
-
+
while (true) {
if (myfile.exists())
myfile.delete();
-
+
if (zc.get(rootDir + "/die") != null) {
return;
}
-
+
Map<String,String> readData = new TreeMap<String,String>();
-
+
for (int i = 0; i < numData; i++) {
byte[] v = zc.get(rootDir + "/data" + i);
if (v != null)
readData.put(rootDir + "/data" + i, new String(v, UTF_8));
}
-
+
byte[] v = zc.get(rootDir + "/dataS");
if (v != null)
readData.put(rootDir + "/dataS", new String(v, UTF_8));
-
+
List<String> children = zc.getChildren(rootDir + "/dir");
if (children != null)
for (String child : children) {
readData.put(rootDir + "/dir/" + child, "");
}
-
+
FileOutputStream fos = new FileOutputStream(myfile);
ObjectOutputStream oos = new ObjectOutputStream(fos);
-
+
oos.writeObject(readData);
-
+
fos.close();
oos.close();
-
+
UtilWaitThread.sleep(20);
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java
index e1be8e6..3a3baf0 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java
@@ -35,40 +35,40 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
public class CacheTestWriter {
-
+
static final int NUM_DATA = 3;
-
+
public static void main(String[] args) throws Exception {
IZooReaderWriter zk = ZooReaderWriter.getInstance();
-
+
String rootDir = args[0];
File reportDir = new File(args[1]);
int numReaders = Integer.parseInt(args[2]);
int numVerifications = Integer.parseInt(args[3]);
int numData = NUM_DATA;
-
+
boolean dataSExists = false;
int count = 0;
-
+
zk.putPersistentData(rootDir, new byte[0], NodeExistsPolicy.FAIL);
for (int i = 0; i < numData; i++) {
zk.putPersistentData(rootDir + "/data" + i, new byte[0], NodeExistsPolicy.FAIL);
}
-
+
zk.putPersistentData(rootDir + "/dir", new byte[0], NodeExistsPolicy.FAIL);
-
+
ArrayList<String> children = new ArrayList<String>();
-
+
Random r = new Random();
-
+
while (count++ < numVerifications) {
-
+
Map<String,String> expectedData = null;
// change children in dir
-
+
for (int u = 0; u < r.nextInt(4) + 1; u++) {
expectedData = new TreeMap<String,String>();
-
+
if (r.nextFloat() < .5) {
String child = UUID.randomUUID().toString();
zk.putPersistentData(rootDir + "/dir/" + child, new byte[0], NodeExistsPolicy.SKIP);
@@ -78,32 +78,32 @@ public class CacheTestWriter {
String child = children.remove(index);
zk.recursiveDelete(rootDir + "/dir/" + child, NodeMissingPolicy.FAIL);
}
-
+
for (String child : children) {
expectedData.put(rootDir + "/dir/" + child, "");
}
-
+
// change values
for (int i = 0; i < numData; i++) {
byte data[] = Long.toString(r.nextLong(), 16).getBytes(UTF_8);
zk.putPersistentData(rootDir + "/data" + i, data, NodeExistsPolicy.OVERWRITE);
expectedData.put(rootDir + "/data" + i, new String(data, UTF_8));
}
-
+
// test a data node that does not always exists...
if (r.nextFloat() < .5) {
-
+
byte data[] = Long.toString(r.nextLong(), 16).getBytes(UTF_8);
-
+
if (!dataSExists) {
zk.putPersistentData(rootDir + "/dataS", data, NodeExistsPolicy.SKIP);
dataSExists = true;
} else {
zk.putPersistentData(rootDir + "/dataS", data, NodeExistsPolicy.OVERWRITE);
}
-
+
expectedData.put(rootDir + "/dataS", new String(data, UTF_8));
-
+
} else {
if (dataSExists) {
zk.recursiveDelete(rootDir + "/dataS", NodeMissingPolicy.FAIL);
@@ -111,34 +111,34 @@ public class CacheTestWriter {
}
}
}
-
+
// change children in dir and change values
-
+
System.out.println("expectedData " + expectedData);
-
+
// wait for all readers to see changes
while (true) {
-
+
File[] files = reportDir.listFiles();
-
+
System.out.println("files.length " + files.length);
-
+
if (files.length == numReaders) {
boolean ok = true;
-
+
for (int i = 0; i < files.length; i++) {
try {
FileInputStream fis = new FileInputStream(files[i]);
ObjectInputStream ois = new ObjectInputStream(fis);
-
+
@SuppressWarnings("unchecked")
Map<String,String> readerMap = (Map<String,String>) ois.readObject();
-
+
fis.close();
ois.close();
-
+
System.out.println("read " + readerMap);
-
+
if (!readerMap.equals(expectedData)) {
System.out.println("maps not equals");
ok = false;
@@ -148,16 +148,16 @@ public class CacheTestWriter {
ok = false;
}
}
-
+
if (ok)
break;
}
-
+
UtilWaitThread.sleep(5);
}
}
-
+
zk.putPersistentData(rootDir + "/die", new byte[0], NodeExistsPolicy.FAIL);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/DropModIter.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/DropModIter.java b/test/src/main/java/org/apache/accumulo/test/functional/DropModIter.java
index 20fe856..32a178d 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/DropModIter.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/DropModIter.java
@@ -26,26 +26,26 @@ import org.apache.accumulo.core.iterators.SkippingIterator;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
public class DropModIter extends SkippingIterator {
-
+
private int mod;
private int drop;
-
+
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
throw new UnsupportedOperationException();
}
-
+
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
super.init(source, options, env);
this.mod = Integer.parseInt(options.get("mod"));
this.drop = Integer.parseInt(options.get("drop"));
}
-
+
protected void consume() throws IOException {
while (getSource().hasTop() && Integer.parseInt(getSource().getTopKey().getRow().toString()) % mod == drop) {
getSource().next();
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/SlowConstraint.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SlowConstraint.java b/test/src/main/java/org/apache/accumulo/test/functional/SlowConstraint.java
index 8e29955..187da35 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SlowConstraint.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SlowConstraint.java
@@ -23,19 +23,19 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.util.UtilWaitThread;
/**
- *
+ *
*/
public class SlowConstraint implements Constraint {
-
+
@Override
public String getViolationDescription(short violationCode) {
return null;
}
-
+
@Override
public List<Short> check(Environment env, Mutation mutation) {
UtilWaitThread.sleep(20000);
return null;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
index cb29688..f84a4d9 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
@@ -37,9 +37,9 @@ public class SlowIterator extends WrappingIterator {
private long sleepTime = 0;
private long seekSleepTime = 0;
-
+
public static void setSleepTime(IteratorSetting is, long millis) {
- is.addOption(SLEEP_TIME, Long.toString(millis));
+ is.addOption(SLEEP_TIME, Long.toString(millis));
}
public static void setSeekSleepTime(IteratorSetting is, long t) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index eb84533..3bb44ff 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -102,8 +102,8 @@ public class ZombieTServer {
TransactionWatcher watcher = new TransactionWatcher();
final ThriftClientHandler tch = new ThriftClientHandler(context, watcher);
Processor<Iface> processor = new Processor<Iface>(tch);
- ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", port), processor, "ZombieTServer", "walking dead", 2, 1, 1000,
- 10 * 1024 * 1024, null, -1);
+ ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", port), processor, "ZombieTServer",
+ "walking dead", 2, 1, 1000, 10 * 1024 * 1024, null, -1);
String addressString = serverPort.address.toString();
String zPath = ZooUtil.getRoot(context.getInstance()) + Constants.ZTSERVERS + "/" + addressString;