Posted to commits@accumulo.apache.org by ct...@apache.org on 2014/04/09 19:57:49 UTC

[18/64] [abbrv] Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
index 06b9a7c,0000000..70156b2
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
@@@ -1,223 -1,0 +1,222 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.continuous;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Random;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.ClientOnDefaultTable;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.server.util.reflection.CounterUtils;
 +import org.apache.accumulo.test.continuous.ContinuousWalk.BadChecksumException;
 +import org.apache.hadoop.conf.Configured;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.LongWritable;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.VLongWritable;
 +import org.apache.hadoop.mapreduce.Job;
 +import org.apache.hadoop.mapreduce.Mapper;
 +import org.apache.hadoop.mapreduce.Reducer;
 +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 +import org.apache.hadoop.util.Tool;
 +import org.apache.hadoop.util.ToolRunner;
 +
 +import com.beust.jcommander.Parameter;
 +import com.beust.jcommander.validators.PositiveInteger;
 +
 +/**
 + * A map-reduce job that verifies a table created by continuous ingest, checking that all referenced nodes are defined.
 + */
 +
 +public class ContinuousVerify extends Configured implements Tool {
 +  public static final VLongWritable DEF = new VLongWritable(-1);
 +
 +  public static class CMapper extends Mapper<Key,Value,LongWritable,VLongWritable> {
 +
 +    private LongWritable row = new LongWritable();
 +    private LongWritable ref = new LongWritable();
 +    private VLongWritable vrow = new VLongWritable();
 +
 +    private long corrupt = 0;
 +
 +    @Override
 +    public void map(Key key, Value data, Context context) throws IOException, InterruptedException {
 +      long r = Long.parseLong(key.getRow().toString(), 16);
 +      if (r < 0)
 +        throw new IllegalArgumentException();
 +
 +      try {
 +        ContinuousWalk.validate(key, data);
 +      } catch (BadChecksumException bce) {
 +        CounterUtils.increment(context.getCounter(Counts.CORRUPT));
 +        if (corrupt < 1000) {
 +          System.out.println("ERROR Bad checksum : " + key);
 +        } else if (corrupt == 1000) {
 +          System.out.println("Too many bad checksums, not printing anymore!");
 +        }
 +        corrupt++;
 +        return;
 +      }
 +
 +      row.set(r);
 +
 +      context.write(row, DEF);
 +      byte[] val = data.get();
 +
 +      int offset = ContinuousWalk.getPrevRowOffset(val);
 +      if (offset > 0) {
 +        ref.set(Long.parseLong(new String(val, offset, 16, Constants.UTF8), 16));
 +        vrow.set(r);
 +        context.write(ref, vrow);
 +      }
 +    }
 +  }
 +
 +  public static enum Counts {
 +    UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT
 +  }
 +
 +  public static class CReducer extends Reducer<LongWritable,VLongWritable,Text,Text> {
 +    private ArrayList<Long> refs = new ArrayList<Long>();
 +
 +    @Override
 +    public void reduce(LongWritable key, Iterable<VLongWritable> values, Context context) throws IOException, InterruptedException {
 +
 +      int defCount = 0;
 +
 +      refs.clear();
 +      for (VLongWritable type : values) {
 +        if (type.get() == -1) {
 +          defCount++;
 +        } else {
 +          refs.add(type.get());
 +        }
 +      }
 +
 +      if (defCount == 0 && refs.size() > 0) {
 +        StringBuilder sb = new StringBuilder();
 +        String comma = "";
 +        for (Long ref : refs) {
 +          sb.append(comma);
 +          comma = ",";
 +          sb.append(new String(ContinuousIngest.genRow(ref), Constants.UTF8));
 +        }
 +
 +        context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString()));
 +        CounterUtils.increment(context.getCounter(Counts.UNDEFINED));
 +
 +      } else if (defCount > 0 && refs.size() == 0) {
 +        CounterUtils.increment(context.getCounter(Counts.UNREFERENCED));
 +      } else {
 +        CounterUtils.increment(context.getCounter(Counts.REFERENCED));
 +      }
 +
 +    }
 +  }
 +
 +  static class Opts extends ClientOnDefaultTable {
 +    @Parameter(names = "--output", description = "location in HDFS to store the results; must not exist", required = true)
 +    String outputDir = "/tmp/continuousVerify";
 +
 +    @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class)
 +    int maxMaps = 0;
 +
 +    @Parameter(names = "--reducers", description = "the number of reducers to use", required = true, validateWith = PositiveInteger.class)
 +    int reducers = 0;
 +
 +    @Parameter(names = "--offline", description = "perform the verification directly on the files while the table is offline")
 +    boolean scanOffline = false;
 +
 +    public Opts() {
 +      super("ci");
 +    }
 +  }
 +
 +  @Override
 +  public int run(String[] args) throws Exception {
 +    Opts opts = new Opts();
 +    opts.parseArgs(this.getClass().getName(), args);
 +
 +    Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
 +    job.setJarByClass(this.getClass());
 +
 +    job.setInputFormatClass(AccumuloInputFormat.class);
 +    opts.setAccumuloConfigs(job);
 +
 +    Set<Range> ranges = null;
 +    String clone = opts.getTableName();
 +    Connector conn = null;
 +
 +    if (opts.scanOffline) {
 +      Random random = new Random();
 +      clone = opts.getTableName() + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffl));
 +      conn = opts.getConnector();
 +      conn.tableOperations().clone(opts.getTableName(), clone, true, new HashMap<String,String>(), new HashSet<String>());
 +      ranges = conn.tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
 +      conn.tableOperations().offline(clone);
 +      AccumuloInputFormat.setInputTableName(job, clone);
 +      AccumuloInputFormat.setOfflineTableScan(job, true);
 +    } else {
 +      ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
 +    }
 +
 +    AccumuloInputFormat.setRanges(job, ranges);
 +    AccumuloInputFormat.setAutoAdjustRanges(job, false);
 +
 +    job.setMapperClass(CMapper.class);
 +    job.setMapOutputKeyClass(LongWritable.class);
 +    job.setMapOutputValueClass(VLongWritable.class);
 +
 +    job.setReducerClass(CReducer.class);
 +    job.setNumReduceTasks(opts.reducers);
 +
 +    job.setOutputFormatClass(TextOutputFormat.class);
 +
 +    job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", opts.scanOffline);
 +
 +    TextOutputFormat.setOutputPath(job, new Path(opts.outputDir));
 +
 +    job.waitForCompletion(true);
 +
 +    if (opts.scanOffline) {
 +      conn.tableOperations().delete(clone);
 +    }
 +    opts.stopTracing();
 +    return job.isSuccessful() ? 0 : 1;
 +  }
 +
 +  /**
 +   * 
 +   * @param args
 +   *          instanceName zookeepers username password table columns outputpath
-    * @throws Exception
 +   */
 +  public static void main(String[] args) throws Exception {
 +    int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousVerify(), args);
 +    if (res != 0)
 +      System.exit(res);
 +  }
 +}
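
The heart of the verification is the def/ref bookkeeping above: CMapper emits the DEF sentinel (-1) for every row it reads and the row's own value for every row it references, and CReducer classifies each key from the two tallies. A minimal, self-contained sketch of that rule (the class and method names below are illustrative, not part of the job above):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class VerifyRuleSketch {
  enum Counts { UNREFERENCED, UNDEFINED, REFERENCED }

  static Counts classify(List<Long> values) {
    int defCount = 0;
    List<Long> refs = new ArrayList<Long>();
    for (Long v : values) {
      if (v.longValue() == -1)
        defCount++;                   // the DEF sentinel: this row exists in the table
      else
        refs.add(v);                  // another row points at this one
    }
    if (defCount == 0 && refs.size() > 0)
      return Counts.UNDEFINED;        // referenced but never written: indicates lost data
    if (defCount > 0 && refs.size() == 0)
      return Counts.UNREFERENCED;     // written but nothing points here (normal for the newest nodes)
    return Counts.REFERENCED;         // written and referenced: healthy
  }

  public static void main(String[] args) {
    System.out.println(classify(Arrays.asList(-1L, 42L)));  // REFERENCED
    System.out.println(classify(Arrays.asList(42L)));       // UNDEFINED
    System.out.println(classify(Arrays.asList(-1L)));       // UNREFERENCED
  }
}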

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java
index c522914,0000000..f1dfcd2
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java
@@@ -1,51 -1,0 +1,48 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import java.io.File;
 +import java.util.Arrays;
 +
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +
 +public class CacheTestClean {
 +  
-   /**
-    * @param args
-    */
 +  public static void main(String[] args) throws Exception {
 +    String rootDir = args[0];
 +    File reportDir = new File(args[1]);
 +    
 +    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +    
 +    if (zoo.exists(rootDir)) {
 +      zoo.recursiveDelete(rootDir, NodeMissingPolicy.FAIL);
 +    }
 +    
 +    if (!reportDir.exists()) {
 +      reportDir.mkdir();
 +    } else {
 +      File[] files = reportDir.listFiles();
 +      if (files.length != 0)
 +        throw new Exception("dir " + reportDir + " is not empty: " + Arrays.asList(files));
 +    }
 +    
 +  }
 +  
 +}
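
One subtlety worth noting in the directory check above: File.listFiles() can return null (for instance when the path is not a directory or an I/O error occurs), which would throw a NullPointerException before the emptiness test. A defensive sketch of the same check, using a hypothetical report path:

import java.io.File;
import java.util.Arrays;

public class ReportDirSketch {
  public static void main(String[] args) throws Exception {
    File reportDir = new File("/tmp/cache-test-reports");  // placeholder path
    if (!reportDir.exists()) {
      reportDir.mkdir();
    } else {
      File[] files = reportDir.listFiles();
      if (files != null && files.length != 0)  // null guard before checking emptiness
        throw new Exception("dir " + reportDir + " is not empty: " + Arrays.asList(files));
    }
  }
}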

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
index 2b775c5,0000000..06c6fdb
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
@@@ -1,217 -1,0 +1,213 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import java.io.BufferedReader;
 +import java.io.File;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.InputStreamReader;
 +import java.util.Arrays;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.Help;
 +import org.apache.accumulo.server.util.reflection.CounterUtils;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.conf.Configured;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.LongWritable;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.mapreduce.Job;
 +import org.apache.hadoop.mapreduce.Mapper;
 +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 +import org.apache.hadoop.util.Tool;
 +import org.apache.hadoop.util.ToolRunner;
 +import org.apache.log4j.Logger;
 +
 +import com.beust.jcommander.Parameter;
 +
 +/**
 + * Runs the functional tests via map-reduce.
 + * 
 + * First, be sure everything is compiled.
 + * 
 + * Second, get a list of the tests you want to run:
 + * 
 + * <pre>
 + *  $ python test/system/auto/run.py -l > tests
 + * </pre>
 + * 
 + * Put the list of tests into HDFS:
 + * 
 + * <pre>
 + *  $ hadoop fs -put tests /user/hadoop/tests
 + * </pre>
 + * 
 + * Run the map-reduce job:
 + * 
 + * <pre>
 +  *  $ ./bin/accumulo org.apache.accumulo.test.functional.RunTests --tests /user/hadoop/tests --output /user/hadoop/results
 + * </pre>
 + * 
 + * Note that you will need to have some configuration in conf/accumulo-site.xml (to locate zookeeper). The map-reduce jobs will not use your local accumulo
 + * instance.
 + * 
 + */
 +public class RunTests extends Configured implements Tool {
 +  
 +  static final public String JOB_NAME = "Functional Test Runner";
 +  private static final Logger log = Logger.getLogger(RunTests.class);
 +  
 +  private Job job = null;
 +
 +  private static final int DEFAULT_TIMEOUT_FACTOR = 1;
 +
 +  static class Opts extends Help {
 +    @Parameter(names="--tests", description="newline separated list of tests to run", required=true)
 +    String testFile;
 +    @Parameter(names="--output", description="destination for the results of tests in HDFS", required=true)
 +    String outputPath;
 +    @Parameter(names="--timeoutFactor", description="Optional scaling factor for timeout for both mapred.task.timeout and -f flag on run.py", required=false)
 +    Integer intTimeoutFactor = DEFAULT_TIMEOUT_FACTOR;
 +  }
 +  
 +  static final String TIMEOUT_FACTOR = RunTests.class.getName() + ".timeoutFactor";
 +
 +  static public class TestMapper extends Mapper<LongWritable,Text,Text,Text> {
 +    
 +    private static final String REDUCER_RESULT_START = "::::: ";
 +    private static final int RRS_LEN = REDUCER_RESULT_START.length();
 +    private Text result = new Text();
 +    String mapperTimeoutFactor = null;
 +
 +    private static enum Outcome {
 +      SUCCESS, FAILURE, ERROR, UNEXPECTED_SUCCESS, EXPECTED_FAILURE
 +    }
 +    private static final Map<Character, Outcome> OUTCOME_COUNTERS;
 +    static {
 +      OUTCOME_COUNTERS = new java.util.HashMap<Character, Outcome>();
 +      OUTCOME_COUNTERS.put('S', Outcome.SUCCESS);
 +      OUTCOME_COUNTERS.put('F', Outcome.FAILURE);
 +      OUTCOME_COUNTERS.put('E', Outcome.ERROR);
 +      OUTCOME_COUNTERS.put('T', Outcome.UNEXPECTED_SUCCESS);
 +      OUTCOME_COUNTERS.put('G', Outcome.EXPECTED_FAILURE);
 +    }
 +
 +    @Override
 +    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 +      List<String> cmd = Arrays.asList("/usr/bin/python", "test/system/auto/run.py", "-m", "-f", mapperTimeoutFactor, "-t", value.toString());
 +      log.info("Running test " + cmd);
 +      ProcessBuilder pb = new ProcessBuilder(cmd);
 +      pb.directory(new File(context.getConfiguration().get("accumulo.home")));
 +      pb.redirectErrorStream(true);
 +      Process p = pb.start();
 +      p.getOutputStream().close();
 +      InputStream out = p.getInputStream();
 +      InputStreamReader outr = new InputStreamReader(out, Constants.UTF8);
 +      BufferedReader br = new BufferedReader(outr);
 +      String line;
 +      try {
 +        while ((line = br.readLine()) != null) {
 +          log.info("More: " + line);
 +          if (line.startsWith(REDUCER_RESULT_START)) {
 +            String resultLine = line.substring(RRS_LEN);
 +            if (resultLine.length() > 0) {
 +              Outcome outcome = OUTCOME_COUNTERS.get(resultLine.charAt(0));
 +              if (outcome != null) {
 +                CounterUtils.increment(context.getCounter(outcome));
 +              }
 +            }
 +            String taskAttemptId = context.getTaskAttemptID().toString();
 +            result.set(taskAttemptId + " " + resultLine);
 +            context.write(value, result);
 +          }
 +        }
 +      } catch (Exception ex) {
 +        log.error(ex);
 +        context.progress();
 +      }
 +
 +      p.waitFor();
 +    }
 +    
 +    @Override
 +    protected void setup(Mapper<LongWritable,Text,Text,Text>.Context context) throws IOException, InterruptedException {
 +      mapperTimeoutFactor = Integer.toString(context.getConfiguration().getInt(TIMEOUT_FACTOR, DEFAULT_TIMEOUT_FACTOR));
 +    }
 +  }
 +  
 +  @Override
 +  public int run(String[] args) throws Exception {
 +    job = new Job(getConf(), JOB_NAME);
 +    job.setJarByClass(this.getClass());
 +    Opts opts = new Opts();
 +    opts.parseArgs(RunTests.class.getName(), args);
 +    
 +    // this is like 1-2 tests per mapper
 +    Configuration conf = job.getConfiguration();
 +    conf.setInt("mapred.max.split.size", 40);
 +    conf.set("accumulo.home", System.getenv("ACCUMULO_HOME"));
 +
 +    // Use the timeout factor to scale mapred.task.timeout
 +    // and to set TIMEOUT_FACTOR
 +    conf.setInt("mapred.task.timeout", opts.intTimeoutFactor * 8 * 60 * 1000);
 +    conf.setInt(TIMEOUT_FACTOR, opts.intTimeoutFactor);
 +    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
 +    
 +    // set input
 +    job.setInputFormatClass(TextInputFormat.class);
 +    TextInputFormat.setInputPaths(job, new Path(opts.testFile));
 +    
 +    // set output
 +    job.setOutputFormatClass(TextOutputFormat.class);
 +    FileSystem fs = FileSystem.get(conf);
 +    Path destination = new Path(opts.outputPath);
 +    if (fs.exists(destination)) {
 +      log.info("Deleting existing output directory " + opts.outputPath);
 +      fs.delete(destination, true);
 +    }
 +    TextOutputFormat.setOutputPath(job, destination);
 +    
 +    // configure default reducer: put the results into one file
 +    job.setNumReduceTasks(1);
 +    
 +    // set mapper
 +    job.setMapperClass(TestMapper.class);
 +    job.setOutputKeyClass(Text.class);
 +    job.setOutputValueClass(Text.class);
 +    
 +    // don't do anything with the results (yet); a summary would be nice
 +    job.setNumReduceTasks(0);
 +    
 +    // the job is submitted and waited on by main()
 +    log.info("Starting tests");
 +    return 0;
 +  }
 +  
-   /**
-    * @param args
-    * @throws Exception
-    */
 +  public static void main(String[] args) throws Exception {
 +    RunTests tests = new RunTests();
 +    ToolRunner.run(new Configuration(), tests, args);
 +    tests.job.waitForCompletion(true);
 +    if (!tests.job.isSuccessful())
 +      System.exit(1);
 +  }
 +  
 +}
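
TestMapper's subprocess handling above follows a standard pattern: merge stderr into stdout, close the child's stdin, then stream output line by line and react to the "::::: " marker. The pattern in isolation (echo stands in here for test/system/auto/run.py):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.Charset;

public class SubprocessSketch {
  public static void main(String[] args) throws Exception {
    ProcessBuilder pb = new ProcessBuilder("echo", "::::: S demo.test");
    pb.redirectErrorStream(true);        // fold stderr into stdout, as TestMapper does
    Process p = pb.start();
    p.getOutputStream().close();         // the child gets nothing on stdin
    BufferedReader br = new BufferedReader(
        new InputStreamReader(p.getInputStream(), Charset.forName("UTF-8")));
    String line;
    while ((line = br.readLine()) != null) {
      if (line.startsWith("::::: "))     // the reducer-result marker parsed above
        System.out.println("result: " + line.substring(6));
    }
    p.waitFor();                         // reap the child
  }
}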

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java
index a9b072e,0000000..85cddbb
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java
@@@ -1,246 -1,0 +1,243 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.performance.metadata;
 +
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.TreeSet;
 +import java.util.UUID;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.ZooKeeperInstance;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.util.AddressUtil;
 +import org.apache.accumulo.core.util.Stat;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.security.SecurityConstants;
 +import org.apache.hadoop.io.Text;
 +
 +/**
 + * This little program can be used to write a lot of entries to the !METADATA table and measure the performance of varying numbers of threads doing !METADATA
 + * lookups using the batch scanner.
 + * 
 + * 
 + */
 +
 +public class MetadataBatchScanTest {
 +  
-   /**
-    * @param args
-    */
 +  public static void main(String[] args) throws Exception {
 +    
 +    final Connector connector = new ZooKeeperInstance("acu14", "localhost")
 +        .getConnector(SecurityConstants.SYSTEM_PRINCIPAL, SecurityConstants.getSystemToken());
 +    
 +    TreeSet<Long> splits = new TreeSet<Long>();
 +    Random r = new Random(42);
 +    
 +    while (splits.size() < 99999) {
 +      splits.add((r.nextLong() & 0x7fffffffffffffffl) % 1000000000000l);
 +    }
 +    
 +    Text tid = new Text("8");
 +    Text per = null;
 +    
 +    ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>();
 +    
 +    for (Long split : splits) {
 +      Text er = new Text(String.format("%012d", split));
 +      KeyExtent ke = new KeyExtent(tid, er, per);
 +      per = er;
 +      
 +      extents.add(ke);
 +    }
 +    
 +    extents.add(new KeyExtent(tid, null, per));
 +    
 +    if (args[0].equals("write")) {
 +      
 +      BatchWriter bw = connector.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig());
 +      
 +      for (KeyExtent extent : extents) {
 +        Mutation mut = extent.getPrevRowUpdateMutation();
 +        new TServerInstance(AddressUtil.parseAddress("192.168.1.100", 4567), "DEADBEEF").putLocation(mut);
 +        bw.addMutation(mut);
 +      }
 +      
 +      bw.close();
 +    } else if (args[0].equals("writeFiles")) {
 +      BatchWriter bw = connector.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig());
 +      
 +      for (KeyExtent extent : extents) {
 +        
 +        Mutation mut = new Mutation(extent.getMetadataEntry());
 +        
 +        String dir = "/t-" + UUID.randomUUID();
 +        
 +        Constants.METADATA_DIRECTORY_COLUMN.put(mut, new Value(dir.getBytes(Constants.UTF8)));
 +        
 +        for (int i = 0; i < 5; i++) {
 +          mut.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(dir + "/00000_0000" + i + ".map"), new Value("10000,1000000".getBytes(Constants.UTF8)));
 +        }
 +        
 +        bw.addMutation(mut);
 +      }
 +      
 +      bw.close();
 +    } else if (args[0].equals("scan")) {
 +      
 +      int numThreads = Integer.parseInt(args[1]);
 +      final int numLoop = Integer.parseInt(args[2]);
 +      int numLookups = Integer.parseInt(args[3]);
 +      
 +      HashSet<Integer> indexes = new HashSet<Integer>();
 +      while (indexes.size() < numLookups) {
 +        indexes.add(r.nextInt(extents.size()));
 +      }
 +      
 +      final List<Range> ranges = new ArrayList<Range>();
 +      for (Integer i : indexes) {
 +        ranges.add(extents.get(i).toMetadataRange());
 +      }
 +      
 +      Thread threads[] = new Thread[numThreads];
 +      
 +      for (int i = 0; i < threads.length; i++) {
 +        threads[i] = new Thread(new Runnable() {
 +          
 +          @Override
 +          public void run() {
 +            try {
 +              System.out.println(runScanTest(connector, numLoop, ranges));
 +            } catch (Exception e) {
 +              e.printStackTrace();
 +            }
 +          }
 +        });
 +      }
 +      
 +      long t1 = System.currentTimeMillis();
 +      
 +      for (int i = 0; i < threads.length; i++) {
 +        threads[i].start();
 +      }
 +      
 +      for (int i = 0; i < threads.length; i++) {
 +        threads[i].join();
 +      }
 +      
 +      long t2 = System.currentTimeMillis();
 +      
 +      System.out.printf("tt : %6.2f%n", (t2 - t1) / 1000.0);
 +      
 +    } else {
 +      throw new IllegalArgumentException();
 +    }
 +    
 +  }
 +  
 +  private static ScanStats runScanTest(Connector connector, int numLoop, List<Range> ranges) throws Exception {
 +    Scanner scanner = null;/*
 +                            * connector.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); ColumnFQ.fetch(scanner,
 +                            * Constants.METADATA_LOCATION_COLUMN); ColumnFQ.fetch(scanner, Constants.METADATA_PREV_ROW_COLUMN);
 +                            */
 +    
 +    BatchScanner bs = connector.createBatchScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS, 1);
 +    bs.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
 +    Constants.METADATA_PREV_ROW_COLUMN.fetch(bs);
 +    
 +    bs.setRanges(ranges);
 +    
 +    // System.out.println(ranges);
 +    
 +    ScanStats stats = new ScanStats();
 +    for (int i = 0; i < numLoop; i++) {
 +      ScanStat ss = scan(bs, ranges, scanner);
 +      stats.merge(ss);
 +    }
 +    
 +    return stats;
 +  }
 +  
 +  private static class ScanStat {
 +    long delta1;
 +    long delta2;
 +    int count1;
 +    int count2;
 +  }
 +  
 +  private static class ScanStats {
 +    Stat delta1 = new Stat();
 +    Stat delta2 = new Stat();
 +    Stat count1 = new Stat();
 +    Stat count2 = new Stat();
 +    
 +    void merge(ScanStat ss) {
 +      delta1.addStat(ss.delta1);
 +      delta2.addStat(ss.delta2);
 +      count1.addStat(ss.count1);
 +      count2.addStat(ss.count2);
 +    }
 +    
 +    @Override
 +    public String toString() {
 +      return "[" + delta1 + "] [" + delta2 + "]";
 +    }
 +  }
 +  
 +  private static ScanStat scan(BatchScanner bs, List<Range> ranges, Scanner scanner) {
 +    
 +    // System.out.println("ranges : "+ranges);
 +    
 +    ScanStat ss = new ScanStat();
 +    
 +    long t1 = System.currentTimeMillis();
 +    int count = 0;
 +    for (@SuppressWarnings("unused")
 +    Entry<Key,Value> entry : bs) {
 +      count++;
 +    }
 +    long t2 = System.currentTimeMillis();
 +    
 +    ss.delta1 = (t2 - t1);
 +    ss.count1 = count;
 +    
 +    count = 0;
 +    t1 = System.currentTimeMillis();
 +    /*
 +     * for (Range range : ranges) { scanner.setRange(range); for (Entry<Key, Value> entry : scanner) { count++; } }
 +     */
 +    
 +    t2 = System.currentTimeMillis();
 +    
 +    ss.delta2 = (t2 - t1);
 +    ss.count2 = count;
 +    
 +    return ss;
 +  }
 +  
 +}
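
The split generation above leans on a small idiom: Random.nextLong() can return negative values, so the sign bit is cleared with the 0x7fffffffffffffffL mask before taking the modulus, guaranteeing a non-negative split row. In isolation:

import java.util.Random;

public class NonNegativeLongSketch {
  public static void main(String[] args) {
    Random r = new Random(42);
    for (int i = 0; i < 3; i++) {
      // clear the sign bit, then bound the value; the result is always in [0, 10^12)
      long v = (r.nextLong() & 0x7fffffffffffffffL) % 1000000000000L;
      System.out.println(String.format("%012d", v));  // zero-padded like the metadata end rows
    }
  }
}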

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index 71970d3,0000000..e6fcd5b
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@@ -1,258 -1,0 +1,252 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.performance.thrift;
 +
 +import java.net.InetAddress;
 +import java.net.InetSocketAddress;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +
- import org.apache.accumulo.trace.thrift.TInfo;
 +import org.apache.accumulo.core.cli.Help;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.ZooKeeperInstance;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.conf.DefaultConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.thrift.InitialMultiScan;
 +import org.apache.accumulo.core.data.thrift.InitialScan;
 +import org.apache.accumulo.core.data.thrift.IterInfo;
 +import org.apache.accumulo.core.data.thrift.MapFileInfo;
 +import org.apache.accumulo.core.data.thrift.MultiScanResult;
 +import org.apache.accumulo.core.data.thrift.ScanResult;
 +import org.apache.accumulo.core.data.thrift.TColumn;
 +import org.apache.accumulo.core.data.thrift.TConstraintViolationSummary;
 +import org.apache.accumulo.core.data.thrift.TKeyExtent;
 +import org.apache.accumulo.core.data.thrift.TMutation;
 +import org.apache.accumulo.core.data.thrift.TRange;
 +import org.apache.accumulo.core.data.thrift.UpdateErrors;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
 +import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.server.client.ClientServiceHandler;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.master.state.Assignment;
 +import org.apache.accumulo.server.master.state.MetaDataStateStore;
 +import org.apache.accumulo.server.master.state.MetaDataTableScanner;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.master.state.TabletLocationState;
 +import org.apache.accumulo.server.security.SecurityConstants;
 +import org.apache.accumulo.server.util.TServerUtils;
 +import org.apache.accumulo.server.zookeeper.TransactionWatcher;
++import org.apache.accumulo.trace.thrift.TInfo;
 +import org.apache.hadoop.io.Text;
 +import org.apache.thrift.TException;
 +
 +import com.beust.jcommander.Parameter;
 +
 +
 +/**
 + * The purpose of this class is to serve as a fake tserver that is a data sink like /dev/null. NullTserver modifies the !METADATA location entries for a table to
 + * point to it. This allows thrift performance to be measured by running any client code that writes to a table.
 + * 
 + */
 +
 +public class NullTserver {
 +  
 +  public static class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
 +    
 +    private long updateSession = 1;
 +    
 +    public ThriftClientHandler(Instance instance, TransactionWatcher watcher) {
 +      super(instance, watcher);
 +    }
 +    
 +    @Override
 +    public long startUpdate(TInfo tinfo, TCredentials credentials) {
 +      return updateSession++;
 +    }
 +    
 +    @Override
 +    public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent keyExtent, List<TMutation> mutation) {}
 +    
 +    @Override
 +    public UpdateErrors closeUpdate(TInfo tinfo, long updateID) {
 +      return new UpdateErrors(new HashMap<TKeyExtent,Long>(), new ArrayList<TConstraintViolationSummary>(), new HashMap<TKeyExtent, SecurityErrorCode>());
 +    }
 +    
 +    @Override
 +    public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime) {
 +      return null;
 +    }
 +    
 +    @Override
 +    public void closeMultiScan(TInfo tinfo, long scanID) {}
 +    
 +    @Override
 +    public void closeScan(TInfo tinfo, long scanID) {}
 +    
 +    @Override
 +    public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) {
 +      return null;
 +    }
 +    
 +    @Override
 +    public ScanResult continueScan(TInfo tinfo, long scanID) {
 +      return null;
 +    }
 +    
 +    @Override
 +    public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent extent, ByteBuffer splitPoint) {
 +      
 +    }
 +    
 +    @Override
 +    public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> batch, List<TColumn> columns,
 +        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) {
 +      return null;
 +    }
 +    
 +    @Override
 +    public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent extent, TRange range, List<TColumn> columns, int batchSize,
 +        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated) {
 +      return null;
 +    }
 +    
 +    @Override
 +    public void update(TInfo tinfo, TCredentials credentials, TKeyExtent keyExtent, TMutation mutation) {
 +      
 +    }
 +    
 +    @Override
 +    public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
 +      return null;
 +    }
 +    
 +    @Override
 +    public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) throws ThriftSecurityException, TException {
 +      return null;
 +    }
 +    
 +    @Override
 +    public TabletStats getHistoricalStats(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
 +      return null;
 +    }
 +    
 +    @Override
 +    public void halt(TInfo tinfo, TCredentials credentials, String lock) throws ThriftSecurityException, TException {}
 +    
 +    @Override
 +    public void fastHalt(TInfo tinfo, TCredentials credentials, String lock) {}
 +    
 +    @Override
 +    public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent) throws TException {}
 +    
 +    @Override
 +    public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent, boolean save) throws TException {}
 +    
 +    @Override
 +    public List<ActiveScan> getActiveScans(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
 +      return new ArrayList<ActiveScan>();
 +    }
 +    
 +    @Override
 +    public void chop(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent) throws TException {}
 +    
 +    @Override
 +    public void flushTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent) throws TException {
 +      
 +    }
 +    
 +    @Override
 +    public void compact(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws TException {
 +      
 +    }
 +    
 +    @Override
 +    public void flush(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws TException {
 +      
 +    }
 +    
-     /*
-      * (non-Javadoc)
-      * 
-      * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.trace.thrift.TInfo,
-      * org.apache.accumulo.core.security.thrift.Credentials, java.util.List)
-      */
 +    @Override
 +    public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
 +    }
 +    
 +    @Override
 +    public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
 +      return new ArrayList<ActiveCompaction>();
 +    }
 +  }
 +  
 +  static class Opts extends Help {
 +    @Parameter(names={"-i", "--instance"}, description="instance name", required=true)
 +    String iname = null;
 +    @Parameter(names={"-z", "--keepers"}, description="comma-separated list of zookeeper host:ports", required=true)
 +    String keepers = null;
 +    @Parameter(names="--table", description="table to adopt", required=true)
 +    String tableName = null;
 +    @Parameter(names="--port", description="port number to use")
 +    int port = DefaultConfiguration.getInstance().getPort(Property.TSERV_CLIENTPORT);
 +  }
 +  
 +  public static void main(String[] args) throws Exception {
 +    Opts opts = new Opts();
 +    opts.parseArgs(NullTserver.class.getName(), args);
 +    
 +    TransactionWatcher watcher = new TransactionWatcher();
 +    ThriftClientHandler tch = new ThriftClientHandler(HdfsZooInstance.getInstance(), watcher);
 +    Processor<Iface> processor = new Processor<Iface>(tch);
 +    TServerUtils.startTServer(opts.port, processor, "NullTServer", "null tserver", 2, 1000, 10*1024*1024);
 +    
 +    InetSocketAddress addr = new InetSocketAddress(InetAddress.getLocalHost(), opts.port);
 +    
 +    // modify !METADATA
 +    ZooKeeperInstance zki = new ZooKeeperInstance(opts.iname, opts.keepers);
 +    String tableId = Tables.getTableId(zki, opts.tableName);
 +    
 +    // read the locations for the table
 +    Range tableRange = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
 +    MetaDataTableScanner s = new MetaDataTableScanner(zki, SecurityConstants.getSystemCredentials(), tableRange);
 +    long randomSessionID = opts.port;
 +    TServerInstance instance = new TServerInstance(addr, randomSessionID);
 +    List<Assignment> assignments = new ArrayList<Assignment>();
 +    while (s.hasNext()) {
 +      TabletLocationState next = s.next();
 +      assignments.add(new Assignment(next.extent, instance));
 +    }
 +    s.close();
 +    // point them to this server
 +    MetaDataStateStore store = new MetaDataStateStore();
 +    store.setLocations(assignments);
 +    
 +    while (true) {
 +      UtilWaitThread.sleep(10000);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/randomwalk/Framework.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/Framework.java
index 9d01929,0000000..7cb58c9
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/Framework.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/Framework.java
@@@ -1,129 -1,0 +1,126 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.randomwalk;
 +
 +import java.io.File;
 +import java.io.FileInputStream;
 +import java.util.HashMap;
 +import java.util.Properties;
 +
 +import org.apache.log4j.Logger;
 +import org.apache.log4j.xml.DOMConfigurator;
 +
 +import com.beust.jcommander.Parameter;
 +
 +public class Framework {
 +  
 +  private static final Logger log = Logger.getLogger(Framework.class);
 +  private HashMap<String,Node> nodes = new HashMap<String,Node>();
 +  private String configDir = null;
 +  private static final Framework INSTANCE = new Framework();
 +  
 +  /**
 +   * @return Singleton instance of Framework
 +   */
 +  public static Framework getInstance() {
 +    return INSTANCE;
 +  }
 +  
 +  public String getConfigDir() {
 +    return configDir;
 +  }
 +  
 +  public void setConfigDir(String confDir) {
 +    configDir = confDir;
 +  }
 +  
 +  /**
 +   * Run random walk framework
 +   * 
 +   * @param startName
 +   *          Full name of starting graph or test
-    * @param state
-    * @param confDir
 +   */
 +  public int run(String startName, State state, String confDir) {
 +    
 +    try {
 +      System.out.println("confDir " + confDir);
 +      setConfigDir(confDir);
 +      Node node = getNode(startName);
 +      node.visit(state, new Properties());
 +    } catch (Exception e) {
 +      log.error("Error during random walk", e);
 +      return -1;
 +    }
 +    return 0;
 +  }
 +  
 +  /**
 +   * Creates a node (if it does not already exist) and inserts it into the map
 +   * 
 +   * @param id
 +   *          Name of node
 +   * @return Node specified by id
-    * @throws Exception
 +   */
 +  public Node getNode(String id) throws Exception {
 +    
 +    // check for node in nodes
 +    if (nodes.containsKey(id)) {
 +      return nodes.get(id);
 +    }
 +    
 +    // otherwise create and put in nodes
 +    Node node = null;
 +    if (id.endsWith(".xml")) {
 +      node = new Module(new File(configDir + "modules/" + id));
 +    } else {
 +      node = (Test) Class.forName(id).newInstance();
 +    }
 +    nodes.put(id, node);
 +    return node;
 +  }
 +  
 +  static class Opts extends org.apache.accumulo.core.cli.Help {
 +    @Parameter(names="--configDir", required=true, description="directory containing the test configuration")
 +    String configDir;
 +    @Parameter(names="--logDir", required=true, description="location of the local logging directory")
 +    String localLogPath;
 +    @Parameter(names="--logId", required=true, description="a unique log identifier (like a hostname, or pid)")
 +    String logId;
 +    @Parameter(names="--module", required=true, description="the name of the module to run")
 +    String module;
 +  }
 +  
 +  public static void main(String[] args) throws Exception {
 +    Opts opts = new Opts();
 +    opts.parseArgs(Framework.class.getName(), args);
 +
 +    Properties props = new Properties();
 +    FileInputStream fis = new FileInputStream(opts.configDir + "/randomwalk.conf");
 +    props.load(fis);
 +    fis.close();
 +    
 +    System.setProperty("localLog", opts.localLogPath + "/" + opts.logId);
 +    System.setProperty("nfsLog", props.getProperty("NFS_LOGPATH") + "/" + opts.logId);
 +    
 +    DOMConfigurator.configure(opts.configDir + "logger.xml");
 +    
 +    State state = new State(props);
 +    int retval = getInstance().run(opts.module, state, opts.configDir);
 +    
 +    System.exit(retval);
 +  }
 +}
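
Framework.getNode above is a lazy cache keyed by node id. A stripped-down sketch of the same lookup-then-create shape (the Node interface and anonymous node below are stand-ins for the randomwalk Module and Test classes):

import java.util.HashMap;

public class NodeCacheSketch {
  interface Node { void visit(); }

  private final HashMap<String,Node> nodes = new HashMap<String,Node>();

  Node getNode(String id) {
    Node node = nodes.get(id);  // a single lookup instead of containsKey() followed by get()
    if (node == null) {
      node = new Node() {       // in Framework this is a Module (for .xml ids) or a Test
        public void visit() {}
      };
      nodes.put(id, node);
    }
    return node;
  }

  public static void main(String[] args) {
    NodeCacheSketch f = new NodeCacheSketch();
    System.out.println(f.getNode("a") == f.getNode("a"));  // true: the second call hits the cache
  }
}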

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/randomwalk/Node.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/Node.java
index 1868ade,0000000..b74b6cd
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/Node.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/Node.java
@@@ -1,64 -1,0 +1,63 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.randomwalk;
 +
 +import java.util.Properties;
 +
 +import org.apache.log4j.Logger;
 +
 +/**
 + * Represents a point in the graph of the random walk framework
 + */
 +public abstract class Node {
 +  
 +  protected final Logger log = Logger.getLogger(this.getClass());
 +  long progress = System.currentTimeMillis();
 +  
 +  /**
 +   * Visits node
 +   * 
 +   * @param state
 +   *          Random walk state passed between nodes
-    * @throws Exception
 +   */
 +  public abstract void visit(State state, Properties props) throws Exception;
 +  
 +  @Override
 +  public boolean equals(Object o) {
 +    if (o == null)
 +      return false;
 +    return toString().equals(o.toString());
 +  }
 +  
 +  @Override
 +  public String toString() {
 +    return this.getClass().getName();
 +  }
 +  
 +  @Override
 +  public int hashCode() {
 +    return toString().hashCode();
 +  }
 +  
 +  synchronized public void makingProgress() {
 +    progress = System.currentTimeMillis();
 +  }
 +  
 +  synchronized public long lastProgress() {
 +    return progress;
 +  }
 +}
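
Node above derives identity from the class name: toString() returns getClass().getName(), and equals and hashCode both delegate to it, so the equals/hashCode contract holds by construction. The convention in miniature:

public class NameIdentitySketch {
  @Override
  public String toString() {
    return this.getClass().getName();
  }

  @Override
  public boolean equals(Object o) {
    return o != null && toString().equals(o.toString());  // equal iff the class names match
  }

  @Override
  public int hashCode() {
    return toString().hashCode();  // consistent with equals by sharing the same key
  }

  public static void main(String[] args) {
    NameIdentitySketch a = new NameIdentitySketch();
    NameIdentitySketch b = new NameIdentitySketch();
    System.out.println(a.equals(b) && a.hashCode() == b.hashCode());  // true: same class name
  }
}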

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/CheckBalance.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/CheckBalance.java
index a0dd37c,0000000..4581b04
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/CheckBalance.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/CheckBalance.java
@@@ -1,110 -1,0 +1,107 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.randomwalk.concurrent;
 +
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Properties;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.test.randomwalk.State;
 +import org.apache.accumulo.test.randomwalk.Test;
 +
 +/**
 + * 
 + */
 +public class CheckBalance extends Test {
 +  
 +  static final String LAST_UNBALANCED_TIME = "lastUnbalancedTime";
 +  static final String UNBALANCED_COUNT = "unbalancedCount";
 +
-   /* (non-Javadoc)
-    * @see org.apache.accumulo.test.randomwalk.Node#visit(org.apache.accumulo.test.randomwalk.State, java.util.Properties)
-    */
 +  @Override
 +  public void visit(State state, Properties props) throws Exception {
 +    log.debug("checking balance");
 +    Map<String,Long> counts = new HashMap<String,Long>();
 +    Scanner scanner = state.getConnector().createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
 +    scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
 +    for (Entry<Key,Value> entry : scanner) {
 +      String location = entry.getKey().getColumnQualifier().toString();
 +      Long count = counts.get(location);
 +      if (count == null)
 +        count = Long.valueOf(0);
 +      counts.put(location, count + 1);
 +    }
 +    double total = 0.;
 +    for (Long count : counts.values()) {
 +      total += count.longValue();
 +    }
 +    final double average = total / counts.size();
 +    final double sd = stddev(counts.values(), average);
 +    log.debug("average " + average + ", standard deviation " + sd);
 +
 +    // Check for balanced # of tablets on each node
 +    double maxDifference = 2.0 * sd;
 +    String unbalancedLocation = null;
 +    long lastCount = 0L;
 +    boolean balanced = true;
 +    for (Entry<String,Long> entry : counts.entrySet()) {
 +      long thisCount = entry.getValue().longValue();
 +      if (Math.abs(thisCount - average) > maxDifference) {
 +        balanced = false;
 +        log.debug("unbalanced: " + entry.getKey() + " has " + entry.getValue() + " tablets and the average is " + average);
 +        unbalancedLocation = entry.getKey();
 +        lastCount = thisCount;
 +      }
 +    }
 +    
 +    // It is expected that the number of tablets will be uneven for short
 +    // periods of time. Don't complain unless it has stayed unbalanced for
 +    // over a 15 minute period and at least three checks in a row.
 +    if (!balanced) {
 +      Long last = state.getLong(LAST_UNBALANCED_TIME);
 +      if (last != null && System.currentTimeMillis() - last > 15 * 60 * 1000) {
 +        Integer count = state.getInteger(UNBALANCED_COUNT);
 +        if (count == null)
 +          count = Integer.valueOf(0);
 +        if (count > 3)
 +          throw new Exception("servers are unbalanced! location " + unbalancedLocation + " count " + lastCount + " too far from average " + average);
 +        count++;
 +        state.set(UNBALANCED_COUNT, count);
 +      }
 +      if (last == null)
 +        state.set(LAST_UNBALANCED_TIME, System.currentTimeMillis());
 +    } else {
 +      state.remove(LAST_UNBALANCED_TIME);
 +      state.remove(UNBALANCED_COUNT);
 +    }
 +  }
 +  
 +  private static double stddev(Collection<Long> samples, double avg) {
 +    int num = samples.size();
 +    double sqrtotal = 0.0;
 +    for (Long s : samples) {
 +      double diff = s.doubleValue() - avg;
 +      sqrtotal += diff * diff;
 +    }
-     return Math.sqrt(sqrtotal / (double) num);
++    return Math.sqrt(sqrtotal / num);
 +  }
 +}
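
CheckBalance above flags a tablet server whose count is more than two standard deviations from the mean, then escalates only if the imbalance persists. The statistical core in isolation, with invented counts (one server hoarding 180 tablets):

import java.util.Arrays;
import java.util.List;

public class BalanceSketch {
  static double stddev(List<Long> samples, double avg) {
    double sqrtotal = 0.0;
    for (Long s : samples) {
      double diff = s.doubleValue() - avg;
      sqrtotal += diff * diff;
    }
    return Math.sqrt(sqrtotal / samples.size());
  }

  public static void main(String[] args) {
    List<Long> counts = Arrays.asList(100L, 101L, 99L, 100L, 100L, 100L, 100L, 100L, 100L, 180L);
    double total = 0;
    for (Long c : counts)
      total += c.longValue();
    double average = total / counts.size();
    double sd = stddev(counts, average);
    for (Long c : counts) {
      // a count more than two standard deviations from the mean is suspect
      if (Math.abs(c.longValue() - average) > 2.0 * sd)
        System.out.println("unbalanced: " + c + " tablets, average " + average);
    }
  }
}

With these numbers the average is 108 and the standard deviation is about 24, so only the 180-tablet server trips the 2-sigma threshold.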

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java
----------------------------------------------------------------------
diff --cc trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java
index 049b2a2,0000000..dfa9f0c
mode 100644,000000..100644
--- a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java
+++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java
@@@ -1,132 -1,0 +1,122 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.trace.instrument.receivers;
 +
 +import java.io.IOException;
 +import java.nio.charset.Charset;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Random;
 +
 +import org.apache.log4j.Logger;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +import org.apache.zookeeper.ZooKeeper;
 +import org.apache.zookeeper.ZooKeeper.States;
 +
 +/**
 + * Find a Span collector via zookeeper and push spans there via Thrift RPC
 + * 
 + */
 +public class ZooSpanClient extends SendSpansViaThrift {
 +  
 +  private static final Logger log = Logger.getLogger(ZooSpanClient.class);
 +  private static final int TOTAL_TIME_WAIT_CONNECT_MS = 10 * 1000;
 +  private static final int TIME_WAIT_CONNECT_CHECK_MS = 100;
 +  private static final Charset UTF8 = Charset.forName("UTF-8");
 +  
 +  ZooKeeper zoo = null;
 +  final String path;
 +  final Random random = new Random();
 +  final List<String> hosts = new ArrayList<String>();
 +  
 +  public ZooSpanClient(String keepers, final String path, String host, String service, long millis) throws IOException, KeeperException, InterruptedException {
 +    super(host, service, millis);
 +    this.path = path;
 +    zoo = new ZooKeeper(keepers, 30 * 1000, new Watcher() {
 +      @Override
 +      public void process(WatchedEvent event) {
 +        try {
 +          if (zoo != null) {
 +            updateHosts(path, zoo.getChildren(path, null));
 +          }
 +        } catch (Exception ex) {
 +          log.error("unable to get destination hosts in zookeeper", ex);
 +        }
 +      }
 +    });
 +    for (int i = 0; i < TOTAL_TIME_WAIT_CONNECT_MS; i += TIME_WAIT_CONNECT_CHECK_MS) {
 +      if (zoo.getState().equals(States.CONNECTED))
 +        break;
 +      try {
 +        Thread.sleep(TIME_WAIT_CONNECT_CHECK_MS);
 +      } catch (InterruptedException ex) {
 +        break;
 +      }
 +    }
 +    zoo.getChildren(path, true);
 +  }
 +  
-   /*
-    * (non-Javadoc)
-    * 
-    * @see trace.instrument.receivers.AsyncSpanReceiver#flush()
-    */
 +  @Override
 +  public void flush() {
 +    if (!hosts.isEmpty())
 +      super.flush();
 +  }
 +  
-   /*
-    * (non-Javadoc)
-    * 
-    * @see trace.instrument.receivers.AsyncSpanReceiver#sendSpans()
-    */
 +  @Override
 +  void sendSpans() {
 +    if (hosts.isEmpty()) {
 +      if (!sendQueue.isEmpty()) {
 +        log.error("No hosts to send data to, dropping queued spans");
 +        synchronized (sendQueue) {
 +          sendQueue.clear();
 +          sendQueue.notifyAll();
 +        }
 +      }
 +    } else {
 +      super.sendSpans();
 +    }
 +  }
 +
 +  synchronized private void updateHosts(String path, List<String> children) {
 +    log.debug("Scanning trace hosts in zookeeper: " + path);
 +    try {
 +      List<String> hosts = new ArrayList<String>();
 +      for (String child : children) {
 +        byte[] data = zoo.getData(path + "/" + child, null, null);
 +        hosts.add(new String(data, UTF8));
 +      }
 +      this.hosts.clear();
 +      this.hosts.addAll(hosts);
 +      log.debug("Trace hosts: " + this.hosts);
 +    } catch (Exception ex) {
 +      log.error("unable to get destination hosts in zookeeper", ex);
 +    }
 +  }
 +  
 +  @Override
 +  synchronized protected String getSpanKey(Map<String,String> data) {
 +    if (hosts.size() > 0) {
 +      String host = hosts.get(random.nextInt(hosts.size()));
 +      log.debug("sending data to " + host);
 +      return host;
 +    }
 +    return null;
 +  }
 +}
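
updateHosts above uses a copy-then-swap idiom: the replacement host list is assembled off to the side, and the shared list's contents are replaced inside the synchronized method, so a concurrent getSpanKey never observes a half-built list. The idiom in isolation (the class name and ports are made up):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class HostListSketch {
  private final List<String> hosts = new ArrayList<String>();
  private final Random random = new Random();

  synchronized void updateHosts(List<String> fresh) {
    List<String> copy = new ArrayList<String>(fresh);  // stage the snapshot before touching shared state
    hosts.clear();
    hosts.addAll(copy);
  }

  synchronized String getSpanKey() {
    if (hosts.size() > 0)
      return hosts.get(random.nextInt(hosts.size()));  // pick a collector at random, as above
    return null;                                       // no collectors registered yet
  }

  public static void main(String[] args) {
    HostListSketch s = new HostListSketch();
    s.updateHosts(Arrays.asList("host1:12000", "host2:12000"));
    System.out.println(s.getSpanKey());
  }
}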