You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2014/04/09 19:57:49 UTC
[18/64] [abbrv] Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
index 06b9a7c,0000000..70156b2
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
@@@ -1,223 -1,0 +1,222 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.continuous;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.ClientOnDefaultTable;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.util.reflection.CounterUtils;
+import org.apache.accumulo.test.continuous.ContinuousWalk.BadChecksumException;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.validators.PositiveInteger;
+
+/**
+ * A map reduce job that verifies a table created by continuous ingest. It verifies that all referenced nodes are defined.
+ */
+
+public class ContinuousVerify extends Configured implements Tool {
+ public static final VLongWritable DEF = new VLongWritable(-1);
+
+ /**
+ * Mapper over the ingest table. For every entry that passes ContinuousWalk.validate it emits (row, DEF) to mark the row as defined and, when
+ * ContinuousWalk.getPrevRowOffset reports a positive offset, (referenced row, this row) to record the reference. Entries that fail validation with a
+ * BadChecksumException are counted under Counts.CORRUPT and skipped.
+ */
+ public static class CMapper extends Mapper<Key,Value,LongWritable,VLongWritable> {
+
+ // Writable instances that are filled in and written on each map() call
+ private LongWritable row = new LongWritable();
+ private LongWritable ref = new LongWritable();
+ private VLongWritable vrow = new VLongWritable();
+
+ // number of bad-checksum entries seen by this mapper instance; only used to limit how many are printed
+ private long corrupt = 0;
+
+ @Override
+ public void map(Key key, Value data, Context context) throws IOException, InterruptedException {
+ // the row is parsed as a hexadecimal long; negative values are rejected (presumably because -1 is reserved for DEF -- TODO confirm)
+ long r = Long.parseLong(key.getRow().toString(), 16);
+ if (r < 0)
+ throw new IllegalArgumentException();
+
+ try {
+ ContinuousWalk.validate(key, data);
+ } catch (BadChecksumException bce) {
+ CounterUtils.increment(context.getCounter(Counts.CORRUPT));
+ // print the first 1000 corrupt keys, then a single notice, then stay silent
+ if (corrupt < 1000) {
+ System.out.println("ERROR Bad checksum : " + key);
+ } else if (corrupt == 1000) {
+ System.out.println("Too many bad checksums, not printing anymore!");
+ }
+ corrupt++;
+ // a corrupt entry produces neither a definition nor a reference
+ return;
+ }
+
+ row.set(r);
+
+ // definition marker for this row
+ context.write(row, DEF);
+ byte[] val = data.get();
+
+ int offset = ContinuousWalk.getPrevRowOffset(val);
+ if (offset > 0) {
+ // the 16 bytes at offset are read as a hexadecimal row id; emit it keyed by the referenced row with this row as the value
+ ref.set(Long.parseLong(new String(val, offset, 16, Constants.UTF8), 16));
+ vrow.set(r);
+ context.write(ref, vrow);
+ }
+ }
+ }
+
+ /**
+ * Job counters: CORRUPT is incremented by CMapper for entries with a bad checksum; UNREFERENCED, UNDEFINED and REFERENCED are incremented by CReducer,
+ * one per reduced row.
+ */
+ public static enum Counts {
+ UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT
+ }
+
+ /**
+ * Reducer that classifies each row by the values collected for it: a value of -1 (the DEF marker) counts as a definition, every other value is the id of
+ * a row that references it. Rows that are referenced but never defined are written to the output together with their referencing rows.
+ */
+ public static class CReducer extends Reducer<LongWritable,VLongWritable,Text,Text> {
+ // referencing rows for the key currently being reduced; cleared at the start of every reduce() call
+ private ArrayList<Long> refs = new ArrayList<Long>();
+
+ @Override
+ public void reduce(LongWritable key, Iterable<VLongWritable> values, Context context) throws IOException, InterruptedException {
+
+ int defCount = 0;
+
+ refs.clear();
+ for (VLongWritable type : values) {
+ // -1 is the value carried by DEF
+ if (type.get() == -1) {
+ defCount++;
+ } else {
+ refs.add(type.get());
+ }
+ }
+
+ if (defCount == 0 && refs.size() > 0) {
+ // referenced but not defined: output the row and a comma separated list of the rows that point at it
+ StringBuilder sb = new StringBuilder();
+ String comma = "";
+ for (Long ref : refs) {
+ sb.append(comma);
+ comma = ",";
+ sb.append(new String(ContinuousIngest.genRow(ref), Constants.UTF8));
+ }
+
+ context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString()));
+ CounterUtils.increment(context.getCounter(Counts.UNDEFINED));
+
+ } else if (defCount > 0 && refs.size() == 0) {
+ // defined but nothing points at it
+ CounterUtils.increment(context.getCounter(Counts.UNREFERENCED));
+ } else {
+ // every remaining combination is counted as REFERENCED
+ CounterUtils.increment(context.getCounter(Counts.REFERENCED));
+ }
+
+ }
+ }
+
+ /**
+ * Command-line options of the verification job, parsed in run(). --output, --maxMappers and --reducers are declared required, so the field initializers
+ * below only act as placeholders.
+ */
+ static class Opts extends ClientOnDefaultTable {
+ @Parameter(names = "--output", description = "location in HDFS to store the results; must not exist", required = true)
+ String outputDir = "/tmp/continuousVerify";
+
+ // passed to splitRangeByTablets() in run() as the maximum number of ranges
+ @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class)
+ int maxMaps = 0;
+
+ @Parameter(names = "--reducers", description = "the number of reducers to use", required = true, validateWith = PositiveInteger.class)
+ int reducers = 0;
+
+ // when set, run() clones the table, takes the clone offline and scans the clone
+ @Parameter(names = "--offline", description = "perform the verification directly on the files while the table is offline")
+ boolean scanOffline = false;
+
+ public Opts() {
+ // "ci" is handed to ClientOnDefaultTable, presumably as the default table name -- TODO confirm
+ super("ci");
+ }
+ }
+
+ /**
+  * Configures the verification map-reduce job, runs it and waits for it to finish.
+  * <p>
+  * With --offline the table is cloned under a random suffix, the clone is taken offline and the job reads the clone's files. The clone is deleted and
+  * tracing is stopped in a finally block, so neither is left behind when job setup or execution throws.
+  *
+  * @param args
+  *          command-line arguments, parsed into {@link Opts}
+  * @return 0 if the job was successful, 1 otherwise
+  * @throws Exception
+  *           if the Accumulo operations or the job itself fail
+  */
+ @Override
+ public int run(String[] args) throws Exception {
+   Opts opts = new Opts();
+   opts.parseArgs(this.getClass().getName(), args);
+
+   String clone = opts.getTableName();
+   Connector conn = null;
+   // set once the clone exists, so the cleanup below never tries to delete a table that was not created
+   boolean cloneCreated = false;
+
+   try {
+     Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+     job.setJarByClass(this.getClass());
+
+     job.setInputFormatClass(AccumuloInputFormat.class);
+     opts.setAccumuloConfigs(job);
+
+     Set<Range> ranges;
+     if (opts.scanOffline) {
+       Random random = new Random();
+       clone = opts.getTableName() + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffl));
+       conn = opts.getConnector();
+       conn.tableOperations().clone(opts.getTableName(), clone, true, new HashMap<String,String>(), new HashSet<String>());
+       cloneCreated = true;
+       // ranges are computed on the source table, as in the original code
+       ranges = conn.tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
+       conn.tableOperations().offline(clone);
+       AccumuloInputFormat.setInputTableName(job, clone);
+       AccumuloInputFormat.setOfflineTableScan(job, true);
+     } else {
+       ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
+     }
+
+     AccumuloInputFormat.setRanges(job, ranges);
+     AccumuloInputFormat.setAutoAdjustRanges(job, false);
+
+     job.setMapperClass(CMapper.class);
+     job.setMapOutputKeyClass(LongWritable.class);
+     job.setMapOutputValueClass(VLongWritable.class);
+
+     job.setReducerClass(CReducer.class);
+     job.setNumReduceTasks(opts.reducers);
+
+     job.setOutputFormatClass(TextOutputFormat.class);
+
+     // speculative map execution is only enabled for offline scans
+     job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", opts.scanOffline);
+
+     TextOutputFormat.setOutputPath(job, new Path(opts.outputDir));
+
+     job.waitForCompletion(true);
+
+     return job.isSuccessful() ? 0 : 1;
+   } finally {
+     try {
+       if (cloneCreated) {
+         conn.tableOperations().delete(clone);
+       }
+     } finally {
+       opts.stopTracing();
+     }
+   }
+ }
+
+ /**
+ * Runs the verification job through ToolRunner and exits the JVM with the tool's result code when that code is non-zero.
+ *
+ * @param args
+ * command-line arguments; run(String[]) parses them using the options declared on Opts
- * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousVerify(), args);
+ if (res != 0)
+ System.exit(res);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java
index c522914,0000000..f1dfcd2
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java
@@@ -1,51 -1,0 +1,48 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.File;
+import java.util.Arrays;
+
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+
+/**
+ * Prepares a clean state for the cache test: removes a ZooKeeper node tree and makes sure the report directory exists and is empty.
+ */
+public class CacheTestClean {
+
- /**
- * @param args
- */
+ /**
+  * Deletes {@code args[0]} recursively from ZooKeeper if it exists, then creates the report directory {@code args[1]} or verifies that it is empty.
+  *
+  * @param args
+  *          {@code args[0]} is the ZooKeeper path to remove, {@code args[1]} the local report directory
+  * @throws IllegalArgumentException
+  *           if fewer than two arguments are given
+  * @throws Exception
+  *           if the report directory cannot be created, cannot be listed, or is not empty, or if the ZooKeeper operations fail
+  */
+ public static void main(String[] args) throws Exception {
+   if (args.length < 2) {
+     throw new IllegalArgumentException("Usage: " + CacheTestClean.class.getName() + " <zookeeper root dir> <report dir>");
+   }
+   String rootDir = args[0];
+   File reportDir = new File(args[1]);
+
+   IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+
+   if (zoo.exists(rootDir)) {
+     zoo.recursiveDelete(rootDir, NodeMissingPolicy.FAIL);
+   }
+
+   if (!reportDir.exists()) {
+     // mkdir() reports failure through its return value; tolerate a concurrent creation of the same directory
+     if (!reportDir.mkdir() && !reportDir.isDirectory()) {
+       throw new Exception("unable to create dir " + reportDir);
+     }
+   } else {
+     // listFiles() returns null when the path is not a directory or cannot be read
+     File[] files = reportDir.listFiles();
+     if (files == null) {
+       throw new Exception(reportDir + " is not a readable directory");
+     }
+     if (files.length != 0) {
+       throw new Exception("dir " + reportDir + " is not empty: " + Arrays.asList(files));
+     }
+   }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
index 2b775c5,0000000..06c6fdb
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
@@@ -1,217 -1,0 +1,213 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.server.util.reflection.CounterUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Runs the functional tests via map-reduce.
+ *
+ * First, be sure everything is compiled.
+ *
+ * Second, get a list of the tests you want to run:
+ *
+ * <pre>
+ * $ python test/system/auto/run.py -l > tests
+ * </pre>
+ *
+ * Put the list of tests into HDFS:
+ *
+ * <pre>
+ * $ hadoop fs -put tests /user/hadoop/tests
+ * </pre>
+ *
+ * Run the map-reduce job:
+ *
+ * <pre>
+ * $ ./bin/accumulo accumulo.test.functional.RunTests --tests /user/hadoop/tests --output /user/hadoop/results
+ * </pre>
+ *
+ * Note that you will need to have some configuration in conf/accumulo-site.xml (to locate zookeeper). The map-reduce jobs will not use your local accumulo
+ * instance.
+ *
+ */
+public class RunTests extends Configured implements Tool {
+
+ static final public String JOB_NAME = "Functional Test Runner";
+ private static final Logger log = Logger.getLogger(RunTests.class);
+
+ private Job job = null;
+
+ private static final int DEFAULT_TIMEOUT_FACTOR = 1;
+
+ /**
+ * Command-line options of the test runner, parsed in run().
+ */
+ static class Opts extends Help {
+ // used in run() as the input path of the job
+ @Parameter(names="--tests", description="newline separated list of tests to run", required=true)
+ String testFile;
+ // used in run() as the output path of the job; an existing path is deleted first
+ @Parameter(names="--output", description="destination for the results of tests in HDFS", required=true)
+ String outputPath;
+ // multiplies the 8 minute mapred.task.timeout and is stored in the job configuration under TIMEOUT_FACTOR
+ @Parameter(names="--timeoutFactor", description="Optional scaling factor for timeout for both mapred.task.timeout and -f flag on run.py", required=false)
+ Integer intTimeoutFactor = DEFAULT_TIMEOUT_FACTOR;
+ }
+
+ static final String TIMEOUT_FACTOR = RunTests.class.getName() + ".timeoutFactor";
+
+ /**
+  * Mapper that runs one functional test per input line by launching {@code test/system/auto/run.py} in the directory configured as
+  * {@code accumulo.home}. Each output line of the script that starts with {@code "::::: "} is treated as a result line: its first character selects an
+  * {@link Outcome} counter, and the remainder of the line is written as the value for the test name, prefixed with the task attempt id.
+  */
+ static public class TestMapper extends Mapper<LongWritable,Text,Text,Text> {
+
+   private static final String REDUCER_RESULT_START = "::::: ";
+   private static final int RRS_LEN = REDUCER_RESULT_START.length();
+   private Text result = new Text();
+   // value passed to run.py with -f; filled in by setup()
+   String mapperTimeoutFactor = null;
+
+   private static enum Outcome {
+     SUCCESS, FAILURE, ERROR, UNEXPECTED_SUCCESS, EXPECTED_FAILURE
+   }
+
+   // maps the first character of a result line to the counter to increment
+   private static final Map<Character, Outcome> OUTCOME_COUNTERS;
+   static {
+     OUTCOME_COUNTERS = new java.util.HashMap<Character, Outcome>();
+     OUTCOME_COUNTERS.put('S', Outcome.SUCCESS);
+     OUTCOME_COUNTERS.put('F', Outcome.FAILURE);
+     OUTCOME_COUNTERS.put('E', Outcome.ERROR);
+     OUTCOME_COUNTERS.put('T', Outcome.UNEXPECTED_SUCCESS);
+     OUTCOME_COUNTERS.put('G', Outcome.EXPECTED_FAILURE);
+   }
+
+   /**
+    * Runs the test named by {@code value} and reports its result lines.
+    * <p>
+    * Failures while reading the script output or writing results are logged and do not fail the task, as before. The reader on the process output is
+    * always closed before waiting for the process to exit.
+    */
+   @Override
+   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+     List<String> cmd = Arrays.asList("/usr/bin/python", "test/system/auto/run.py", "-m", "-f", mapperTimeoutFactor, "-t", value.toString());
+     log.info("Running test " + cmd);
+     ProcessBuilder pb = new ProcessBuilder(cmd);
+     pb.directory(new File(context.getConfiguration().get("accumulo.home")));
+     pb.redirectErrorStream(true);
+     Process p = pb.start();
+     // the script gets no input
+     p.getOutputStream().close();
+     BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream(), Constants.UTF8));
+     try {
+       String line;
+       while ((line = br.readLine()) != null) {
+         log.info("More: " + line);
+         if (line.startsWith(REDUCER_RESULT_START)) {
+           String resultLine = line.substring(RRS_LEN);
+           if (resultLine.length() > 0) {
+             Outcome outcome = OUTCOME_COUNTERS.get(resultLine.charAt(0));
+             if (outcome != null) {
+               CounterUtils.increment(context.getCounter(outcome));
+             }
+           }
+           String taskAttemptId = context.getTaskAttemptID().toString();
+           result.set(taskAttemptId + " " + resultLine);
+           context.write(value, result);
+         }
+       }
+     } catch (Exception ex) {
+       // pass the throwable separately so log4j records the stack trace
+       log.error("Failed while processing output of test " + value, ex);
+       context.progress();
+     } finally {
+       try {
+         br.close();
+       } catch (IOException ioe) {
+         log.warn("Unable to close output of test " + value, ioe);
+       }
+     }
+
+     p.waitFor();
+   }
+
+   /**
+    * Reads the timeout factor from the job configuration, falling back to DEFAULT_TIMEOUT_FACTOR.
+    */
+   @Override
+   protected void setup(Mapper<LongWritable,Text,Text,Text>.Context context) throws IOException, InterruptedException {
+     mapperTimeoutFactor = Integer.toString(context.getConfiguration().getInt(TIMEOUT_FACTOR, DEFAULT_TIMEOUT_FACTOR));
+   }
+ }
+
+ /**
+  * Parses the options and configures the map-only job that runs the functional tests. The job is stored in the {@code job} field but is not submitted
+  * here; {@code main} waits for its completion.
+  *
+  * @param args
+  *          command-line arguments, parsed into {@link Opts}
+  * @return always 0
+  * @throws IllegalStateException
+  *           if the ACCUMULO_HOME environment variable is not set
+  * @throws Exception
+  *           if the job or the file system cannot be set up
+  */
+ @Override
+ public int run(String[] args) throws Exception {
+   job = new Job(getConf(), JOB_NAME);
+   job.setJarByClass(this.getClass());
+   Opts opts = new Opts();
+   opts.parseArgs(RunTests.class.getName(), args);
+
+   // fail with a clear message instead of handing a null value to the configuration
+   String accumuloHome = System.getenv("ACCUMULO_HOME");
+   if (accumuloHome == null) {
+     throw new IllegalStateException("ACCUMULO_HOME must be set in the environment to run the functional tests");
+   }
+
+   // this is like 1-2 tests per mapper
+   Configuration conf = job.getConfiguration();
+   conf.setInt("mapred.max.split.size", 40);
+   conf.set("accumulo.home", accumuloHome);
+
+   // --timeoutFactor scales the 8 minute task timeout and is passed on to the mappers through TIMEOUT_FACTOR
+   conf.setInt("mapred.task.timeout", opts.intTimeoutFactor * 8 * 60 * 1000);
+   conf.setInt(TIMEOUT_FACTOR, opts.intTimeoutFactor);
+   conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+
+   // set input
+   job.setInputFormatClass(TextInputFormat.class);
+   TextInputFormat.setInputPaths(job, new Path(opts.testFile));
+
+   // set output
+   job.setOutputFormatClass(TextOutputFormat.class);
+   FileSystem fs = FileSystem.get(conf);
+   Path destination = new Path(opts.outputPath);
+   if (fs.exists(destination)) {
+     log.info("Deleting existing output directory " + opts.outputPath);
+     fs.delete(destination, true);
+   }
+   TextOutputFormat.setOutputPath(job, destination);
+
+   // set mapper
+   job.setMapperClass(TestMapper.class);
+   job.setOutputKeyClass(Text.class);
+   job.setOutputValueClass(Text.class);
+
+   // map-only job: don't do anything with the results (yet) a summary would be nice
+   job.setNumReduceTasks(0);
+
+   // the job is submitted by main()
+   log.info("Starting tests");
+   return 0;
+ }
+
- /**
- * @param args
- * @throws Exception
- */
+ /**
+ * Configures the job through ToolRunner, then waits for it to complete and exits with status 1 if it was not successful.
+ * NOTE(review): the value returned by ToolRunner.run is not checked here.
+ */
+ public static void main(String[] args) throws Exception {
+ RunTests tests = new RunTests();
+ // run() only configures tests.job; the job is executed by waitForCompletion below
+ ToolRunner.run(new Configuration(), tests, args);
+ tests.job.waitForCompletion(true);
+ if (!tests.job.isSuccessful())
+ System.exit(1);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java
index a9b072e,0000000..85cddbb
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java
@@@ -1,246 -1,0 +1,243 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.performance.metadata;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.Stat;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.security.SecurityConstants;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This little program can be used to write a lot of entries to the !METADATA table and measure the performance of varying numbers of threads doing !METADATA
+ * lookups using the batch scanner.
+ *
+ *
+ */
+
+public class MetadataBatchScanTest {
+
- /**
- * @param args
- */
+ /**
+  * Entry point. The first argument selects the mode:
+  * <ul>
+  * <li>{@code write}: writes a prev-row entry and a location for each generated extent to the metadata table</li>
+  * <li>{@code writeFiles}: writes a directory entry and five data file entries for each generated extent</li>
+  * <li>{@code scan <numThreads> <numLoop> <numLookups>}: starts numThreads threads that each run numLoop batch scans over numLookups randomly chosen
+  * extents and prints the timings</li>
+  * </ul>
+  *
+  * @param args
+  *          mode and, for {@code scan}, the three numeric arguments
+  * @throws IllegalArgumentException
+  *           if the mode is missing or unknown, if {@code scan} is given fewer than three numbers, or if numLookups exceeds the number of extents
+  */
+ public static void main(String[] args) throws Exception {
+
+   final String usage = "Usage: " + MetadataBatchScanTest.class.getName() + " write | writeFiles | scan <numThreads> <numLoop> <numLookups>";
+
+   // validate the arguments before connecting and before generating the extents
+   if (args.length < 1) {
+     throw new IllegalArgumentException(usage);
+   }
+   final String mode = args[0];
+   final boolean scanMode = mode.equals("scan");
+   if (!(mode.equals("write") || mode.equals("writeFiles") || scanMode) || (scanMode && args.length < 4)) {
+     throw new IllegalArgumentException(usage);
+   }
+
+   final Connector connector = new ZooKeeperInstance("acu14", "localhost")
+       .getConnector(SecurityConstants.SYSTEM_PRINCIPAL, SecurityConstants.getSystemToken());
+
+   // fixed seed, so every invocation generates the same split points
+   TreeSet<Long> splits = new TreeSet<Long>();
+   Random r = new Random(42);
+
+   while (splits.size() < 99999) {
+     splits.add((r.nextLong() & 0x7fffffffffffffffl) % 1000000000000l);
+   }
+
+   Text tid = new Text("8");
+   Text per = null;
+
+   ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>();
+
+   for (Long split : splits) {
+     Text er = new Text(String.format("%012d", split));
+     KeyExtent ke = new KeyExtent(tid, er, per);
+     per = er;
+
+     extents.add(ke);
+   }
+
+   // last extent, with no end row
+   extents.add(new KeyExtent(tid, null, per));
+
+   if (mode.equals("write")) {
+
+     BatchWriter bw = connector.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig());
+     try {
+       for (KeyExtent extent : extents) {
+         Mutation mut = extent.getPrevRowUpdateMutation();
+         new TServerInstance(AddressUtil.parseAddress("192.168.1.100", 4567), "DEADBEEF").putLocation(mut);
+         bw.addMutation(mut);
+       }
+     } finally {
+       // close even when adding a mutation fails, so the writer is not leaked
+       bw.close();
+     }
+   } else if (mode.equals("writeFiles")) {
+     BatchWriter bw = connector.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig());
+     try {
+       for (KeyExtent extent : extents) {
+
+         Mutation mut = new Mutation(extent.getMetadataEntry());
+
+         String dir = "/t-" + UUID.randomUUID();
+
+         Constants.METADATA_DIRECTORY_COLUMN.put(mut, new Value(dir.getBytes(Constants.UTF8)));
+
+         for (int i = 0; i < 5; i++) {
+           mut.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(dir + "/00000_0000" + i + ".map"), new Value("10000,1000000".getBytes(Constants.UTF8)));
+         }
+
+         bw.addMutation(mut);
+       }
+     } finally {
+       bw.close();
+     }
+   } else if (scanMode) {
+
+     int numThreads = Integer.parseInt(args[1]);
+     final int numLoop = Integer.parseInt(args[2]);
+     int numLookups = Integer.parseInt(args[3]);
+
+     // the loop below collects distinct indexes and could never finish if more were requested than exist
+     if (numLookups > extents.size()) {
+       throw new IllegalArgumentException("numLookups (" + numLookups + ") must not exceed the number of extents (" + extents.size() + ")");
+     }
+
+     HashSet<Integer> indexes = new HashSet<Integer>();
+     while (indexes.size() < numLookups) {
+       indexes.add(r.nextInt(extents.size()));
+     }
+
+     final List<Range> ranges = new ArrayList<Range>();
+     for (Integer i : indexes) {
+       ranges.add(extents.get(i).toMetadataRange());
+     }
+
+     Thread threads[] = new Thread[numThreads];
+
+     for (int i = 0; i < threads.length; i++) {
+       threads[i] = new Thread(new Runnable() {
+
+         @Override
+         public void run() {
+           try {
+             System.out.println(runScanTest(connector, numLoop, ranges));
+           } catch (Exception e) {
+             e.printStackTrace();
+           }
+         }
+       });
+     }
+
+     long t1 = System.currentTimeMillis();
+
+     for (int i = 0; i < threads.length; i++) {
+       threads[i].start();
+     }
+
+     for (int i = 0; i < threads.length; i++) {
+       threads[i].join();
+     }
+
+     long t2 = System.currentTimeMillis();
+
+     // total wall-clock time in seconds
+     System.out.printf("tt : %6.2f%n", (t2 - t1) / 1000.0);
+
+   } else {
+     throw new IllegalArgumentException(usage);
+   }
+
+ }
+
+ /**
+  * Creates a batch scanner over the metadata table for the given ranges, runs {@code numLoop} timed scans with it and returns the merged statistics. The
+  * batch scanner is closed before returning, including when a scan fails.
+  *
+  * @param connector
+  *          connection used to create the batch scanner
+  * @param numLoop
+  *          number of scans to run
+  * @param ranges
+  *          ranges to look up in every scan
+  * @return statistics merged over all scans
+  */
+ private static ScanStats runScanTest(Connector connector, int numLoop, List<Range> ranges) throws Exception {
+   // The comparison against a plain Scanner is disabled, so scan() is handed null. The disabled setup was:
+   // connector.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); ColumnFQ.fetch(scanner,
+   // Constants.METADATA_LOCATION_COLUMN); ColumnFQ.fetch(scanner, Constants.METADATA_PREV_ROW_COLUMN);
+   Scanner scanner = null;
+
+   BatchScanner bs = connector.createBatchScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS, 1);
+   try {
+     bs.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
+     Constants.METADATA_PREV_ROW_COLUMN.fetch(bs);
+
+     bs.setRanges(ranges);
+
+     ScanStats stats = new ScanStats();
+     for (int i = 0; i < numLoop; i++) {
+       ScanStat ss = scan(bs, ranges, scanner);
+       stats.merge(ss);
+     }
+
+     return stats;
+   } finally {
+     // every test thread creates its own batch scanner; release it
+     bs.close();
+   }
+ }
+
+ /**
+ * Result of one scan() call: elapsed milliseconds and entry count for the batch scan (delta1, count1) and for the second, currently disabled, section
+ * (delta2, count2).
+ */
+ private static class ScanStat {
+ long delta1;
+ long delta2;
+ int count1;
+ int count2;
+ }
+
+ /**
+ * Accumulates the four fields of ScanStat over several scans, one Stat per field.
+ */
+ private static class ScanStats {
+ Stat delta1 = new Stat();
+ Stat delta2 = new Stat();
+ Stat count1 = new Stat();
+ Stat count2 = new Stat();
+
+ // adds each field of ss to the Stat of the same name
+ void merge(ScanStat ss) {
+ delta1.addStat(ss.delta1);
+ delta2.addStat(ss.delta2);
+ count1.addStat(ss.count1);
+ count2.addStat(ss.count2);
+ }
+
+ // only the two timing Stats are printed; the counts are collected but not included
+ @Override
+ public String toString() {
+ return "[" + delta1 + "] [" + delta2 + "]";
+ }
+ }
+
+ /**
+ * Iterates over everything the batch scanner returns and records the elapsed time and entry count in delta1/count1. The second timed section is
+ * commented out, so count2 is always 0 and delta2 only measures two consecutive clock reads; ranges and scanner are not used.
+ */
+ private static ScanStat scan(BatchScanner bs, List<Range> ranges, Scanner scanner) {
+
+ // System.out.println("ranges : "+ranges);
+
+ ScanStat ss = new ScanStat();
+
+ long t1 = System.currentTimeMillis();
+ int count = 0;
+ // the entries themselves are ignored, only their number is recorded
+ for (@SuppressWarnings("unused")
+ Entry<Key,Value> entry : bs) {
+ count++;
+ }
+ long t2 = System.currentTimeMillis();
+
+ ss.delta1 = (t2 - t1);
+ ss.count1 = count;
+
+ count = 0;
+ t1 = System.currentTimeMillis();
+ /*
+ * for (Range range : ranges) { scanner.setRange(range); for (Entry<Key, Value> entry : scanner) { count++; } }
+ */
+
+ t2 = System.currentTimeMillis();
+
+ ss.delta2 = (t2 - t1);
+ ss.count2 = count;
+
+ return ss;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index 71970d3,0000000..e6fcd5b
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@@ -1,258 -1,0 +1,252 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.performance.thrift;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
- import org.apache.accumulo.trace.thrift.TInfo;
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.thrift.InitialMultiScan;
+import org.apache.accumulo.core.data.thrift.InitialScan;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.data.thrift.MapFileInfo;
+import org.apache.accumulo.core.data.thrift.MultiScanResult;
+import org.apache.accumulo.core.data.thrift.ScanResult;
+import org.apache.accumulo.core.data.thrift.TColumn;
+import org.apache.accumulo.core.data.thrift.TConstraintViolationSummary;
+import org.apache.accumulo.core.data.thrift.TKeyExtent;
+import org.apache.accumulo.core.data.thrift.TMutation;
+import org.apache.accumulo.core.data.thrift.TRange;
+import org.apache.accumulo.core.data.thrift.UpdateErrors;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.client.ClientServiceHandler;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.MetaDataStateStore;
+import org.apache.accumulo.server.master.state.MetaDataTableScanner;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.security.SecurityConstants;
+import org.apache.accumulo.server.util.TServerUtils;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher;
++import org.apache.accumulo.trace.thrift.TInfo;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+
+import com.beust.jcommander.Parameter;
+
+
+/**
+ * The purpose of this class is to serve as a fake tserver that is a data sink like /dev/null. NullTserver modifies the !METADATA location entries for a table to
+ * point to it. This allows thrift performance to be measured by running any client code that writes to a table.
+ *
+ */
+
+public class NullTserver {
+
+ public static class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface { // tablet-server stub: every operation is a no-op or returns an empty/null result
+
+ private long updateSession = 1; // next id handed out by startUpdate
+
+ public ThriftClientHandler(Instance instance, TransactionWatcher watcher) {
+ super(instance, watcher);
+ }
+
+ @Override
+ public long startUpdate(TInfo tinfo, TCredentials credentials) {
+ return updateSession++; // returns the current id, then advances it; NOTE(review): unsynchronized increment - confirm whether concurrent callers matter
+ }
+
+ @Override
+ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent keyExtent, List<TMutation> mutation) {} // mutations are discarded
+
+ @Override
+ public UpdateErrors closeUpdate(TInfo tinfo, long updateID) {
+ return new UpdateErrors(new HashMap<TKeyExtent,Long>(), new ArrayList<TConstraintViolationSummary>(), new HashMap<TKeyExtent, SecurityErrorCode>()); // all three collections are empty
+ }
+
+ @Override
+ public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime) {
+ return null;
+ }
+
+ @Override
+ public void closeMultiScan(TInfo tinfo, long scanID) {}
+
+ @Override
+ public void closeScan(TInfo tinfo, long scanID) {}
+
+ @Override
+ public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) {
+ return null;
+ }
+
+ @Override
+ public ScanResult continueScan(TInfo tinfo, long scanID) {
+ return null;
+ }
+
+ @Override
+ public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent extent, ByteBuffer splitPoint) {
+
+ }
+
+ @Override
+ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> batch, List<TColumn> columns,
+ List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) {
+ return null; // scans are not served by this stub
+ }
+
+ @Override
+ public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent extent, TRange range, List<TColumn> columns, int batchSize,
+ List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated) {
+ return null; // scans are not served by this stub
+ }
+
+ @Override
+ public void update(TInfo tinfo, TCredentials credentials, TKeyExtent keyExtent, TMutation mutation) {
+
+ }
+
+ @Override
+ public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
+ return null;
+ }
+
+ @Override
+ public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) throws ThriftSecurityException, TException {
+ return null;
+ }
+
+ @Override
+ public TabletStats getHistoricalStats(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
+ return null;
+ }
+
+ @Override
+ public void halt(TInfo tinfo, TCredentials credentials, String lock) throws ThriftSecurityException, TException {}
+
+ @Override
+ public void fastHalt(TInfo tinfo, TCredentials credentials, String lock) {}
+
+ @Override
+ public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent) throws TException {}
+
+ @Override
+ public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent, boolean save) throws TException {}
+
+ @Override
+ public List<ActiveScan> getActiveScans(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
+ return new ArrayList<ActiveScan>(); // always an empty list, never null
+ }
+
+ @Override
+ public void chop(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent) throws TException {}
+
+ @Override
+ public void flushTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent) throws TException {
+
+ }
+
+ @Override
+ public void compact(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws TException {
+
+ }
+
+ @Override
+ public void flush(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws TException {
+
+ }
+
- /*
- * (non-Javadoc)
- *
- * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.trace.thrift.TInfo,
- * org.apache.accumulo.core.security.thrift.Credentials, java.util.List)
- */
+ @Override
+ public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
+ }
+
+ @Override
+ public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
+ return new ArrayList<ActiveCompaction>(); // always an empty list, never null
+ }
+ }
+
+ static class Opts extends Help { // command-line options for main, declared with JCommander @Parameter annotations
+ @Parameter(names={"-i", "--instance"}, description="instance name", required=true)
+ String iname = null;
+ @Parameter(names={"-z", "--keepers"}, description="comma-separated list of zookeeper host:ports", required=true)
+ String keepers = null;
+ @Parameter(names="--table", description="table to adopt", required=true)
+ String tableName = null; // main points this table's tablet locations at this server
+ @Parameter(names="--port", description="port number to use")
+ int port = DefaultConfiguration.getInstance().getPort(Property.TSERV_CLIENTPORT); // default is read from DefaultConfiguration for TSERV_CLIENTPORT
+ }
+
+ /**
+  * Starts the null tablet server on the requested port, then rewrites the metadata locations of
+  * every tablet of the given table so that they point at this process, and finally sleeps forever
+  * so the server keeps running.
+  *
+  * @param args
+  *          command-line arguments, parsed into {@link Opts}
+  * @throws Exception
+  *           if argument parsing, server start-up, the metadata scan or the location update fails
+  */
+ public static void main(String[] args) throws Exception {
+   Opts opts = new Opts();
+   opts.parseArgs(NullTserver.class.getName(), args);
+
+   TransactionWatcher watcher = new TransactionWatcher();
+   ThriftClientHandler tch = new ThriftClientHandler(HdfsZooInstance.getInstance(), watcher);
+   Processor<Iface> processor = new Processor<Iface>(tch);
+   TServerUtils.startTServer(opts.port, processor, "NullTServer", "null tserver", 2, 1000, 10*1024*1024);
+
+   InetSocketAddress addr = new InetSocketAddress(InetAddress.getLocalHost(), opts.port);
+
+   // modify !METADATA
+   ZooKeeperInstance zki = new ZooKeeperInstance(opts.iname, opts.keepers);
+   String tableId = Tables.getTableId(zki, opts.tableName);
+
+   // despite its name the session id is not random: it is simply the port number
+   long randomSessionID = opts.port;
+   TServerInstance instance = new TServerInstance(addr, randomSessionID);
+   List<Assignment> assignments = new ArrayList<Assignment>();
+
+   // read the locations for the table
+   Range tableRange = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
+   MetaDataTableScanner s = new MetaDataTableScanner(zki, SecurityConstants.getSystemCredentials(), tableRange);
+   try {
+     while (s.hasNext()) {
+       TabletLocationState next = s.next();
+       assignments.add(new Assignment(next.extent, instance));
+     }
+   } finally {
+     // release the scanner even when reading the metadata table fails part-way
+     s.close();
+   }
+   // point them to this server
+   MetaDataStateStore store = new MetaDataStateStore();
+   store.setLocations(assignments);
+
+   // keep the process (and with it the thrift server) alive
+   while (true) {
+     UtilWaitThread.sleep(10000);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/randomwalk/Framework.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/Framework.java
index 9d01929,0000000..7cb58c9
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/Framework.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/Framework.java
@@@ -1,129 -1,0 +1,126 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.randomwalk;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.xml.DOMConfigurator;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Singleton driver of the random walk tests. It caches the nodes of a walk by name and starts a walk
+ * from a named module (an id ending in ".xml") or test class.
+ */
+public class Framework {
+
+  private static final Logger log = Logger.getLogger(Framework.class);
+  // nodes created so far, keyed by the id that was passed to getNode
+  private HashMap<String,Node> nodes = new HashMap<String,Node>();
+  private String configDir = null;
+  private static final Framework INSTANCE = new Framework();
+
+  /**
+   * @return Singleton instance of Framework
+   */
+  public static Framework getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * @return the directory last passed to {@link #setConfigDir(String)}, or null if none was set
+   */
+  public String getConfigDir() {
+    return configDir;
+  }
+
+  /**
+   * @param confDir
+   *          directory prefix that {@link #getNode(String)} uses to locate module files
+   */
+  public void setConfigDir(String confDir) {
+    configDir = confDir;
+  }
+
+  /**
+   * Run random walk framework
+   *
+   * @param startName
+   *          Full name of starting graph or test
- * @param state
- * @param confDir
+   * @param state
+   *          state object handed to the starting node
+   * @param confDir
+   *          configuration directory, stored with {@link #setConfigDir(String)} before the walk starts
+   * @return 0 when the walk completes, -1 when an exception was caught and logged
+   */
+  public int run(String startName, State state, String confDir) {
+
+    try {
+      System.out.println("confDir " + confDir);
+      setConfigDir(confDir);
+      Node node = getNode(startName);
+      node.visit(state, new Properties());
+    } catch (Exception e) {
+      log.error("Error during random walk", e);
+      return -1;
+    }
+    return 0;
+  }
+
+  /**
+   * Creates node (if it does not already exist) and inserts into map
+   *
+   * @param id
+   *          Name of node
+   * @return Node specified by id
- * @throws Exception
+   * @throws Exception
+   *           if the module or the test class named by id cannot be created
+   */
+  public Node getNode(String id) throws Exception {
+
+    // check for node in nodes
+    if (nodes.containsKey(id)) {
+      return nodes.get(id);
+    }
+
+    // otherwise create and put in nodes
+    Node node = null;
+    if (id.endsWith(".xml")) {
+      // NOTE(review): configDir is concatenated without a separator, so it presumably has to end with '/' - confirm
+      node = new Module(new File(configDir + "modules/" + id));
+    } else {
+      node = (Test) Class.forName(id).newInstance();
+    }
+    nodes.put(id, node);
+    return node;
+  }
+
+  static class Opts extends org.apache.accumulo.core.cli.Help {
+    @Parameter(names="--configDir", required=true, description="directory containing the test configuration")
+    String configDir;
+    @Parameter(names="--logDir", required=true, description="location of the local logging directory")
+    String localLogPath;
+    @Parameter(names="--logId", required=true, description="a unique log identifier (like a hostname, or pid)")
+    String logId;
+    @Parameter(names="--module", required=true, description="the name of the module to run")
+    String module;
+  }
+
+  /**
+   * Loads randomwalk.conf from the configuration directory, sets the "localLog" and "nfsLog" system
+   * properties, configures log4j from logger.xml, runs the requested module and exits with the
+   * value returned by {@link #run(String, State, String)}.
+   *
+   * @param args
+   *          command-line arguments, parsed into {@link Opts}
+   * @throws Exception
+   *           if the arguments or the configuration file cannot be read
+   */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(Framework.class.getName(), args);
+
+    Properties props = new Properties();
+    FileInputStream fis = new FileInputStream(opts.configDir + "/randomwalk.conf");
+    try {
+      props.load(fis);
+    } finally {
+      // close the stream even when the properties file cannot be parsed
+      fis.close();
+    }
+
+    System.setProperty("localLog", opts.localLogPath + "/" + opts.logId);
+    System.setProperty("nfsLog", props.getProperty("NFS_LOGPATH") + "/" + opts.logId);
+
+    // NOTE(review): unlike "/randomwalk.conf" above, no separator is inserted here - configDir presumably must end with '/'; confirm
+    DOMConfigurator.configure(opts.configDir + "logger.xml");
+
+    State state = new State(props);
+    int retval = getInstance().run(opts.module, state, opts.configDir);
+
+    System.exit(retval);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/randomwalk/Node.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/Node.java
index 1868ade,0000000..b74b6cd
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/Node.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/Node.java
@@@ -1,64 -1,0 +1,63 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.randomwalk;
+
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Represents a point in graph of RandomFramework
+ */
+public abstract class Node {
+
+  protected final Logger log = Logger.getLogger(this.getClass());
+  // wall-clock time (System.currentTimeMillis) of creation or of the latest makingProgress() call
+  long progress = System.currentTimeMillis();
+
+  /**
+   * Visits node
+   *
+   * @param state
+   *          Random walk state passed between nodes
- * @throws Exception
+   * @param props
+   *          properties supplied by the caller for this visit
+   * @throws Exception
+   *           any failure raised by the concrete node implementation
+   */
+  public abstract void visit(State state, Properties props) throws Exception;
+
+  /**
+   * Two nodes are equal when their {@link #toString()} values match. Objects that are not nodes are
+   * never equal to a node, which keeps the comparison symmetric.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (!(o instanceof Node))
+      return false;
+    return toString().equals(o.toString());
+  }
+
+  /**
+   * @return the runtime class name of this node
+   */
+  @Override
+  public String toString() {
+    return this.getClass().getName();
+  }
+
+  /**
+   * @return hash of {@link #toString()}, consistent with {@link #equals(Object)}
+   */
+  @Override
+  public int hashCode() {
+    return toString().hashCode();
+  }
+
+  /**
+   * Records the current time as the moment this node last made progress.
+   */
+  synchronized public void makingProgress() {
+    progress = System.currentTimeMillis();
+  }
+
+  /**
+   * @return the time, in milliseconds since the epoch, recorded by the latest makingProgress() call, or the creation time if it was never called
+   */
+  synchronized public long lastProgress() {
+    return progress;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/CheckBalance.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/CheckBalance.java
index a0dd37c,0000000..4581b04
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/CheckBalance.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/CheckBalance.java
@@@ -1,110 -1,0 +1,107 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.randomwalk.concurrent;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.test.randomwalk.State;
+import org.apache.accumulo.test.randomwalk.Test;
+
+/**
+ *
+ */
+public class CheckBalance extends Test {
+
+ static final String LAST_UNBALANCED_TIME = "lastUnbalancedTime";
+ static final String UNBALANCED_COUNT = "unbalancedCount";
+
- /* (non-Javadoc)
- * @see org.apache.accumulo.test.randomwalk.Node#visit(org.apache.accumulo.test.randomwalk.State, java.util.Properties)
- */
+ /**
+  * Counts the tablets hosted at each current location listed in the metadata table and flags a
+  * location whose count differs from the average by more than two standard deviations. An imbalance
+  * only raises an exception once it has been observed for more than 15 minutes and the
+  * unbalanced-check counter kept in the state has exceeded three.
+  *
+  * @param state
+  *          supplies the connector and stores the time and count of unbalanced observations
+  * @param props
+  *          not used by this test
+  * @throws Exception
+  *           if the servers stay unbalanced as described above, or if the scan fails
+  */
+ @Override
+ public void visit(State state, Properties props) throws Exception {
+   log.debug("checking balance");
+
+   // tally tablets per hosting location
+   Map<String,Long> tabletsPerServer = new HashMap<String,Long>();
+   Scanner metadata = state.getConnector().createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+   metadata.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
+   for (Entry<Key,Value> cell : metadata) {
+     String server = cell.getKey().getColumnQualifier().toString();
+     Long soFar = tabletsPerServer.get(server);
+     tabletsPerServer.put(server, soFar == null ? 1L : soFar + 1);
+   }
+
+   double sum = 0.;
+   for (Long tablets : tabletsPerServer.values()) {
+     sum += tablets.longValue();
+   }
+   final double average = sum / tabletsPerServer.size();
+   final double sd = stddev(tabletsPerServer.values(), average);
+   log.debug("average " + average + ", standard deviation " + sd);
+
+   // Check for balanced # of tablets on each node
+   final double tolerance = 2.0 * sd;
+   boolean unbalanced = false;
+   String unbalancedLocation = null;
+   long lastCount = 0L;
+   for (Entry<String,Long> perServer : tabletsPerServer.entrySet()) {
+     long thisCount = perServer.getValue().longValue();
+     if (Math.abs(thisCount - average) > tolerance) {
+       unbalanced = true;
+       log.debug("unbalanced: " + perServer.getKey() + " has " + perServer.getValue() + " tablets and the average is " + average);
+       unbalancedLocation = perServer.getKey();
+       lastCount = thisCount;
+     }
+   }
+
+   if (!unbalanced) {
+     // balanced again: forget any earlier observations
+     state.remove(LAST_UNBALANCED_TIME);
+     state.remove(UNBALANCED_COUNT);
+     return;
+   }
+
+   // It is expected that the number of tablets will be uneven for short
+   // periods of time. Don't complain unless we've seen it only unbalanced
+   // over a 15 minute period and it's been at least three checks.
+   Long firstSeen = state.getLong(LAST_UNBALANCED_TIME);
+   if (firstSeen == null) {
+     state.set(LAST_UNBALANCED_TIME, System.currentTimeMillis());
+     return;
+   }
+   if (System.currentTimeMillis() - firstSeen > 15 * 60 * 1000) {
+     Integer strikes = state.getInteger(UNBALANCED_COUNT);
+     if (strikes == null)
+       strikes = Integer.valueOf(0);
+     if (strikes > 3)
+       throw new Exception("servers are unbalanced! location " + unbalancedLocation + " count " + lastCount + " too far from average " + average);
+     strikes++;
+     state.set(UNBALANCED_COUNT, strikes);
+   }
+ }
+
+ private static double stddev(Collection<Long> samples, double avg) { // standard deviation of samples around the supplied mean avg
+ int num = samples.size();
+ double sqrtotal = 0.0; // running sum of squared deviations from avg
+ for (Long s : samples) {
+ double diff = s.doubleValue() - avg;
+ sqrtotal += diff * diff;
+ }
- return Math.sqrt(sqrtotal / (double) num);
++ return Math.sqrt(sqrtotal / num); // divides by n rather than n - 1; an empty collection gives 0.0 / 0 and therefore NaN
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java
----------------------------------------------------------------------
diff --cc trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java
index 049b2a2,0000000..dfa9f0c
mode 100644,000000..100644
--- a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java
+++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java
@@@ -1,132 -1,0 +1,122 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.trace.instrument.receivers;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+
+/**
+ * Find a Span collector via zookeeper and push spans there via Thrift RPC
+ *
+ */
+public class ZooSpanClient extends SendSpansViaThrift {
+
+ private static final Logger log = Logger.getLogger(ZooSpanClient.class);
+ private static final int TOTAL_TIME_WAIT_CONNECT_MS = 10 * 1000;
+ private static final int TIME_WAIT_CONNECT_CHECK_MS = 100;
+ private static final Charset UTF8 = Charset.forName("UTF-8");
+
+ ZooKeeper zoo = null;
+ final String path;
+ final Random random = new Random();
+ final List<String> hosts = new ArrayList<String>();
+
+ public ZooSpanClient(String keepers, final String path, String host, String service, long millis) throws IOException, KeeperException, InterruptedException { // connects to ZooKeeper and asks for the children of path with a watch set
+ super(host, service, millis);
+ this.path = path;
+ zoo = new ZooKeeper(keepers, 30 * 1000, new Watcher() { // 30 * 1000 is passed as the session timeout argument
+ @Override
+ public void process(WatchedEvent event) { // any watched event triggers a refresh of the host list
+ try {
+ if (zoo != null) { // presumably guards against events delivered before the field assignment completes - confirm
+ updateHosts(path, zoo.getChildren(path, null)); // NOTE(review): null watcher - this call does not appear to re-register a watch; confirm
+ }
+ } catch (Exception ex) {
+ log.error("unable to get destination hosts in zookeeper", ex);
+ }
+ }
+ });
+ for (int i = 0; i < TOTAL_TIME_WAIT_CONNECT_MS; i += TIME_WAIT_CONNECT_CHECK_MS) { // poll for a connection, sleeping at most TOTAL_TIME_WAIT_CONNECT_MS in total
+ if (zoo.getState().equals(States.CONNECTED))
+ break;
+ try {
+ Thread.sleep(TIME_WAIT_CONNECT_CHECK_MS);
+ } catch (InterruptedException ex) {
+ break; // stop waiting and carry on; NOTE(review): the interrupt status is not restored
+ }
+ }
+ zoo.getChildren(path, true); // watch=true leaves a watch on path; the returned child list is discarded here
+ }
+
- /*
- * (non-Javadoc)
- *
- * @see trace.instrument.receivers.AsyncSpanReceiver#flush()
- */
+ /**
+  * Delegates to the parent flush, but only when at least one destination host is known.
+  */
+ @Override
+ public void flush() {
+   if (hosts.isEmpty()) {
+     return;
+   }
+   super.flush();
+ }
+
- /*
- * (non-Javadoc)
- *
- * @see trace.instrument.receivers.AsyncSpanReceiver#sendSpans()
- */
+ /**
+  * Sends queued spans through the parent implementation when hosts are known. Without any host
+  * the queue is emptied (after logging an error) and threads waiting on it are notified.
+  */
+ @Override
+ void sendSpans() {
+   if (!hosts.isEmpty()) {
+     super.sendSpans();
+     return;
+   }
+   if (sendQueue.isEmpty()) {
+     return;
+   }
+   log.error("No hosts to send data to, dropping queued spans");
+   synchronized (sendQueue) {
+     sendQueue.clear();
+     sendQueue.notifyAll();
+   }
+ }
+
+ /**
+  * Replaces the known host list with the data stored in each child node of path. Any failure is
+  * logged and leaves the current list unchanged.
+  *
+  * @param path
+  *          parent node in zookeeper
+  * @param children
+  *          names of the child nodes to read
+  */
+ private synchronized void updateHosts(String path, List<String> children) {
+   log.debug("Scanning trace hosts in zookeeper: " + path);
+   try {
+     List<String> discovered = new ArrayList<String>();
+     for (String child : children) {
+       String node = path + "/" + child;
+       discovered.add(new String(zoo.getData(node, null, null), UTF8));
+     }
+     // swap in the new entries only after every child has been read
+     hosts.clear();
+     hosts.addAll(discovered);
+     log.debug("Trace hosts: " + hosts);
+   } catch (Exception ex) {
+     log.error("unable to get destination hosts in zookeeper", ex);
+   }
+ }
+
+ /**
+  * Picks one of the known hosts at random.
+  *
+  * @param data
+  *          not consulted by this implementation
+  * @return the chosen host, or null when no host is known
+  */
+ @Override
+ protected synchronized String getSpanKey(Map<String,String> data) {
+   if (hosts.isEmpty()) {
+     return null;
+   }
+   int pick = random.nextInt(hosts.size());
+   String host = hosts.get(pick);
+   log.debug("sending data to " + host);
+   return host;
+ }
+}