You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/03/07 21:53:24 UTC
[10/50] [abbrv] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT
Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/bd283aec
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/bd283aec
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/bd283aec
Branch: refs/heads/ACCUMULO-2061
Commit: bd283aec0aa10d4c9deba8b5df097d48a424780b
Parents: bee78fa 759582b
Author: Mike Drob <md...@cloudera.com>
Authored: Tue Mar 4 09:15:16 2014 -0500
Committer: Mike Drob <md...@cloudera.com>
Committed: Tue Mar 4 09:15:16 2014 -0500
----------------------------------------------------------------------
.../test/continuous/ContinuousBatchWalker.java | 2 +-
.../test/continuous/ContinuousIngest.java | 9 ++--
.../test/continuous/ContinuousQuery.java | 2 +-
.../test/continuous/ContinuousScanner.java | 2 +-
.../test/continuous/ContinuousUtil.java | 49 ++++++++++++++++++++
.../test/continuous/ContinuousWalk.java | 6 +--
6 files changed, 59 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
index d021164,0000000..3304d24
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
@@@ -1,182 -1,0 +1,182 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.continuous;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.cli.BatchScannerOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.validators.PositiveInteger;
+
+public class ContinuousBatchWalker {
+
+ static class Opts extends ContinuousWalk.Opts {
+ @Parameter(names="--numToScan", description="Number rows to scan between sleeps", required=true, validateWith=PositiveInteger.class)
+ long numToScan = 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ Opts opts = new Opts();
+ ScannerOpts scanOpts = new ScannerOpts();
+ BatchScannerOpts bsOpts = new BatchScannerOpts();
+ opts.parseArgs(ContinuousBatchWalker.class.getName(), args, scanOpts, bsOpts);
+
+ Random r = new Random();
+ Authorizations auths = opts.randomAuths.getAuths(r);
+
+ Connector conn = opts.getConnector();
- Scanner scanner = conn.createScanner(opts.getTableName(), auths);
++ Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), auths);
+ scanner.setBatchSize(scanOpts.scanBatchSize);
+
+ BatchScanner bs = conn.createBatchScanner(opts.getTableName(), auths, bsOpts.scanThreads);
+ bs.setTimeout(bsOpts.scanTimeout, TimeUnit.MILLISECONDS);
+
+ while (true) {
+ Set<Text> batch = getBatch(scanner, opts.min, opts.max, scanOpts.scanBatchSize, r);
+ List<Range> ranges = new ArrayList<Range>(batch.size());
+
+ for (Text row : batch) {
+ ranges.add(new Range(row));
+ }
+
+ runBatchScan(scanOpts.scanBatchSize, bs, batch, ranges);
+
+ UtilWaitThread.sleep(opts.sleepTime);
+ }
+
+ }
+
+ /*
+ * private static void runSequentialScan(Scanner scanner, List<Range> ranges) { Set<Text> srowsSeen = new HashSet<Text>(); long st1 =
+ * System.currentTimeMillis(); int scount = 0; for (Range range : ranges) { scanner.setRange(range);
+ *
+ * for (Entry<Key,Value> entry : scanner) { srowsSeen.add(entry.getKey().getRow()); scount++; } }
+ *
+ *
+ * long st2 = System.currentTimeMillis(); System.out.println("SRQ "+(st2 - st1)+" "+srowsSeen.size() +" "+scount); }
+ */
+
+ private static void runBatchScan(int batchSize, BatchScanner bs, Set<Text> batch, List<Range> ranges) {
+ bs.setRanges(ranges);
+
+ Set<Text> rowsSeen = new HashSet<Text>();
+
+ int count = 0;
+
+ long t1 = System.currentTimeMillis();
+
+ for (Entry<Key,Value> entry : bs) {
+ ContinuousWalk.validate(entry.getKey(), entry.getValue());
+
+ rowsSeen.add(entry.getKey().getRow());
+
+ addRow(batchSize, entry.getValue());
+
+ count++;
+ }
+
+ long t2 = System.currentTimeMillis();
+
+ if (!rowsSeen.equals(batch)) {
+ HashSet<Text> copy1 = new HashSet<Text>(rowsSeen);
+ HashSet<Text> copy2 = new HashSet<Text>(batch);
+
+ copy1.removeAll(batch);
+ copy2.removeAll(rowsSeen);
+
+ System.out.printf("DIF %d %d %d%n", t1, copy1.size(), copy2.size());
+ System.err.printf("DIF %d %d %d%n", t1, copy1.size(), copy2.size());
+ System.err.println("Extra seen : " + copy1);
+ System.err.println("Not seen : " + copy2);
+ } else {
+ System.out.printf("BRQ %d %d %d %d %d%n", t1, (t2 - t1), rowsSeen.size(), count, (int) (rowsSeen.size() / ((t2 - t1) / 1000.0)));
+ }
+
+ }
+
+ private static void addRow(int batchSize, Value v) {
+ byte[] val = v.get();
+
+ int offset = ContinuousWalk.getPrevRowOffset(val);
+ if (offset > 1) {
+ Text prevRow = new Text();
+ prevRow.set(val, offset, 16);
+ if (rowsToQuery.size() < 3 * batchSize) {
+ rowsToQuery.add(prevRow);
+ }
+ }
+ }
+
+ private static HashSet<Text> rowsToQuery = new HashSet<Text>();
+
+ private static Set<Text> getBatch(Scanner scanner, long min, long max, int batchSize, Random r) {
+
+ while (rowsToQuery.size() < batchSize) {
+ byte[] scanStart = ContinuousIngest.genRow(min, max, r);
+ scanner.setRange(new Range(new Text(scanStart), null));
+
+ int count = 0;
+
+ long t1 = System.currentTimeMillis();
+
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+ while (iter.hasNext() && rowsToQuery.size() < 3 * batchSize) {
+ Entry<Key,Value> entry = iter.next();
+ ContinuousWalk.validate(entry.getKey(), entry.getValue());
+ addRow(batchSize, entry.getValue());
+ count++;
+ }
+
+ long t2 = System.currentTimeMillis();
+
+ System.out.println("FSB " + t1 + " " + (t2 - t1) + " " + count);
+
+ UtilWaitThread.sleep(100);
+ }
+
+ HashSet<Text> ret = new HashSet<Text>();
+
+ Iterator<Text> iter = rowsToQuery.iterator();
+
+ for (int i = 0; i < batchSize; i++) {
+ ret.add(iter.next());
+ iter.remove();
+ }
+
+ return ret;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
index 23cf15d,0000000..e3f0485
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
@@@ -1,312 -1,0 +1,311 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.continuous;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOnDefaultTable;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
- import org.apache.accumulo.core.client.TableExistsException;
++import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.FastFormat;
+import org.apache.accumulo.trace.instrument.CountSampler;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.Parameter;
+
+
+public class ContinuousIngest {
+
+ static public class BaseOpts extends ClientOnDefaultTable {
+ public class DebugConverter implements IStringConverter<String> {
+ @Override
+ public String convert(String debugLog) {
+ Logger logger = Logger.getLogger(Constants.CORE_PACKAGE_NAME);
+ logger.setLevel(Level.TRACE);
+ logger.setAdditivity(false);
+ try {
+ logger.addAppender(new FileAppender(new PatternLayout("%d{dd HH:mm:ss,SSS} [%-8c{2}] %-5p: %m%n"), debugLog, true));
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ return debugLog;
+ }
+ }
+
+ @Parameter(names="--min", description="lowest random row number to use")
+ long min = 0;
+
+ @Parameter(names="--max", description="maximum random row number to use")
+ long max = Long.MAX_VALUE;
+
+ @Parameter(names="--debugLog", description="file to write debugging output", converter=DebugConverter.class)
+ String debugLog = null;
+
+ BaseOpts() { super("ci"); }
+ }
+
+ public static class ShortConverter implements IStringConverter<Short> {
+ @Override
+ public Short convert(String value) {
+ return Short.valueOf(value);
+ }
+ }
+
+ static public class Opts extends BaseOpts {
+ @Parameter(names="--num", description="the number of entries to ingest")
+ long num = Long.MAX_VALUE;
+
+ @Parameter(names="--maxColF", description="maximum column family value to use", converter=ShortConverter.class)
+ short maxColF = Short.MAX_VALUE;
+
+ @Parameter(names="--maxColQ", description="maximum column qualifier value to use", converter=ShortConverter.class)
+ short maxColQ = Short.MAX_VALUE;
+
+ @Parameter(names="--addCheckSum", description="turn on checksums")
+ boolean checksum = false;
+
+ @Parameter(names="--visibilities", description="read the visibilities to ingest with from a file")
+ String visFile = null;
+ }
+
+ private static final byte[] EMPTY_BYTES = new byte[0];
+
+ private static List<ColumnVisibility> visibilities;
+
+ private static void initVisibilities(Opts opts) throws Exception {
+ if (opts.visFile == null) {
+ visibilities = Collections.singletonList(new ColumnVisibility());
+ return;
+ }
+
+ visibilities = new ArrayList<ColumnVisibility>();
+
+ FileSystem fs = FileSystem.get(new Configuration());
+ BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(opts.visFile)), Constants.UTF8));
+
+ String line;
+
+ while ((line = in.readLine()) != null) {
+ visibilities.add(new ColumnVisibility(line));
+ }
+
+ in.close();
+ }
+
+ private static ColumnVisibility getVisibility(Random rand) {
+ return visibilities.get(rand.nextInt(visibilities.size()));
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ Opts opts = new Opts();
+ BatchWriterOpts bwOpts = new BatchWriterOpts();
+ opts.parseArgs(ContinuousIngest.class.getName(), args, bwOpts);
+
+ initVisibilities(opts);
+
+ if (opts.min < 0 || opts.max < 0 || opts.max <= opts.min) {
+ throw new IllegalArgumentException("bad min and max");
+ }
+ Connector conn = opts.getConnector();
+
- if (!conn.tableOperations().exists(opts.getTableName()))
- try {
- conn.tableOperations().create(opts.getTableName());
- } catch (TableExistsException tee) {}
++ if (!conn.tableOperations().exists(opts.getTableName())) {
++ throw new TableNotFoundException(null, opts.getTableName(), "Consult the README and create the table before starting ingest.");
++ }
+
+ BatchWriter bw = conn.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
+ bw = Trace.wrapAll(bw, new CountSampler(1024));
+
+ Random r = new Random();
+
+ byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(Constants.UTF8);
+
+ System.out.printf("UUID %d %s%n", System.currentTimeMillis(), new String(ingestInstanceId, Constants.UTF8));
+
+ long count = 0;
+ final int flushInterval = 1000000;
+ final int maxDepth = 25;
+
+ // always want to point back to flushed data. This way the previous item should
+ // always exist in accumulo when verifying data. To do this make insert N point
+ // back to the row from insert (N - flushInterval). The array below is used to keep
+ // track of this.
+ long prevRows[] = new long[flushInterval];
+ long firstRows[] = new long[flushInterval];
+ int firstColFams[] = new int[flushInterval];
+ int firstColQuals[] = new int[flushInterval];
+
+ long lastFlushTime = System.currentTimeMillis();
+
+ out: while (true) {
+ // generate first set of nodes
+ ColumnVisibility cv = getVisibility(r);
+
+ for (int index = 0; index < flushInterval; index++) {
+ long rowLong = genLong(opts.min, opts.max, r);
+ prevRows[index] = rowLong;
+ firstRows[index] = rowLong;
+
+ int cf = r.nextInt(opts.maxColF);
+ int cq = r.nextInt(opts.maxColQ);
+
+ firstColFams[index] = cf;
+ firstColQuals[index] = cq;
+
+ Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, r, opts.checksum);
+ count++;
+ bw.addMutation(m);
+ }
+
+ lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+ if (count >= opts.num)
+ break out;
+
+ // generate subsequent sets of nodes that link to previous set of nodes
+ for (int depth = 1; depth < maxDepth; depth++) {
+ for (int index = 0; index < flushInterval; index++) {
+ long rowLong = genLong(opts.min, opts.max, r);
+ byte[] prevRow = genRow(prevRows[index]);
+ prevRows[index] = rowLong;
+ Mutation m = genMutation(rowLong, r.nextInt(opts.maxColF), r.nextInt(opts.maxColQ), cv, ingestInstanceId, count, prevRow, r, opts.checksum);
+ count++;
+ bw.addMutation(m);
+ }
+
+ lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+ if (count >= opts.num)
+ break out;
+ }
+
+ // create one big linked list, this makes all of the first inserts
+ // point to something
+ for (int index = 0; index < flushInterval - 1; index++) {
+ Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv, ingestInstanceId, count, genRow(prevRows[index + 1]), r,
+ opts.checksum);
+ count++;
+ bw.addMutation(m);
+ }
+ lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+ if (count >= opts.num)
+ break out;
+ }
+
+ bw.close();
+ opts.stopTracing();
+ }
+
+ private static long flush(BatchWriter bw, long count, final int flushInterval, long lastFlushTime) throws MutationsRejectedException {
+ long t1 = System.currentTimeMillis();
+ bw.flush();
+ long t2 = System.currentTimeMillis();
+ System.out.printf("FLUSH %d %d %d %d %d%n", t2, (t2 - lastFlushTime), (t2 - t1), count, flushInterval);
+ lastFlushTime = t2;
+ return lastFlushTime;
+ }
+
+ public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv, byte[] ingestInstanceId, long count, byte[] prevRow, Random r,
+ boolean checksum) {
+ // Adler32 is supposed to be faster, but according to wikipedia is not good for small data.... so used CRC32 instead
+ CRC32 cksum = null;
+
+ byte[] rowString = genRow(rowLong);
+
+ byte[] cfString = FastFormat.toZeroPaddedString(cfInt, 4, 16, EMPTY_BYTES);
+ byte[] cqString = FastFormat.toZeroPaddedString(cqInt, 4, 16, EMPTY_BYTES);
+
+ if (checksum) {
+ cksum = new CRC32();
+ cksum.update(rowString);
+ cksum.update(cfString);
+ cksum.update(cqString);
+ cksum.update(cv.getExpression());
+ }
+
+ Mutation m = new Mutation(new Text(rowString));
+
+ m.put(new Text(cfString), new Text(cqString), cv, createValue(ingestInstanceId, count, prevRow, cksum));
+ return m;
+ }
+
+ public static final long genLong(long min, long max, Random r) {
+ return ((r.nextLong() & 0x7fffffffffffffffl) % (max - min)) + min;
+ }
+
+ static final byte[] genRow(long min, long max, Random r) {
+ return genRow(genLong(min, max, r));
+ }
+
+ static final byte[] genRow(long rowLong) {
+ return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES);
+ }
+
+ private static Value createValue(byte[] ingestInstanceId, long count, byte[] prevRow, Checksum cksum) {
+ int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3;
+ if (cksum != null)
+ dataLen += 8;
+ byte val[] = new byte[dataLen];
+ System.arraycopy(ingestInstanceId, 0, val, 0, ingestInstanceId.length);
+ int index = ingestInstanceId.length;
+ val[index++] = ':';
+ int added = FastFormat.toZeroPaddedString(val, index, count, 16, 16, EMPTY_BYTES);
+ if (added != 16)
+ throw new RuntimeException(" " + added);
+ index += 16;
+ val[index++] = ':';
+ if (prevRow != null) {
+ System.arraycopy(prevRow, 0, val, index, prevRow.length);
+ index += prevRow.length;
+ }
+
+ val[index++] = ':';
+
+ if (cksum != null) {
+ cksum.update(val, 0, index);
+ cksum.getValue();
+ FastFormat.toZeroPaddedString(val, index, cksum.getValue(), 8, 16, EMPTY_BYTES);
+ }
+
+ // System.out.println("val "+new String(val));
+
+ return new Value(val);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java
index 117c136,0000000..4bbc85f
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java
@@@ -1,71 -1,0 +1,71 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.continuous;
+
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.test.continuous.ContinuousIngest.BaseOpts;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+
+public class ContinuousQuery {
+
+ public static class Opts extends BaseOpts {
+ @Parameter(names="--sleep", description="the time to wait between queries", converter=TimeConverter.class)
+ long sleepTime = 100;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Opts opts = new Opts();
+ ScannerOpts scanOpts = new ScannerOpts();
+ opts.parseArgs(ContinuousQuery.class.getName(), args, scanOpts);
+
+ Connector conn = opts.getConnector();
- Scanner scanner = conn.createScanner(opts.getTableName(), opts.auths);
++ Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), opts.auths);
+ scanner.setBatchSize(scanOpts.scanBatchSize);
+
+ Random r = new Random();
+
+ while (true) {
+ byte[] row = ContinuousIngest.genRow(opts.min, opts.max, r);
+
+ int count = 0;
+
+ long t1 = System.currentTimeMillis();
+ scanner.setRange(new Range(new Text(row)));
+ for (Entry<Key,Value> entry : scanner) {
+ ContinuousWalk.validate(entry.getKey(), entry.getValue());
+ count++;
+ }
+ long t2 = System.currentTimeMillis();
+
+ System.out.printf("SRQ %d %s %d %d%n", t1, new String(row, Constants.UTF8), (t2 - t1), count);
+
+ if (opts.sleepTime > 0)
+ Thread.sleep(opts.sleepTime);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
index c331bab,0000000..fcc8fec
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
@@@ -1,104 -1,0 +1,104 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.continuous;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.validators.PositiveInteger;
+
+public class ContinuousScanner {
+
+ static class Opts extends ContinuousWalk.Opts {
+ @Parameter(names="--numToScan", description="Number rows to scan between sleeps", required=true, validateWith=PositiveInteger.class)
+ long numToScan = 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Opts opts = new Opts();
+ ScannerOpts scanOpts = new ScannerOpts();
+ opts.parseArgs(ContinuousScanner.class.getName(), args, scanOpts);
+
+ Random r = new Random();
+
+ long distance = 1000000000000l;
+
+ Connector conn = opts.getConnector();
+ Authorizations auths = opts.randomAuths.getAuths(r);
- Scanner scanner = conn.createScanner(opts.getTableName(), auths);
++ Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), auths);
+ scanner.setBatchSize(scanOpts.scanBatchSize);
+
+ double delta = Math.min(.05, .05 / (opts.numToScan / 1000.0));
+
+ while (true) {
+ long startRow = ContinuousIngest.genLong(opts.min, opts.max - distance, r);
+ byte[] scanStart = ContinuousIngest.genRow(startRow);
+ byte[] scanStop = ContinuousIngest.genRow(startRow + distance);
+
+ scanner.setRange(new Range(new Text(scanStart), new Text(scanStop)));
+
+ int count = 0;
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+
+ long t1 = System.currentTimeMillis();
+
+ while (iter.hasNext()) {
+ Entry<Key,Value> entry = iter.next();
+ ContinuousWalk.validate(entry.getKey(), entry.getValue());
+ count++;
+ }
+
+ long t2 = System.currentTimeMillis();
+
+ // System.out.println("P1 " +count +" "+((1-delta) * numToScan)+" "+((1+delta) * numToScan)+" "+numToScan);
+
+ if (count < (1 - delta) * opts.numToScan || count > (1 + delta) * opts.numToScan) {
+ if (count == 0) {
+ distance = distance * 10;
+ if (distance < 0)
+ distance = 1000000000000l;
+ } else {
+ double ratio = (double) opts.numToScan / count;
+ // move ratio closer to 1 to make change slower
+ ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
+ distance = (long) (ratio * distance);
+ }
+
+ // System.out.println("P2 "+delta +" "+numToScan+" "+distance+" "+((double)numToScan/count ));
+ }
+
+ System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart, Constants.UTF8), (t2 - t1), count);
+
+ if (opts.sleepTime > 0)
+ UtilWaitThread.sleep(opts.sleepTime);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousUtil.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousUtil.java
index 0000000,0000000..a8b2930
new file mode 100644
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousUtil.java
@@@ -1,0 -1,0 +1,49 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.test.continuous;
++
++import org.apache.accumulo.core.client.Connector;
++import org.apache.accumulo.core.client.Scanner;
++import org.apache.accumulo.core.client.TableNotFoundException;
++import org.apache.accumulo.core.security.Authorizations;
++
++/**
++ * Useful utility methods common to the Continuous test suite.
++ */
++final class ContinuousUtil {
++ private ContinuousUtil() {}
++
++ /**
++ * Attempt to create a table scanner, or fail if the table does not exist.
++ *
++ * @param connector
++ * A populated connector object
++ * @param table
++ * The table name to scan over
++ * @param auths
++ * The authorizations to use for the scanner
++ * @return a scanner for the requested table
++ * @throws TableNotFoundException
++ * If the table does not exist
++ */
++ static Scanner createScanner(Connector connector, String table, Authorizations auths) throws TableNotFoundException {
++ if (!connector.tableOperations().exists(table)) {
++ throw new TableNotFoundException(null, table, "Consult the README and create the table before starting test processes.");
++ }
++ return connector.createScanner(table, auths);
++ }
++}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
index 4032dfa,0000000..34a5e9b
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
@@@ -1,237 -1,0 +1,237 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.continuous;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.zip.CRC32;
+
- import org.apache.accumulo.trace.instrument.Span;
- import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.trace.instrument.Span;
++import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.Parameter;
+
+
+public class ContinuousWalk {
+
+ static public class Opts extends ContinuousQuery.Opts {
+ class RandomAuthsConverter implements IStringConverter<RandomAuths> {
+ @Override
+ public RandomAuths convert(String value) {
+ try {
+ return new RandomAuths(value);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ @Parameter(names="--authsFile", description="read the authorities to use from a file")
+ RandomAuths randomAuths = new RandomAuths();
+ }
+
+ static class BadChecksumException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public BadChecksumException(String msg) {
+ super(msg);
+ }
+
+ }
+
+ static class RandomAuths {
+ private List<Authorizations> auths;
+
+ RandomAuths() {
+ auths = Collections.singletonList(Constants.NO_AUTHS);
+ }
+
+ RandomAuths(String file) throws IOException {
+ if (file == null) {
+ auths = Collections.singletonList(Constants.NO_AUTHS);
+ return;
+ }
+
+ auths = new ArrayList<Authorizations>();
+
+ FileSystem fs = FileSystem.get(new Configuration());
+ BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(file)), Constants.UTF8));
+ try {
+ String line;
+ while ((line = in.readLine()) != null) {
+ auths.add(new Authorizations(line.split(",")));
+ }
+ } finally {
+ in.close();
+ }
+ }
+
+ Authorizations getAuths(Random r) {
+ return auths.get(r.nextInt(auths.size()));
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Opts opts = new Opts();
+ opts.parseArgs(ContinuousWalk.class.getName(), args);
+
+ Connector conn = opts.getConnector();
+
+ Random r = new Random();
+
+ ArrayList<Value> values = new ArrayList<Value>();
+
+ while (true) {
- Scanner scanner = conn.createScanner(opts.getTableName(), opts.randomAuths.getAuths(r));
++ Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), opts.randomAuths.getAuths(r));
+ String row = findAStartRow(opts.min, opts.max, scanner, r);
+
+ while (row != null) {
+
+ values.clear();
+
+ long t1 = System.currentTimeMillis();
+ Span span = Trace.on("walk");
+ try {
+ scanner.setRange(new Range(new Text(row)));
+ for (Entry<Key,Value> entry : scanner) {
+ validate(entry.getKey(), entry.getValue());
+ values.add(entry.getValue());
+ }
+ } finally {
+ span.stop();
+ }
+ long t2 = System.currentTimeMillis();
+
+ System.out.printf("SRQ %d %s %d %d%n", t1, row, (t2 - t1), values.size());
+
+ if (values.size() > 0) {
+ row = getPrevRow(values.get(r.nextInt(values.size())));
+ } else {
+ System.out.printf("MIS %d %s%n", t1, row);
+ System.err.printf("MIS %d %s%n", t1, row);
+ row = null;
+ }
+
+ if (opts.sleepTime > 0)
+ Thread.sleep(opts.sleepTime);
+ }
+
+ if (opts.sleepTime > 0)
+ Thread.sleep(opts.sleepTime);
+ }
+ }
+
+ private static String findAStartRow(long min, long max, Scanner scanner, Random r) {
+
+ byte[] scanStart = ContinuousIngest.genRow(min, max, r);
+ scanner.setRange(new Range(new Text(scanStart), null));
+ scanner.setBatchSize(100);
+
+ int count = 0;
+ String pr = null;
+
+ long t1 = System.currentTimeMillis();
+
+ for (Entry<Key,Value> entry : scanner) {
+ validate(entry.getKey(), entry.getValue());
+ pr = getPrevRow(entry.getValue());
+ count++;
+ if (pr != null)
+ break;
+ }
+
+ long t2 = System.currentTimeMillis();
+
+ System.out.printf("FSR %d %s %d %d%n", t1, new String(scanStart, Constants.UTF8), (t2 - t1), count);
+
+ return pr;
+ }
+
+ static int getPrevRowOffset(byte val[]) {
+ if (val.length == 0)
+ throw new IllegalArgumentException();
+ if (val[53] != ':')
+ throw new IllegalArgumentException(new String(val, Constants.UTF8));
+
+ // prev row starts at 54
+ if (val[54] != ':') {
+ if (val[54 + 16] != ':')
+ throw new IllegalArgumentException(new String(val, Constants.UTF8));
+ return 54;
+ }
+
+ return -1;
+ }
+
+ static String getPrevRow(Value value) {
+
+ byte[] val = value.get();
+ int offset = getPrevRowOffset(val);
+ if (offset > 0) {
+ return new String(val, offset, 16, Constants.UTF8);
+ }
+
+ return null;
+ }
+
+ static int getChecksumOffset(byte val[]) {
+ if (val[val.length - 1] != ':') {
+ if (val[val.length - 9] != ':')
+ throw new IllegalArgumentException(new String(val, Constants.UTF8));
+ return val.length - 8;
+ }
+
+ return -1;
+ }
+
+ static void validate(Key key, Value value) throws BadChecksumException {
+ int ckOff = getChecksumOffset(value.get());
+ if (ckOff < 0)
+ return;
+
+ long storedCksum = Long.parseLong(new String(value.get(), ckOff, 8, Constants.UTF8), 16);
+
+ CRC32 cksum = new CRC32();
+
+ cksum.update(key.getRowData().toArray());
+ cksum.update(key.getColumnFamilyData().toArray());
+ cksum.update(key.getColumnQualifierData().toArray());
+ cksum.update(key.getColumnVisibilityData().toArray());
+ cksum.update(value.get(), 0, ckOff);
+
+ if (cksum.getValue() != storedCksum) {
+ throw new BadChecksumException("Checksum invalid " + key + " " + value);
+ }
+ }
+}