Posted to commits@accumulo.apache.org by md...@apache.org on 2014/03/04 20:24:40 UTC

[01/10] git commit: ACCUMULO-2233 Get ranges from cloned table

Repository: accumulo
Updated Branches:
  refs/heads/1.4.5-SNAPSHOT 759582b78 -> 4bdebdb1e
  refs/heads/1.5.2-SNAPSHOT bd283aec0 -> 5d9e1557f
  refs/heads/1.6.0-SNAPSHOT 2f7b9d306 -> 7f6c21310
  refs/heads/master efd6261be -> fa34e0428


ACCUMULO-2233 Get ranges from cloned table


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4bdebdb1
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4bdebdb1
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4bdebdb1

Branch: refs/heads/1.4.5-SNAPSHOT
Commit: 4bdebdb1e5aaec75c678d91ff6b7d53ff65e3ec0
Parents: 759582b
Author: Mike Drob <md...@cloudera.com>
Authored: Mon Mar 3 12:52:08 2014 -0500
Committer: Mike Drob <md...@cloudera.com>
Committed: Tue Mar 4 09:19:40 2014 -0500

----------------------------------------------------------------------
 .../server/test/continuous/ContinuousVerify.java  | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/4bdebdb1/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java b/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
index 6165d2a..9441cf5 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
@@ -168,15 +168,21 @@ public class ContinuousVerify extends Configured implements Tool {
     Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
     job.setJarByClass(this.getClass());
 
+    Set<Range> ranges = null;
     String clone = table;
     Connector conn = null;
+
     if (scanOffline) {
       Random random = new Random();
       clone = table + "_" + String.format("%016x", Math.abs(random.nextLong()));
       ZooKeeperInstance zki = new ZooKeeperInstance(instance, zookeepers);
       conn = zki.getConnector(user, pass.getBytes());
       conn.tableOperations().clone(table, clone, true, new HashMap<String,String>(), new HashSet<String>());
+      ranges = conn.tableOperations().splitRangeByTablets(clone, new Range(), Integer.parseInt(maxMaps));
       conn.tableOperations().offline(clone);
+    } else {
+      ranges = new ZooKeeperInstance(instance, zookeepers).getConnector(user, pass).tableOperations()
+          .splitRangeByTablets(table, new Range(), Integer.parseInt(maxMaps));
     }
 
     job.setInputFormatClass(AccumuloInputFormat.class);
@@ -189,16 +195,8 @@ public class ContinuousVerify extends Configured implements Tool {
     AccumuloInputFormat.setInputInfo(job.getConfiguration(), user, pass.getBytes(), clone, authorizations);
     AccumuloInputFormat.setZooKeeperInstance(job.getConfiguration(), instance, zookeepers);
     AccumuloInputFormat.setScanOffline(job.getConfiguration(), scanOffline);
-
-    // set up ranges
-    try {
-      Set<Range> ranges = new ZooKeeperInstance(instance, zookeepers).getConnector(user, pass.getBytes()).tableOperations()
-          .splitRangeByTablets(table, new Range(), Integer.parseInt(maxMaps));
-      AccumuloInputFormat.setRanges(job.getConfiguration(), ranges);
-      AccumuloInputFormat.disableAutoAdjustRanges(job.getConfiguration());
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
+    AccumuloInputFormat.setRanges(job.getConfiguration(), ranges);
+    AccumuloInputFormat.disableAutoAdjustRanges(job.getConfiguration());
 
     job.setMapperClass(CMapper.class);
     job.setMapOutputKeyClass(LongWritable.class);
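
The gist of the change, as a hedged sketch: the split ranges for the job are now computed inside the offline branch, from the cloned table, right after the clone is created and before it is taken offline (the else branch keeps the old behavior of splitting the live table). The names below mirror the diff, but this is an illustrative recombination of the calls shown above, not the committed code:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.data.Range;

    class OfflineScanSetup {
      // Clone the table, derive one range per tablet of the clone, then take
      // the clone offline so the MapReduce job can read its files directly.
      static Set<Range> setUp(Connector conn, String table, String clone, int maxMaps) throws Exception {
        conn.tableOperations().clone(table, clone, true, new HashMap<String,String>(), new HashSet<String>());
        Set<Range> ranges = conn.tableOperations().splitRangeByTablets(clone, new Range(), maxMaps);
        conn.tableOperations().offline(clone);
        return ranges;
      }
    }

Splitting on the clone matters because the offline input format reads the clone's tablets; ranges derived from the live source table may not line up with them.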


[09/10] git commit: Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT

Posted by md...@apache.org.
Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7f6c2131
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7f6c2131
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7f6c2131

Branch: refs/heads/master
Commit: 7f6c213108538b0a7aaf79e349dbe4551f8afae2
Parents: 2f7b9d3 5d9e155
Author: Mike Drob <md...@cloudera.com>
Authored: Tue Mar 4 14:24:07 2014 -0500
Committer: Mike Drob <md...@cloudera.com>
Committed: Tue Mar 4 14:24:07 2014 -0500

----------------------------------------------------------------------
 .../accumulo/test/continuous/ContinuousVerify.java   | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/7f6c2131/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------


[07/10] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT

Posted by md...@apache.org.
Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5d9e1557
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5d9e1557
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5d9e1557

Branch: refs/heads/master
Commit: 5d9e1557f822ab22096dd39c4820720256c80187
Parents: bd283ae 4bdebdb
Author: Mike Drob <md...@cloudera.com>
Authored: Tue Mar 4 13:43:57 2014 -0500
Committer: Mike Drob <md...@cloudera.com>
Committed: Tue Mar 4 13:43:57 2014 -0500

----------------------------------------------------------------------
 .../accumulo/test/continuous/ContinuousVerify.java   | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5d9e1557/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
index a5a6a2b,0000000..07e0c92
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
@@@ -1,244 -1,0 +1,243 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.continuous;
 +
 +import java.io.IOException;
 +import java.lang.reflect.Method;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Random;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.ClientOnDefaultTable;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.test.continuous.ContinuousWalk.BadChecksumException;
 +import org.apache.hadoop.conf.Configured;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.LongWritable;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.VLongWritable;
 +import org.apache.hadoop.mapred.Counters.Counter;
 +import org.apache.hadoop.mapreduce.Job;
 +import org.apache.hadoop.mapreduce.Mapper;
 +import org.apache.hadoop.mapreduce.Reducer;
 +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 +import org.apache.hadoop.util.Tool;
 +import org.apache.hadoop.util.ToolRunner;
 +
 +import com.beust.jcommander.Parameter;
 +import com.beust.jcommander.validators.PositiveInteger;
 +
 +/**
 + * A map reduce job that verifies a table created by continuous ingest. It verifies that all referenced nodes are defined.
 + */
 +
 +public class ContinuousVerify extends Configured implements Tool {
 +
 +  // work around hadoop-1/hadoop-2 runtime incompatibility
 +  static private Method INCREMENT;
 +  static {
 +    try {
 +      INCREMENT = Counter.class.getMethod("increment", Long.TYPE);
 +    } catch (Exception ex) {
 +      throw new RuntimeException(ex);
 +    }
 +  }
 +
 +  static void increment(Object obj) {
 +    try {
 +      INCREMENT.invoke(obj, 1L);
 +    } catch (Exception ex) {
 +      throw new RuntimeException(ex);
 +    }
 +  }
 +
 +  public static final VLongWritable DEF = new VLongWritable(-1);
 +
 +  public static class CMapper extends Mapper<Key,Value,LongWritable,VLongWritable> {
 +
 +    private LongWritable row = new LongWritable();
 +    private LongWritable ref = new LongWritable();
 +    private VLongWritable vrow = new VLongWritable();
 +
 +    private long corrupt = 0;
 +
 +    @Override
 +    public void map(Key key, Value data, Context context) throws IOException, InterruptedException {
 +      long r = Long.parseLong(key.getRow().toString(), 16);
 +      if (r < 0)
 +        throw new IllegalArgumentException();
 +
 +      try {
 +        ContinuousWalk.validate(key, data);
 +      } catch (BadChecksumException bce) {
 +        increment(context.getCounter(Counts.CORRUPT));
 +        if (corrupt < 1000) {
 +          System.out.println("ERROR Bad checksum : " + key);
 +        } else if (corrupt == 1000) {
 +          System.out.println("Too many bad checksums, not printing anymore!");
 +        }
 +        corrupt++;
 +        return;
 +      }
 +
 +      row.set(r);
 +
 +      context.write(row, DEF);
 +      byte[] val = data.get();
 +
 +      int offset = ContinuousWalk.getPrevRowOffset(val);
 +      if (offset > 0) {
 +        ref.set(Long.parseLong(new String(val, offset, 16, Constants.UTF8), 16));
 +        vrow.set(r);
 +        context.write(ref, vrow);
 +      }
 +    }
 +  }
 +
 +  public static enum Counts {
 +    UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT
 +  }
 +
 +  public static class CReducer extends Reducer<LongWritable,VLongWritable,Text,Text> {
 +    private ArrayList<Long> refs = new ArrayList<Long>();
 +
 +    @Override
 +    public void reduce(LongWritable key, Iterable<VLongWritable> values, Context context) throws IOException, InterruptedException {
 +
 +      int defCount = 0;
 +
 +      refs.clear();
 +      for (VLongWritable type : values) {
 +        if (type.get() == -1) {
 +          defCount++;
 +        } else {
 +          refs.add(type.get());
 +        }
 +      }
 +
 +      if (defCount == 0 && refs.size() > 0) {
 +        StringBuilder sb = new StringBuilder();
 +        String comma = "";
 +        for (Long ref : refs) {
 +          sb.append(comma);
 +          comma = ",";
 +          sb.append(new String(ContinuousIngest.genRow(ref), Constants.UTF8));
 +        }
 +
 +        context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString()));
 +        increment(context.getCounter(Counts.UNDEFINED));
 +
 +      } else if (defCount > 0 && refs.size() == 0) {
 +        increment(context.getCounter(Counts.UNREFERENCED));
 +      } else {
 +        increment(context.getCounter(Counts.REFERENCED));
 +      }
 +
 +    }
 +  }
 +
 +  static class Opts extends ClientOnDefaultTable {
 +    @Parameter(names = "--output", description = "location in HDFS to store the results; must not exist", required = true)
 +    String outputDir = "/tmp/continuousVerify";
 +
 +    @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class)
 +    int maxMaps = 0;
 +
 +    @Parameter(names = "--reducers", description = "the number of reducers to use", required = true, validateWith = PositiveInteger.class)
 +    int reducers = 0;
 +
 +    @Parameter(names = "--offline", description = "perform the verification directly on the files while the table is offline")
 +    boolean scanOffline = false;
 +
 +    public Opts() {
 +      super("ci");
 +    }
 +  }
 +
 +  @Override
 +  public int run(String[] args) throws Exception {
 +    Opts opts = new Opts();
 +    opts.parseArgs(this.getClass().getName(), args);
 +
 +    Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
 +    job.setJarByClass(this.getClass());
 +
 +    job.setInputFormatClass(AccumuloInputFormat.class);
 +    opts.setAccumuloConfigs(job);
 +
++    Set<Range> ranges = null;
 +    String clone = opts.getTableName();
 +    Connector conn = null;
++
 +    if (opts.scanOffline) {
 +      Random random = new Random();
 +      clone = opts.getTableName() + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffl));
 +      conn = opts.getConnector();
 +      conn.tableOperations().clone(opts.getTableName(), clone, true, new HashMap<String,String>(), new HashSet<String>());
++      ranges = conn.tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
 +      conn.tableOperations().offline(clone);
 +      AccumuloInputFormat.setInputTableName(job, clone);
 +      AccumuloInputFormat.setOfflineTableScan(job, true);
++    } else {
++      ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
 +    }
 +
-     // set up ranges
-     try {
-       Set<Range> ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
-       AccumuloInputFormat.setRanges(job, ranges);
-       AccumuloInputFormat.setAutoAdjustRanges(job, false);
-     } catch (Exception e) {
-       throw new IOException(e);
-     }
++    AccumuloInputFormat.setRanges(job, ranges);
++    AccumuloInputFormat.setAutoAdjustRanges(job, false);
 +
 +    job.setMapperClass(CMapper.class);
 +    job.setMapOutputKeyClass(LongWritable.class);
 +    job.setMapOutputValueClass(VLongWritable.class);
 +
 +    job.setReducerClass(CReducer.class);
 +    job.setNumReduceTasks(opts.reducers);
 +
 +    job.setOutputFormatClass(TextOutputFormat.class);
 +
 +    job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", opts.scanOffline);
 +
 +    TextOutputFormat.setOutputPath(job, new Path(opts.outputDir));
 +
 +    job.waitForCompletion(true);
 +
 +    if (opts.scanOffline) {
 +      conn.tableOperations().delete(clone);
 +    }
 +    opts.stopTracing();
 +    return job.isSuccessful() ? 0 : 1;
 +  }
 +
 +  /**
 +   * 
 +   * @param args
 +   *          instanceName zookeepers username password table columns outputpath
 +   * @throws Exception
 +   */
 +  public static void main(String[] args) throws Exception {
 +    int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousVerify(), args);
 +    if (res != 0)
 +      System.exit(res);
 +  }
 +}
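
For reference, a hedged sketch of invoking the merged tool. The required flags (--output, --maxMappers, --reducers) and the optional --offline flag come from the Opts class above; the connection flags (-i, -z, -u, -p) are assumptions based on the inherited ClientOnDefaultTable/ClientOpts conventions and may differ in a given release:

    import org.apache.accumulo.core.util.CachedConfiguration;
    import org.apache.accumulo.test.continuous.ContinuousVerify;
    import org.apache.hadoop.util.ToolRunner;

    public class RunContinuousVerify {
      public static void main(String[] ignored) throws Exception {
        // Connection flag names here are assumptions, not confirmed by the diff.
        String[] args = {
            "-i", "myInstance", "-z", "zk1:2181,zk2:2181", "-u", "root", "-p", "secret",
            "--output", "/tmp/ci-verify", "--maxMappers", "64", "--reducers", "16",
            "--offline"};
        // Mirrors the main() above: run the Tool and propagate a non-zero exit code.
        System.exit(ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousVerify(), args));
      }
    }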


[08/10] git commit: Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT

Posted by md...@apache.org.
Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7f6c2131
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7f6c2131
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7f6c2131

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 7f6c213108538b0a7aaf79e349dbe4551f8afae2
Parents: 2f7b9d3 5d9e155
Author: Mike Drob <md...@cloudera.com>
Authored: Tue Mar 4 14:24:07 2014 -0500
Committer: Mike Drob <md...@cloudera.com>
Committed: Tue Mar 4 14:24:07 2014 -0500

----------------------------------------------------------------------
 .../accumulo/test/continuous/ContinuousVerify.java   | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/7f6c2131/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------


[03/10] git commit: ACCUMULO-2233 Get ranges from cloned table

Posted by md...@apache.org.
ACCUMULO-2233 Get ranges from cloned table


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4bdebdb1
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4bdebdb1
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4bdebdb1

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 4bdebdb1e5aaec75c678d91ff6b7d53ff65e3ec0
Parents: 759582b
Author: Mike Drob <md...@cloudera.com>
Authored: Mon Mar 3 12:52:08 2014 -0500
Committer: Mike Drob <md...@cloudera.com>
Committed: Tue Mar 4 09:19:40 2014 -0500

----------------------------------------------------------------------
 .../server/test/continuous/ContinuousVerify.java  | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/4bdebdb1/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------


[10/10] git commit: Merge branch '1.6.0-SNAPSHOT'

Posted by md...@apache.org.
Merge branch '1.6.0-SNAPSHOT'


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/fa34e042
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/fa34e042
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/fa34e042

Branch: refs/heads/master
Commit: fa34e0428629bc741eef74419cea6542519ce7a3
Parents: efd6261 7f6c213
Author: Mike Drob <md...@cloudera.com>
Authored: Tue Mar 4 14:24:20 2014 -0500
Committer: Mike Drob <md...@cloudera.com>
Committed: Tue Mar 4 14:24:20 2014 -0500

----------------------------------------------------------------------
 .../accumulo/test/continuous/ContinuousVerify.java   | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------



[02/10] git commit: ACCUMULO-2233 Get ranges from cloned table

Posted by md...@apache.org.
ACCUMULO-2233 Get ranges from cloned table


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4bdebdb1
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4bdebdb1
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4bdebdb1

Branch: refs/heads/1.5.2-SNAPSHOT
Commit: 4bdebdb1e5aaec75c678d91ff6b7d53ff65e3ec0
Parents: 759582b
Author: Mike Drob <md...@cloudera.com>
Authored: Mon Mar 3 12:52:08 2014 -0500
Committer: Mike Drob <md...@cloudera.com>
Committed: Tue Mar 4 09:19:40 2014 -0500

----------------------------------------------------------------------
 .../server/test/continuous/ContinuousVerify.java  | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/4bdebdb1/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------


[06/10] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT

Posted by md...@apache.org.
Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5d9e1557
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5d9e1557
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5d9e1557

Branch: refs/heads/1.5.2-SNAPSHOT
Commit: 5d9e1557f822ab22096dd39c4820720256c80187
Parents: bd283ae 4bdebdb
Author: Mike Drob <md...@cloudera.com>
Authored: Tue Mar 4 13:43:57 2014 -0500
Committer: Mike Drob <md...@cloudera.com>
Committed: Tue Mar 4 13:43:57 2014 -0500

----------------------------------------------------------------------
 .../accumulo/test/continuous/ContinuousVerify.java   | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5d9e1557/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------


[04/10] git commit: ACCUMULO-2233 Get ranges from cloned table

Posted by md...@apache.org.
ACCUMULO-2233 Get ranges from cloned table


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4bdebdb1
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4bdebdb1
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4bdebdb1

Branch: refs/heads/master
Commit: 4bdebdb1e5aaec75c678d91ff6b7d53ff65e3ec0
Parents: 759582b
Author: Mike Drob <md...@cloudera.com>
Authored: Mon Mar 3 12:52:08 2014 -0500
Committer: Mike Drob <md...@cloudera.com>
Committed: Tue Mar 4 09:19:40 2014 -0500

----------------------------------------------------------------------
 .../server/test/continuous/ContinuousVerify.java  | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/4bdebdb1/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------


[05/10] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT

Posted by md...@apache.org.
Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5d9e1557
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5d9e1557
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5d9e1557

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 5d9e1557f822ab22096dd39c4820720256c80187
Parents: bd283ae 4bdebdb
Author: Mike Drob <md...@cloudera.com>
Authored: Tue Mar 4 13:43:57 2014 -0500
Committer: Mike Drob <md...@cloudera.com>
Committed: Tue Mar 4 13:43:57 2014 -0500

----------------------------------------------------------------------
 .../accumulo/test/continuous/ContinuousVerify.java   | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5d9e1557/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------