You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/03/07 21:53:24 UTC

[10/50] [abbrv] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT

Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/bd283aec
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/bd283aec
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/bd283aec

Branch: refs/heads/ACCUMULO-2061
Commit: bd283aec0aa10d4c9deba8b5df097d48a424780b
Parents: bee78fa 759582b
Author: Mike Drob <md...@cloudera.com>
Authored: Tue Mar 4 09:15:16 2014 -0500
Committer: Mike Drob <md...@cloudera.com>
Committed: Tue Mar 4 09:15:16 2014 -0500

----------------------------------------------------------------------
 .../test/continuous/ContinuousBatchWalker.java  |  2 +-
 .../test/continuous/ContinuousIngest.java       |  9 ++--
 .../test/continuous/ContinuousQuery.java        |  2 +-
 .../test/continuous/ContinuousScanner.java      |  2 +-
 .../test/continuous/ContinuousUtil.java         | 49 ++++++++++++++++++++
 .../test/continuous/ContinuousWalk.java         |  6 +--
 6 files changed, 59 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
index d021164,0000000..3304d24
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
@@@ -1,182 -1,0 +1,182 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.continuous;
 +
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.cli.BatchScannerOpts;
 +import org.apache.accumulo.core.cli.ScannerOpts;
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.hadoop.io.Text;
 +
 +import com.beust.jcommander.Parameter;
 +import com.beust.jcommander.validators.PositiveInteger;
 +
 +public class ContinuousBatchWalker {
 +
 +  static class Opts extends ContinuousWalk.Opts {
 +    @Parameter(names="--numToScan", description="Number rows to scan between sleeps", required=true, validateWith=PositiveInteger.class)
 +    long numToScan = 0;
 +  }
 +
 +  public static void main(String[] args) throws Exception {
 +    
 +    Opts opts = new Opts();
 +    ScannerOpts scanOpts = new ScannerOpts();
 +    BatchScannerOpts bsOpts = new BatchScannerOpts();
 +    opts.parseArgs(ContinuousBatchWalker.class.getName(), args, scanOpts, bsOpts);
 +    
 +    Random r = new Random();
 +    Authorizations auths = opts.randomAuths.getAuths(r);
 +
 +    Connector conn = opts.getConnector();
-     Scanner scanner = conn.createScanner(opts.getTableName(), auths);
++    Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), auths);
 +    scanner.setBatchSize(scanOpts.scanBatchSize);
 +    
 +    BatchScanner bs = conn.createBatchScanner(opts.getTableName(), auths, bsOpts.scanThreads);
 +    bs.setTimeout(bsOpts.scanTimeout, TimeUnit.MILLISECONDS);
 +
 +    while (true) {
 +      Set<Text> batch = getBatch(scanner, opts.min, opts.max, scanOpts.scanBatchSize, r);
 +      List<Range> ranges = new ArrayList<Range>(batch.size());
 +      
 +      for (Text row : batch) {
 +        ranges.add(new Range(row));
 +      }
 +      
 +      runBatchScan(scanOpts.scanBatchSize, bs, batch, ranges);
 +      
 +      UtilWaitThread.sleep(opts.sleepTime);
 +    }
 +    
 +  }
 +  
 +  /*
 +   * private static void runSequentialScan(Scanner scanner, List<Range> ranges) { Set<Text> srowsSeen = new HashSet<Text>(); long st1 =
 +   * System.currentTimeMillis(); int scount = 0; for (Range range : ranges) { scanner.setRange(range);
 +   * 
 +   * for (Entry<Key,Value> entry : scanner) { srowsSeen.add(entry.getKey().getRow()); scount++; } }
 +   * 
 +   * 
 +   * long st2 = System.currentTimeMillis(); System.out.println("SRQ "+(st2 - st1)+" "+srowsSeen.size() +" "+scount); }
 +   */
 +  
 +  private static void runBatchScan(int batchSize, BatchScanner bs, Set<Text> batch, List<Range> ranges) {
 +    bs.setRanges(ranges);
 +    
 +    Set<Text> rowsSeen = new HashSet<Text>();
 +    
 +    int count = 0;
 +    
 +    long t1 = System.currentTimeMillis();
 +    
 +    for (Entry<Key,Value> entry : bs) {
 +      ContinuousWalk.validate(entry.getKey(), entry.getValue());
 +      
 +      rowsSeen.add(entry.getKey().getRow());
 +      
 +      addRow(batchSize, entry.getValue());
 +      
 +      count++;
 +    }
 +    
 +    long t2 = System.currentTimeMillis();
 +    
 +    if (!rowsSeen.equals(batch)) {
 +      HashSet<Text> copy1 = new HashSet<Text>(rowsSeen);
 +      HashSet<Text> copy2 = new HashSet<Text>(batch);
 +      
 +      copy1.removeAll(batch);
 +      copy2.removeAll(rowsSeen);
 +      
 +      System.out.printf("DIF %d %d %d%n", t1, copy1.size(), copy2.size());
 +      System.err.printf("DIF %d %d %d%n", t1, copy1.size(), copy2.size());
 +      System.err.println("Extra seen : " + copy1);
 +      System.err.println("Not seen   : " + copy2);
 +    } else {
 +      System.out.printf("BRQ %d %d %d %d %d%n", t1, (t2 - t1), rowsSeen.size(), count, (int) (rowsSeen.size() / ((t2 - t1) / 1000.0)));
 +    }
 +    
 +  }
 +  
 +  private static void addRow(int batchSize, Value v) {
 +    byte[] val = v.get();
 +    
 +    int offset = ContinuousWalk.getPrevRowOffset(val);
 +    if (offset > 1) {
 +      Text prevRow = new Text();
 +      prevRow.set(val, offset, 16);
 +      if (rowsToQuery.size() < 3 * batchSize) {
 +        rowsToQuery.add(prevRow);
 +      }
 +    }
 +  }
 +  
 +  private static HashSet<Text> rowsToQuery = new HashSet<Text>();
 +  
 +  private static Set<Text> getBatch(Scanner scanner, long min, long max, int batchSize, Random r) {
 +    
 +    while (rowsToQuery.size() < batchSize) {
 +      byte[] scanStart = ContinuousIngest.genRow(min, max, r);
 +      scanner.setRange(new Range(new Text(scanStart), null));
 +      
 +      int count = 0;
 +      
 +      long t1 = System.currentTimeMillis();
 +      
 +      Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +      while (iter.hasNext() && rowsToQuery.size() < 3 * batchSize) {
 +        Entry<Key,Value> entry = iter.next();
 +        ContinuousWalk.validate(entry.getKey(), entry.getValue());
 +        addRow(batchSize, entry.getValue());
 +        count++;
 +      }
 +      
 +      long t2 = System.currentTimeMillis();
 +      
 +      System.out.println("FSB " + t1 + " " + (t2 - t1) + " " + count);
 +      
 +      UtilWaitThread.sleep(100);
 +    }
 +    
 +    HashSet<Text> ret = new HashSet<Text>();
 +    
 +    Iterator<Text> iter = rowsToQuery.iterator();
 +    
 +    for (int i = 0; i < batchSize; i++) {
 +      ret.add(iter.next());
 +      iter.remove();
 +    }
 +    
 +    return ret;
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
index 23cf15d,0000000..e3f0485
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
@@@ -1,312 -1,0 +1,311 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.continuous;
 +
 +import java.io.BufferedReader;
 +import java.io.IOException;
 +import java.io.InputStreamReader;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Random;
 +import java.util.UUID;
 +import java.util.zip.CRC32;
 +import java.util.zip.Checksum;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.BatchWriterOpts;
 +import org.apache.accumulo.core.cli.ClientOnDefaultTable;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
- import org.apache.accumulo.core.client.TableExistsException;
++import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.ColumnVisibility;
 +import org.apache.accumulo.core.util.FastFormat;
 +import org.apache.accumulo.trace.instrument.CountSampler;
 +import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.FileAppender;
 +import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
 +import org.apache.log4j.PatternLayout;
 +
 +import com.beust.jcommander.IStringConverter;
 +import com.beust.jcommander.Parameter;
 +
 +
 +public class ContinuousIngest {
 +  
 +  static public class BaseOpts extends ClientOnDefaultTable {
 +    public class DebugConverter implements IStringConverter<String> {
 +      @Override
 +      public String convert(String debugLog) {
 +        Logger logger = Logger.getLogger(Constants.CORE_PACKAGE_NAME);
 +        logger.setLevel(Level.TRACE);
 +        logger.setAdditivity(false);
 +        try {
 +          logger.addAppender(new FileAppender(new PatternLayout("%d{dd HH:mm:ss,SSS} [%-8c{2}] %-5p: %m%n"), debugLog, true));
 +        } catch (IOException ex) {
 +          throw new RuntimeException(ex);
 +        }
 +        return debugLog;
 +      }
 +    }
 +    
 +    @Parameter(names="--min", description="lowest random row number to use")
 +    long min = 0;
 +    
 +    @Parameter(names="--max", description="maximum random row number to use")
 +    long max = Long.MAX_VALUE;
 +    
 +    @Parameter(names="--debugLog", description="file to write debugging output", converter=DebugConverter.class)
 +    String debugLog = null;
 +
 +    BaseOpts() { super("ci"); }
 +  }
 +  
 +  public static class ShortConverter implements IStringConverter<Short> {
 +    @Override
 +    public Short convert(String value) {
 +      return Short.valueOf(value);
 +    }
 +  }
 +  
 +  static public class Opts extends BaseOpts {
 +    @Parameter(names="--num", description="the number of entries to ingest")
 +    long num = Long.MAX_VALUE;
 +    
 +    @Parameter(names="--maxColF", description="maximum column family value to use", converter=ShortConverter.class)
 +    short maxColF = Short.MAX_VALUE;
 +    
 +    @Parameter(names="--maxColQ", description="maximum column qualifier value to use", converter=ShortConverter.class)
 +    short maxColQ = Short.MAX_VALUE;
 + 
 +    @Parameter(names="--addCheckSum", description="turn on checksums")
 +    boolean checksum = false;
 +    
 +    @Parameter(names="--visibilities", description="read the visibilities to ingest with from a file")
 +    String visFile = null;
 +  }
 +  
 +  private static final byte[] EMPTY_BYTES = new byte[0];
 +  
 +  private static List<ColumnVisibility> visibilities;
 +  
 +  private static void initVisibilities(Opts opts) throws Exception {
 +    if (opts.visFile == null) {
 +      visibilities = Collections.singletonList(new ColumnVisibility());
 +      return;
 +    }
 +    
 +    visibilities = new ArrayList<ColumnVisibility>();
 +    
 +    FileSystem fs = FileSystem.get(new Configuration());
 +    BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(opts.visFile)), Constants.UTF8));
 +    
 +    String line;
 +    
 +    while ((line = in.readLine()) != null) {
 +      visibilities.add(new ColumnVisibility(line));
 +    }
 +    
 +    in.close();
 +  }
 +
 +  private static ColumnVisibility getVisibility(Random rand) {
 +    return visibilities.get(rand.nextInt(visibilities.size()));
 +  }
 +
 +  public static void main(String[] args) throws Exception {
 +    
 +    Opts opts = new Opts();
 +    BatchWriterOpts bwOpts = new BatchWriterOpts();
 +    opts.parseArgs(ContinuousIngest.class.getName(), args, bwOpts);
 +    
 +    initVisibilities(opts);
 +
 +    if (opts.min < 0 || opts.max < 0 || opts.max <= opts.min) {
 +      throw new IllegalArgumentException("bad min and max");
 +    }
 +    Connector conn = opts.getConnector();
 +    
-     if (!conn.tableOperations().exists(opts.getTableName()))
-       try {
-         conn.tableOperations().create(opts.getTableName());
-       } catch (TableExistsException tee) {}
++    if (!conn.tableOperations().exists(opts.getTableName())) {
++      throw new TableNotFoundException(null, opts.getTableName(), "Consult the README and create the table before starting ingest.");
++    }
 +
 +    BatchWriter bw = conn.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
 +    bw = Trace.wrapAll(bw, new CountSampler(1024));
 +    
 +    Random r = new Random();
 +    
 +    byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(Constants.UTF8);
 +    
 +    System.out.printf("UUID %d %s%n", System.currentTimeMillis(), new String(ingestInstanceId, Constants.UTF8));
 +    
 +    long count = 0;
 +    final int flushInterval = 1000000;
 +    final int maxDepth = 25;
 +    
 +    // always want to point back to flushed data. This way the previous item should
 +    // always exist in accumulo when verifying data. To do this make insert N point
 +    // back to the row from insert (N - flushInterval). The array below is used to keep
 +    // track of this.
 +    long prevRows[] = new long[flushInterval];
 +    long firstRows[] = new long[flushInterval];
 +    int firstColFams[] = new int[flushInterval];
 +    int firstColQuals[] = new int[flushInterval];
 +    
 +    long lastFlushTime = System.currentTimeMillis();
 +    
 +    out: while (true) {
 +      // generate first set of nodes
 +      ColumnVisibility cv = getVisibility(r);
 +
 +      for (int index = 0; index < flushInterval; index++) {
 +        long rowLong = genLong(opts.min, opts.max, r);
 +        prevRows[index] = rowLong;
 +        firstRows[index] = rowLong;
 +        
 +        int cf = r.nextInt(opts.maxColF);
 +        int cq = r.nextInt(opts.maxColQ);
 +        
 +        firstColFams[index] = cf;
 +        firstColQuals[index] = cq;
 +        
 +        Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, r, opts.checksum);
 +        count++;
 +        bw.addMutation(m);
 +      }
 +      
 +      lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
 +      if (count >= opts.num)
 +        break out;
 +      
 +      // generate subsequent sets of nodes that link to previous set of nodes
 +      for (int depth = 1; depth < maxDepth; depth++) {
 +        for (int index = 0; index < flushInterval; index++) {
 +          long rowLong = genLong(opts.min, opts.max, r);
 +          byte[] prevRow = genRow(prevRows[index]);
 +          prevRows[index] = rowLong;
 +          Mutation m = genMutation(rowLong, r.nextInt(opts.maxColF), r.nextInt(opts.maxColQ), cv, ingestInstanceId, count, prevRow, r, opts.checksum);
 +          count++;
 +          bw.addMutation(m);
 +        }
 +        
 +        lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
 +        if (count >= opts.num)
 +          break out;
 +      }
 +      
 +      // create one big linked list, this makes all of the first inserts
 +      // point to something
 +      for (int index = 0; index < flushInterval - 1; index++) {
 +        Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv, ingestInstanceId, count, genRow(prevRows[index + 1]), r,
 +            opts.checksum);
 +        count++;
 +        bw.addMutation(m);
 +      }
 +      lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
 +      if (count >= opts.num)
 +        break out;
 +    }
 +    
 +    bw.close();
 +    opts.stopTracing();
 +  }
 +
 +  private static long flush(BatchWriter bw, long count, final int flushInterval, long lastFlushTime) throws MutationsRejectedException {
 +    long t1 = System.currentTimeMillis();
 +    bw.flush();
 +    long t2 = System.currentTimeMillis();
 +    System.out.printf("FLUSH %d %d %d %d %d%n", t2, (t2 - lastFlushTime), (t2 - t1), count, flushInterval);
 +    lastFlushTime = t2;
 +    return lastFlushTime;
 +  }
 +  
 +  public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv, byte[] ingestInstanceId, long count, byte[] prevRow, Random r,
 +      boolean checksum) {
 +    // Adler32 is supposed to be faster, but according to wikipedia is not good for small data.... so used CRC32 instead
 +    CRC32 cksum = null;
 +    
 +    byte[] rowString = genRow(rowLong);
 +    
 +    byte[] cfString = FastFormat.toZeroPaddedString(cfInt, 4, 16, EMPTY_BYTES);
 +    byte[] cqString = FastFormat.toZeroPaddedString(cqInt, 4, 16, EMPTY_BYTES);
 +    
 +    if (checksum) {
 +      cksum = new CRC32();
 +      cksum.update(rowString);
 +      cksum.update(cfString);
 +      cksum.update(cqString);
 +      cksum.update(cv.getExpression());
 +    }
 +    
 +    Mutation m = new Mutation(new Text(rowString));
 +    
 +    m.put(new Text(cfString), new Text(cqString), cv, createValue(ingestInstanceId, count, prevRow, cksum));
 +    return m;
 +  }
 +  
 +  public static final long genLong(long min, long max, Random r) {
 +    return ((r.nextLong() & 0x7fffffffffffffffl) % (max - min)) + min;
 +  }
 +  
 +  static final byte[] genRow(long min, long max, Random r) {
 +    return genRow(genLong(min, max, r));
 +  }
 +  
 +  static final byte[] genRow(long rowLong) {
 +    return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES);
 +  }
 +  
 +  private static Value createValue(byte[] ingestInstanceId, long count, byte[] prevRow, Checksum cksum) {
 +    int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3;
 +    if (cksum != null)
 +      dataLen += 8;
 +    byte val[] = new byte[dataLen];
 +    System.arraycopy(ingestInstanceId, 0, val, 0, ingestInstanceId.length);
 +    int index = ingestInstanceId.length;
 +    val[index++] = ':';
 +    int added = FastFormat.toZeroPaddedString(val, index, count, 16, 16, EMPTY_BYTES);
 +    if (added != 16)
 +      throw new RuntimeException(" " + added);
 +    index += 16;
 +    val[index++] = ':';
 +    if (prevRow != null) {
 +      System.arraycopy(prevRow, 0, val, index, prevRow.length);
 +      index += prevRow.length;
 +    }
 +    
 +    val[index++] = ':';
 +    
 +    if (cksum != null) {
 +      cksum.update(val, 0, index);
 +      cksum.getValue();
 +      FastFormat.toZeroPaddedString(val, index, cksum.getValue(), 8, 16, EMPTY_BYTES);
 +    }
 +    
 +    // System.out.println("val "+new String(val));
 +    
 +    return new Value(val);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java
index 117c136,0000000..4bbc85f
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java
@@@ -1,71 -1,0 +1,71 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.continuous;
 +
 +import java.util.Map.Entry;
 +import java.util.Random;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.ScannerOpts;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.test.continuous.ContinuousIngest.BaseOpts;
 +import org.apache.hadoop.io.Text;
 +
 +import com.beust.jcommander.Parameter;
 +
 +public class ContinuousQuery {
 +  
 +  public static class Opts extends BaseOpts {
 +    @Parameter(names="--sleep", description="the time to wait between queries", converter=TimeConverter.class)
 +    long sleepTime = 100;
 +  }
 +  
 +  public static void main(String[] args) throws Exception {
 +    Opts opts = new Opts();
 +    ScannerOpts scanOpts = new ScannerOpts();
 +    opts.parseArgs(ContinuousQuery.class.getName(), args, scanOpts);
 +    
 +    Connector conn = opts.getConnector();
-     Scanner scanner = conn.createScanner(opts.getTableName(), opts.auths);
++    Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), opts.auths);
 +    scanner.setBatchSize(scanOpts.scanBatchSize);
 +    
 +    Random r = new Random();
 +    
 +    while (true) {
 +      byte[] row = ContinuousIngest.genRow(opts.min, opts.max, r);
 +      
 +      int count = 0;
 +      
 +      long t1 = System.currentTimeMillis();
 +      scanner.setRange(new Range(new Text(row)));
 +      for (Entry<Key,Value> entry : scanner) {
 +        ContinuousWalk.validate(entry.getKey(), entry.getValue());
 +        count++;
 +      }
 +      long t2 = System.currentTimeMillis();
 +      
 +      System.out.printf("SRQ %d %s %d %d%n", t1, new String(row, Constants.UTF8), (t2 - t1), count);
 +      
 +      if (opts.sleepTime > 0)
 +        Thread.sleep(opts.sleepTime);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
index c331bab,0000000..fcc8fec
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
@@@ -1,104 -1,0 +1,104 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.continuous;
 +
 +import java.util.Iterator;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.ScannerOpts;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.hadoop.io.Text;
 +
 +import com.beust.jcommander.Parameter;
 +import com.beust.jcommander.validators.PositiveInteger;
 +
 +public class ContinuousScanner {
 +  
 +  static class Opts extends ContinuousWalk.Opts {
 +    @Parameter(names="--numToScan", description="Number rows to scan between sleeps", required=true, validateWith=PositiveInteger.class)
 +    long numToScan = 0;
 +  }
 +  
 +  public static void main(String[] args) throws Exception {
 +    Opts opts = new Opts();
 +    ScannerOpts scanOpts = new ScannerOpts();
 +    opts.parseArgs(ContinuousScanner.class.getName(), args, scanOpts);
 +    
 +    Random r = new Random();
 +
 +    long distance = 1000000000000l;
 +    
 +    Connector conn = opts.getConnector();
 +    Authorizations auths = opts.randomAuths.getAuths(r);
-     Scanner scanner = conn.createScanner(opts.getTableName(), auths);
++    Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), auths);
 +    scanner.setBatchSize(scanOpts.scanBatchSize);
 +    
 +    double delta = Math.min(.05, .05 / (opts.numToScan / 1000.0));
 +    
 +    while (true) {
 +      long startRow = ContinuousIngest.genLong(opts.min, opts.max - distance, r);
 +      byte[] scanStart = ContinuousIngest.genRow(startRow);
 +      byte[] scanStop = ContinuousIngest.genRow(startRow + distance);
 +      
 +      scanner.setRange(new Range(new Text(scanStart), new Text(scanStop)));
 +      
 +      int count = 0;
 +      Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +      
 +      long t1 = System.currentTimeMillis();
 +      
 +      while (iter.hasNext()) {
 +        Entry<Key,Value> entry = iter.next();
 +        ContinuousWalk.validate(entry.getKey(), entry.getValue());
 +        count++;
 +      }
 +      
 +      long t2 = System.currentTimeMillis();
 +      
 +      // System.out.println("P1 " +count +" "+((1-delta) * numToScan)+" "+((1+delta) * numToScan)+" "+numToScan);
 +      
 +      if (count < (1 - delta) * opts.numToScan || count > (1 + delta) * opts.numToScan) {
 +        if (count == 0) {
 +          distance = distance * 10;
 +          if (distance < 0)
 +            distance = 1000000000000l;
 +        } else {
 +          double ratio = (double) opts.numToScan / count;
 +          // move ratio closer to 1 to make change slower
 +          ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
 +          distance = (long) (ratio * distance);
 +        }
 +        
 +        // System.out.println("P2 "+delta +" "+numToScan+" "+distance+"  "+((double)numToScan/count ));
 +      }
 +      
 +      System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart, Constants.UTF8), (t2 - t1), count);
 +      
 +      if (opts.sleepTime > 0)
 +        UtilWaitThread.sleep(opts.sleepTime);
 +    }
 +    
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousUtil.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousUtil.java
index 0000000,0000000..a8b2930
new file mode 100644
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousUtil.java
@@@ -1,0 -1,0 +1,49 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.test.continuous;
++
++import org.apache.accumulo.core.client.Connector;
++import org.apache.accumulo.core.client.Scanner;
++import org.apache.accumulo.core.client.TableNotFoundException;
++import org.apache.accumulo.core.security.Authorizations;
++
++/**
++ * Useful utility methods common to the Continuous test suite.
++ */
++final class ContinuousUtil {
++  private ContinuousUtil() {}
++
++  /**
++   * Attempt to create a table scanner, or fail if the table does not exist.
++   *
++   * @param connector
++   *          A populated connector object
++   * @param table
++   *          The table name to scan over
++   * @param auths
++   *          The authorizations to use for the scanner
++   * @return a scanner for the requested table
++   * @throws TableNotFoundException
++   *           If the table does not exist
++   */
++  static Scanner createScanner(Connector connector, String table, Authorizations auths) throws TableNotFoundException {
++    if (!connector.tableOperations().exists(table)) {
++      throw new TableNotFoundException(null, table, "Consult the README and create the table before starting test processes.");
++    }
++    return connector.createScanner(table, auths);
++  }
++}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
index 4032dfa,0000000..34a5e9b
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
@@@ -1,237 -1,0 +1,237 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.continuous;
 +
 +import java.io.BufferedReader;
 +import java.io.IOException;
 +import java.io.InputStreamReader;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.zip.CRC32;
 +
- import org.apache.accumulo.trace.instrument.Span;
- import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.trace.instrument.Span;
++import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +
 +import com.beust.jcommander.IStringConverter;
 +import com.beust.jcommander.Parameter;
 +
 +
 +public class ContinuousWalk {
 +  
 +  // Command-line options: everything from ContinuousQuery.Opts plus an optional
 +  // file of scan authorizations supplied via --authsFile.
 +  static public class Opts extends ContinuousQuery.Opts {
 +    // JCommander converter: builds a RandomAuths from the --authsFile value,
 +    // rethrowing any read failure as an unchecked exception.
 +    class RandomAuthsConverter implements IStringConverter<RandomAuths> {
 +      @Override
 +      public RandomAuths convert(String value) {
 +        try {
 +          return new RandomAuths(value);
 +        } catch (IOException e) {
 +          throw new RuntimeException(e);
 +        }
 +      }
 +    }
 +    @Parameter(names="--authsFile", description="read the authorities to use from a file")
 +    RandomAuths randomAuths = new RandomAuths();
 +  }
 +  
 +  // Thrown when a scanned entry's stored checksum does not match the CRC32
 +  // recomputed from the key and value (see validate()).
 +  static class BadChecksumException extends RuntimeException {
 +    private static final long serialVersionUID = 1L;
 +    
 +    public BadChecksumException(String msg) {
 +      super(msg);
 +    }
 +    
 +  }
 +  
 +  // A pool of Authorizations read from a file (one comma-separated list per
 +  // line); getAuths() picks one at random for each scanner created.
 +  static class RandomAuths {
 +    private List<Authorizations> auths;
 +    
 +    // Default: scan with no authorizations.
 +    RandomAuths() {
 +      auths = Collections.singletonList(Constants.NO_AUTHS);
 +    }
 +    
 +    // Reads authorization sets from the given HDFS path; a null path behaves
 +    // like the no-auths default.
 +    RandomAuths(String file) throws IOException {
 +      if (file == null) {
 +        auths = Collections.singletonList(Constants.NO_AUTHS);
 +        return;
 +      }
 +      
 +      auths = new ArrayList<Authorizations>();
 +      
 +      FileSystem fs = FileSystem.get(new Configuration());
 +      BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(file)), Constants.UTF8));
 +      try {
 +        String line;
 +        while ((line = in.readLine()) != null) {
 +          auths.add(new Authorizations(line.split(",")));
 +        }
 +      } finally {
 +        in.close();
 +      }
 +    }
 +    
 +    Authorizations getAuths(Random r) {
 +      return auths.get(r.nextInt(auths.size()));
 +    }
 +  }
 +
 +  // Forever: pick a random start row, then follow "previous row" pointers
 +  // backwards, validating each entry's checksum. Logs SRQ lines with per-row
 +  // timings and MIS lines (to stdout and stderr) when a referenced row is missing.
 +  public static void main(String[] args) throws Exception {
 +    Opts opts = new Opts();
 +    opts.parseArgs(ContinuousWalk.class.getName(), args);
 +    
 +    Connector conn = opts.getConnector();
 +    
 +    Random r = new Random();
 +    
 +    ArrayList<Value> values = new ArrayList<Value>();
 +    
 +    while (true) {
-       Scanner scanner = conn.createScanner(opts.getTableName(), opts.randomAuths.getAuths(r));
++      Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), opts.randomAuths.getAuths(r));
 +      String row = findAStartRow(opts.min, opts.max, scanner, r);
 +      
 +      while (row != null) {
 +        
 +        values.clear();
 +        
 +        long t1 = System.currentTimeMillis();
 +        // Trace each single-row lookup as a "walk" span.
 +        Span span = Trace.on("walk");
 +        try {
 +          scanner.setRange(new Range(new Text(row)));
 +          for (Entry<Key,Value> entry : scanner) {
 +            validate(entry.getKey(), entry.getValue());
 +            values.add(entry.getValue());
 +          }
 +        } finally {
 +          span.stop();
 +        }
 +        long t2 = System.currentTimeMillis();
 +        
 +        System.out.printf("SRQ %d %s %d %d%n", t1, row, (t2 - t1), values.size());
 +        
 +        if (values.size() > 0) {
 +          // Follow a randomly chosen entry's previous-row pointer.
 +          row = getPrevRow(values.get(r.nextInt(values.size())));
 +        } else {
 +          // Referenced row was not found: report the miss and restart the walk.
 +          System.out.printf("MIS %d %s%n", t1, row);
 +          System.err.printf("MIS %d %s%n", t1, row);
 +          row = null;
 +        }
 +        
 +        if (opts.sleepTime > 0)
 +          Thread.sleep(opts.sleepTime);
 +      }
 +      
 +      if (opts.sleepTime > 0)
 +        Thread.sleep(opts.sleepTime);
 +    }
 +  }
 +  
 +  // Scans forward from a randomly generated row until an entry with a non-empty
 +  // previous-row pointer is found; returns that previous row, or null if the
 +  // scan is exhausted first. Logs an FSR line with the timing and entry count.
 +  private static String findAStartRow(long min, long max, Scanner scanner, Random r) {
 +    
 +    byte[] scanStart = ContinuousIngest.genRow(min, max, r);
 +    scanner.setRange(new Range(new Text(scanStart), null));
 +    scanner.setBatchSize(100);
 +    
 +    int count = 0;
 +    String pr = null;
 +    
 +    long t1 = System.currentTimeMillis();
 +    
 +    for (Entry<Key,Value> entry : scanner) {
 +      validate(entry.getKey(), entry.getValue());
 +      pr = getPrevRow(entry.getValue());
 +      count++;
 +      if (pr != null)
 +        break;
 +    }
 +    
 +    long t2 = System.currentTimeMillis();
 +    
 +    System.out.printf("FSR %d %s %d %d%n", t1, new String(scanStart, Constants.UTF8), (t2 - t1), count);
 +    
 +    return pr;
 +  }
 +  
 +  // Returns the offset of the 16-char previous-row field inside the value, or
 +  // -1 if the field is empty. The value layout is fixed-width: a ':' is
 +  // expected at offset 53, and the prev-row field (when present) occupies
 +  // offsets 54-69 followed by another ':' — presumably matching the format
 +  // written by ContinuousIngest (not visible here).
 +  static int getPrevRowOffset(byte val[]) {
 +    if (val.length == 0)
 +      throw new IllegalArgumentException();
 +    if (val[53] != ':')
 +      throw new IllegalArgumentException(new String(val, Constants.UTF8));
 +    
 +    // prev row starts at 54
 +    if (val[54] != ':') {
 +      if (val[54 + 16] != ':')
 +        throw new IllegalArgumentException(new String(val, Constants.UTF8));
 +      return 54;
 +    }
 +    
 +    return -1;
 +  }
 +  
 +  // Extracts the 16-char previous row as a UTF-8 string, or returns null when
 +  // the value carries no previous-row pointer.
 +  static String getPrevRow(Value value) {
 +    
 +    byte[] val = value.get();
 +    int offset = getPrevRowOffset(val);
 +    if (offset > 0) {
 +      return new String(val, offset, 16, Constants.UTF8);
 +    }
 +    
 +    return -1 < offset ? null : null;
 +  }
 +  
 +  // Returns the offset of the 8-hex-char checksum at the tail of the value
 +  // (preceded by ':'), or -1 when the value ends with ':' (no checksum stored).
 +  static int getChecksumOffset(byte val[]) {
 +    if (val[val.length - 1] != ':') {
 +      if (val[val.length - 9] != ':')
 +        throw new IllegalArgumentException(new String(val, Constants.UTF8));
 +      return val.length - 8;
 +    }
 +    
 +    return -1;
 +  }
 +  
 +  // Recomputes a CRC32 over the key components (row, family, qualifier,
 +  // visibility) plus the value bytes preceding the checksum field, and compares
 +  // it to the stored hex checksum. Entries without a checksum pass trivially.
 +  static void validate(Key key, Value value) throws BadChecksumException {
 +    int ckOff = getChecksumOffset(value.get());
 +    if (ckOff < 0)
 +      return;
 +    
 +    long storedCksum = Long.parseLong(new String(value.get(), ckOff, 8, Constants.UTF8), 16);
 +    
 +    CRC32 cksum = new CRC32();
 +    
 +    cksum.update(key.getRowData().toArray());
 +    cksum.update(key.getColumnFamilyData().toArray());
 +    cksum.update(key.getColumnQualifierData().toArray());
 +    cksum.update(key.getColumnVisibilityData().toArray());
 +    cksum.update(value.get(), 0, ckOff);
 +    
 +    if (cksum.getValue() != storedCksum) {
 +      throw new BadChecksumException("Checksum invalid " + key + " " + value);
 +    }
 +  }
 +}