You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2016/07/08 16:16:41 UTC

hbase git commit: HBASE-15935 Set up a concurrent walker that walks flushed circular linked lists as a Loop Mode

Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 ee63706ee -> 5ad4a824f


HBASE-15935 Set up a concurrent walker that walks flushed circular linked lists as a Loop Mode

Signed-off-by: Elliott Clark <ec...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5ad4a824
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5ad4a824
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5ad4a824

Branch: refs/heads/branch-1.3
Commit: 5ad4a824f1b39df7e6cdd22cac8146c84032496a
Parents: ee63706
Author: Joseph Hwang <jz...@fb.com>
Authored: Thu Jun 2 18:00:08 2016 -0700
Committer: Elliott Clark <ec...@apache.org>
Committed: Fri Jul 8 09:14:08 2016 -0700

----------------------------------------------------------------------
 .../test/IntegrationTestBigLinkedList.java      | 397 ++++++++++++++-----
 ...egrationTestBigLinkedListWithVisibility.java |  15 +-
 .../hbase/test/IntegrationTestReplication.java  |   3 +-
 3 files changed, 318 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5ad4a824/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
index 7f0f732..1f57e07 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
@@ -169,8 +169,9 @@ import com.google.common.collect.Sets;
  *
  * Below is a description of the Java programs
  *
- * Generator - A map only job that generates data. As stated previously,
- * its best to generate data in multiples of 25M.
+ * Generator - A map only job that generates data. As stated previously, its best to generate data
+ * in multiples of 25M. An option is also available to allow concurrent walkers to select and walk
+ * random flushed loops during this phase.
  *
  * Verify - A map reduce job that looks for holes. Look at the counts after running. REFERENCED and
  * UNREFERENCED are ok, any UNDEFINED counts are bad. Do not run at the same
@@ -183,6 +184,11 @@ import com.google.common.collect.Sets;
  * Delete - A standalone program that deletes a single node
  *
  * This class can be run as a unit test, as an integration test, or from the command line
+ *
+ * ex:
+ * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
+ *    loop 2 1 100000 /temp 1 1000 50 1 0
+ *
  */
 @Category(IntegrationTests.class)
 public class IntegrationTestBigLinkedList extends IntegrationTestBase {
@@ -218,6 +224,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   private static final String GENERATOR_WRAP_KEY
     = "IntegrationTestBigLinkedList.generator.wrap";
 
+  private static final String CONCURRENT_WALKER_KEY
+    = "IntegrationTestBigLinkedList.generator.concurrentwalkers";
+
   protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
 
   private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
@@ -226,6 +235,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   private static final int WRAP_DEFAULT = 25;
   private static final int ROWKEY_LENGTH = 16;
 
+  private static final int CONCURRENT_WALKER_DEFAULT = 0;
+
   protected String toRun;
   protected String[] otherArgs;
 
@@ -257,6 +268,18 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
     public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
         "generator.multiple.columnfamilies";
 
+    public static enum Counts {
+      SUCCESS, TERMINATING, UNDEFINED, IOEXCEPTION
+    }
+
+    public static final String USAGE =  "Usage : " + Generator.class.getSimpleName() +
+        " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>" +
+        " <num walker threads>] \n" +
+        "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" +
+        "walkers will verify random flushed loop during Generation.";
+
+    public Job job;
+
     static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
       static class GeneratorInputSplit extends InputSplit implements Writable {
         @Override
@@ -372,6 +395,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
      *             |___________________________|
      * </pre>
      */
+
     static class GeneratorMapper
       extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
 
@@ -389,6 +413,12 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       boolean multipleUnevenColumnFamilies;
       byte[] tinyValue = new byte[] { 't' };
       byte[] bigValue = null;
+      Configuration conf;
+
+      volatile boolean walkersStop;
+      int numWalkers;
+      volatile List<Long> flushedLoops = new ArrayList<>();
+      List<Thread> walkers = new ArrayList<>();
 
       @Override
       protected void setup(Context context) throws IOException, InterruptedException {
@@ -406,6 +436,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
           this.wrap = this.numNodes;
         }
         this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
+        this.numWalkers = context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT);
+        this.walkersStop = false;
+        this.conf = context.getConfiguration();
       }
 
       protected void instantiateHTable() throws IOException {
@@ -416,6 +449,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
       @Override
       protected void cleanup(Context context) throws IOException ,InterruptedException {
+        joinWalkers();
         mutator.close();
         connection.close();
       }
@@ -444,9 +478,15 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
             // this block of code turns the 1 million linked list of length 25 into one giant
             //circular linked list of 25 million
             circularLeftShift(first);
-
             persist(output, -1, prev, first, null);
-
+            // At this point the entire loop has been flushed so we can add one of its nodes to the
+            // concurrent walker
+            if (numWalkers > 0) {
+              addFlushed(key.getBytes());
+              if (walkers.isEmpty()) {
+                startWalkers(numWalkers, conf, output);
+              }
+            }
             first = null;
             prev = null;
           }
@@ -459,6 +499,13 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         first[first.length - 1] = ez;
       }
 
+      private void addFlushed(byte[] rowKey) {
+        synchronized (flushedLoops) {
+          flushedLoops.add(Bytes.toLong(rowKey));
+          flushedLoops.notifyAll();
+        }
+      }
+
       protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
           throws IOException {
         for (int i = 0; i < current.length; i++) {
@@ -490,27 +537,153 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
             output.progress();
           }
         }
-
         mutator.flush();
       }
+
+      private void startWalkers(int numWalkers, Configuration conf, Context context) {
+        LOG.info("Starting " + numWalkers + " concurrent walkers");
+        for (int i = 0; i < numWalkers; i++) {
+          Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context));
+          walker.start();
+          walkers.add(walker);
+        }
+      }
+
+      private void joinWalkers() {
+        walkersStop = true;
+        synchronized (flushedLoops) {
+          flushedLoops.notifyAll();
+        }
+        for (Thread walker : walkers) {
+          try {
+            walker.join();
+          } catch (InterruptedException e) {
+            // no-op
+          }
+        }
+      }
+
+      /**
+       * Randomly selects and walks a random flushed loop concurrently with the Generator Mapper by
+       * spawning ConcurrentWalker's with specified StartNodes. These ConcurrentWalker's are
+       * configured to only log erroneous nodes.
+       */
+
+      public class ContinuousConcurrentWalker implements Runnable {
+
+        ConcurrentWalker walker;
+        Configuration conf;
+        Context context;
+        Random rand;
+
+        public ContinuousConcurrentWalker(Configuration conf, Context context) {
+          this.conf = conf;
+          this.context = context;
+          rand = new Random();
+        }
+
+        @Override
+        public void run() {
+          while (!walkersStop) {
+            try {
+              long node = selectLoop();
+              try {
+                walkLoop(node);
+              } catch (IOException e) {
+                context.getCounter(Counts.IOEXCEPTION).increment(1l);
+                return;
+              }
+            } catch (InterruptedException e) {
+              return;
+            }
+          }
+        }
+
+        private void walkLoop(long node) throws IOException {
+          walker = new ConcurrentWalker(context);
+          walker.setConf(conf);
+          walker.run(node, wrap);
+        }
+
+        private long selectLoop () throws InterruptedException{
+          synchronized (flushedLoops) {
+            while (flushedLoops.isEmpty() && !walkersStop) {
+              flushedLoops.wait();
+            }
+            if (walkersStop) {
+              throw new InterruptedException();
+            }
+            return flushedLoops.get(rand.nextInt(flushedLoops.size()));
+          }
+        }
+      }
+
+      public static class ConcurrentWalker extends WalkerBase {
+
+        Context context;
+
+        public ConcurrentWalker(Context context) {this.context = context;}
+
+        public void run(long startKeyIn, long maxQueriesIn) throws IOException {
+
+          long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE;
+          byte[] startKey = Bytes.toBytes(startKeyIn);
+
+          Connection connection = ConnectionFactory.createConnection(getConf());
+          Table table = connection.getTable(getTableName(getConf()));
+          long numQueries = 0;
+          // If isSpecificStart is set, only walk one list from that particular node.
+          // Note that in case of circular (or P-shaped) list it will walk forever, as is
+          // the case in normal run without startKey.
+
+          CINode node = findStartNode(table, startKey);
+          if (node == null) {
+            LOG.error("Start node not found: " + Bytes.toStringBinary(startKey));
+            throw new IOException("Start node not found: " + startKeyIn);
+          }
+          while (numQueries < maxQueries) {
+            numQueries++;
+            byte[] prev = node.prev;
+            long t1 = System.currentTimeMillis();
+            node = getNode(prev, table, node);
+            long t2 = System.currentTimeMillis();
+            if (node == null) {
+              LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev));
+              context.getCounter(Counts.UNDEFINED).increment(1l);
+            } else if (node.prev.length == NO_KEY.length) {
+              LOG.error("ConcurrentWalker found TERMINATING NODE: " +
+                  Bytes.toStringBinary(node.key));
+              context.getCounter(Counts.TERMINATING).increment(1l);
+            } else {
+              // Increment for successful walk
+              context.getCounter(Counts.SUCCESS).increment(1l);
+            }
+          }
+          table.close();
+          connection.close();
+        }
+      }
     }
 
     @Override
     public int run(String[] args) throws Exception {
       if (args.length < 3) {
-        System.out.println("Usage : " + Generator.class.getSimpleName() +
-            " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>]");
-        System.out.println("   where <num nodes per map> should be a multiple of " +
-            " width*wrap multiplier, 25M by default");
-        return 0;
+        System.err.println(USAGE);
+        return 1;
+      }
+      try {
+        int numMappers = Integer.parseInt(args[0]);
+        long numNodes = Long.parseLong(args[1]);
+        Path tmpOutput = new Path(args[2]);
+        Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
+        Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
+        Integer numWalkers = (args.length < 6) ? null : Integer.parseInt(args[5]);
+        return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
+      } catch (NumberFormatException e) {
+        System.err.println("Parsing generator arguments failed: " + e.getMessage());
+        System.err.println(USAGE);
+        return 1;
       }
-
-      int numMappers = Integer.parseInt(args[0]);
-      long numNodes = Long.parseLong(args[1]);
-      Path tmpOutput = new Path(args[2]);
-      Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
-      Integer wrapMuplitplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
-      return run(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
     }
 
     protected void createSchema() throws IOException {
@@ -557,7 +730,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
     }
 
     public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
-        Integer width, Integer wrapMuplitplier) throws Exception {
+        Integer width, Integer wrapMultiplier, Integer numWalkers)
+        throws Exception {
       LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
           + ", numNodes=" + numNodes);
       Job job = Job.getInstance(getConf());
@@ -570,7 +744,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       job.setOutputKeyClass(BytesWritable.class);
       job.setOutputValueClass(NullWritable.class);
 
-      setJobConf(job, numMappers, numNodes, width, wrapMuplitplier);
+      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
 
       job.setMapperClass(Mapper.class); //identity mapper
 
@@ -583,10 +757,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
     }
 
     public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
-        Integer width, Integer wrapMuplitplier) throws Exception {
+        Integer width, Integer wrapMultiplier, Integer numWalkers)
+        throws Exception {
       LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
       createSchema();
-      Job job = Job.getInstance(getConf());
+      job = Job.getInstance(getConf());
 
       job.setJobName("Link Generator");
       job.setNumReduceTasks(0);
@@ -597,7 +772,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       job.setOutputKeyClass(NullWritable.class);
       job.setOutputValueClass(NullWritable.class);
 
-      setJobConf(job, numMappers, numNodes, width, wrapMuplitplier);
+      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
 
       setMapperForGenerator(job);
 
@@ -624,12 +799,34 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
     }
 
     public int run(int numMappers, long numNodes, Path tmpOutput,
-        Integer width, Integer wrapMuplitplier) throws Exception {
-      int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
+        Integer width, Integer wrapMultiplier, Integer numWalkers)
+        throws Exception {
+      int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier,
+          numWalkers);
       if (ret > 0) {
         return ret;
       }
-      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
+      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
+    }
+
+    public boolean verify() {
+      try {
+        Counters counters = job.getCounters();
+
+        if (counters.findCounter(Counts.TERMINATING).getValue() > 0 ||
+            counters.findCounter(Counts.UNDEFINED).getValue() > 0 ||
+            counters.findCounter(Counts.IOEXCEPTION).getValue() > 0) {
+          LOG.error("Concurrent walker failed to verify during Generation phase");
+          LOG.error("TERMINATING nodes: " + counters.findCounter(Counts.TERMINATING).getValue());
+          LOG.error("UNDEFINED nodes: " + counters.findCounter(Counts.UNDEFINED).getValue());
+          LOG.error("IOEXCEPTION nodes: " + counters.findCounter(Counts.IOEXCEPTION).getValue());
+          return false;
+        }
+      } catch (IOException e) {
+        LOG.info("Generator verification could not find counter");
+        return false;
+      }
+      return true;
     }
   }
 
@@ -1231,21 +1428,33 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   static class Loop extends Configured implements Tool {
 
     private static final Log LOG = LogFactory.getLog(Loop.class);
+    private static final String USAGE = "Usage: Loop <num iterations> <num mappers> " +
+        "<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>" +
+        " <num walker threads>] \n" +
+        "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" +
+        "walkers will select and verify random flushed loop during Generation.";
 
     IntegrationTestBigLinkedList it;
 
     protected void runGenerator(int numMappers, long numNodes,
-        String outputDir, Integer width, Integer wrapMuplitplier) throws Exception {
+        String outputDir, Integer width, Integer wrapMultiplier, Integer numWalkers)
+        throws Exception {
       Path outputPath = new Path(outputDir);
       UUID uuid = UUID.randomUUID(); //create a random UUID.
       Path generatorOutput = new Path(outputPath, uuid.toString());
 
       Generator generator = new Generator();
       generator.setConf(getConf());
-      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMuplitplier);
+      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier,
+          numWalkers);
       if (retCode > 0) {
         throw new RuntimeException("Generator failed with return code: " + retCode);
       }
+      if (numWalkers > 0) {
+        if (!generator.verify()) {
+          throw new RuntimeException("Generator.verify failed");
+        }
+      }
     }
 
     protected void runVerify(String outputDir,
@@ -1264,41 +1473,43 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       if (!verify.verify(expectedNumNodes)) {
         throw new RuntimeException("Verify.verify failed");
       }
-
-      LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes);
+      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
     }
 
     @Override
     public int run(String[] args) throws Exception {
       if (args.length < 5) {
-        System.err.println("Usage: Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>]");
+        System.err.println(USAGE);
         return 1;
       }
-      LOG.info("Running Loop with args:" + Arrays.deepToString(args));
-
-      int numIterations = Integer.parseInt(args[0]);
-      int numMappers = Integer.parseInt(args[1]);
-      long numNodes = Long.parseLong(args[2]);
-      String outputDir = args[3];
-      int numReducers = Integer.parseInt(args[4]);
-      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
-      Integer wrapMuplitplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
-
-      long expectedNumNodes = 0;
-
-      if (numIterations < 0) {
-        numIterations = Integer.MAX_VALUE; //run indefinitely (kind of)
-      }
-
-      for (int i = 0; i < numIterations; i++) {
-        LOG.info("Starting iteration = " + i);
-        runGenerator(numMappers, numNodes, outputDir, width, wrapMuplitplier);
-        expectedNumNodes += numMappers * numNodes;
-
-        runVerify(outputDir, numReducers, expectedNumNodes);
+      try {
+        int numIterations = Integer.parseInt(args[0]);
+        int numMappers = Integer.parseInt(args[1]);
+        long numNodes = Long.parseLong(args[2]);
+        String outputDir = args[3];
+        int numReducers = Integer.parseInt(args[4]);
+        Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
+        Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
+        Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]);
+
+        long expectedNumNodes = 0;
+
+        if (numIterations < 0) {
+          numIterations = Integer.MAX_VALUE; //run indefinitely (kind of)
+        }
+        LOG.info("Running Loop with args:" + Arrays.deepToString(args));
+        for (int i = 0; i < numIterations; i++) {
+          LOG.info("Starting iteration = " + i);
+          runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers);
+          expectedNumNodes += numMappers * numNodes;
+          runVerify(outputDir, numReducers, expectedNumNodes);
+        }
+        return 0;
+      } catch (NumberFormatException e) {
+        System.err.println("Parsing loop arguments failed: " + e.getMessage());
+        System.err.println(USAGE);
+        return 1;
       }
-
-      return 0;
     }
   }
 
@@ -1387,13 +1598,47 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
     }
   }
 
+  abstract static class WalkerBase extends Configured{
+    protected static CINode findStartNode(Table table, byte[] startKey) throws IOException {
+      Scan scan = new Scan();
+      scan.setStartRow(startKey);
+      scan.setBatch(1);
+      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
+
+      long t1 = System.currentTimeMillis();
+      ResultScanner scanner = table.getScanner(scan);
+      Result result = scanner.next();
+      long t2 = System.currentTimeMillis();
+      scanner.close();
+
+      if ( result != null) {
+        CINode node = getCINode(result, new CINode());
+        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
+        return node;
+      }
+
+      System.out.println("FSR " + (t2 - t1));
+
+      return null;
+    }
+    protected CINode getNode(byte[] row, Table table, CINode node) throws IOException {
+      Get get = new Get(row);
+      get.addColumn(FAMILY_NAME, COLUMN_PREV);
+      Result result = table.get(get);
+      return getCINode(result, node);
+    }
+  }
   /**
    * A stand alone program that follows a linked list created by {@link Generator} and prints
    * timing info.
    */
-  private static class Walker extends Configured implements Tool {
+  private static class Walker extends WalkerBase implements Tool {
+
+    public Walker(){}
+
     @Override
     public int run(String[] args) throws IOException {
+
       Options options = new Options();
       options.addOption("n", "num", true, "number of queries");
       options.addOption("s", "start", true, "key to start at, binary string");
@@ -1420,6 +1665,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       }
       Random rand = new SecureRandom();
       boolean isSpecificStart = cmd.hasOption('s');
+
       byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
       int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;
 
@@ -1438,12 +1684,13 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
           System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
         }
         numQueries++;
-        while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) {
+        while (node != null && node.prev.length != NO_KEY.length &&
+            numQueries < maxQueries) {
           byte[] prev = node.prev;
           long t1 = System.currentTimeMillis();
           node = getNode(prev, table, node);
           long t2 = System.currentTimeMillis();
-          if (numQueries % logEvery == 0) {
+          if (logEvery > 0 && numQueries % logEvery == 0) {
             System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
           }
           numQueries++;
@@ -1454,40 +1701,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
           }
         }
       }
-
       table.close();
       return 0;
     }
-
-    private static CINode findStartNode(Table table, byte[] startKey) throws IOException {
-      Scan scan = new Scan();
-      scan.setStartRow(startKey);
-      scan.setBatch(1);
-      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
-
-      long t1 = System.currentTimeMillis();
-      ResultScanner scanner = table.getScanner(scan);
-      Result result = scanner.next();
-      long t2 = System.currentTimeMillis();
-      scanner.close();
-
-      if ( result != null) {
-        CINode node = getCINode(result, new CINode());
-        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
-        return node;
-      }
-
-      System.out.println("FSR " + (t2 - t1));
-
-      return null;
-    }
-
-    private CINode getNode(byte[] row, Table table, CINode node) throws IOException {
-      Get get = new Get(row);
-      get.addColumn(FAMILY_NAME, COLUMN_PREV);
-      Result result = table.get(get);
-      return getCINode(result, node);
-    }
   }
 
   private static class Clean extends Configured implements Tool {
@@ -1664,7 +1880,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   }
 
   private static void setJobConf(Job job, int numMappers, long numNodes,
-      Integer width, Integer wrapMultiplier) {
+      Integer width, Integer wrapMultiplier, Integer numWalkers) {
     job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
     job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
     if (width != null) {
@@ -1673,6 +1889,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
     if (wrapMultiplier != null) {
       job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
     }
+    if (numWalkers != null) {
+      job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers);
+    }
   }
 
   public static void setJobScannerConf(Job job) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ad4a824/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
index 4dd62d1..8e93eaa 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
@@ -478,21 +478,22 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
 
     @Override
     protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
-        Integer wrapMuplitplier) throws Exception {
+        Integer wrapMultiplier, Integer numWalkers) throws Exception {
       Path outputPath = new Path(outputDir);
       UUID uuid = UUID.randomUUID(); // create a random UUID.
       Path generatorOutput = new Path(outputPath, uuid.toString());
 
       Generator generator = new VisibilityGenerator();
       generator.setConf(getConf());
-      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMuplitplier);
+      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier,
+          numWalkers);
       if (retCode > 0) {
         throw new RuntimeException("Generator failed with return code: " + retCode);
       }
     }
 
     protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width,
-        Integer wrapMuplitplier, int tableIndex) throws Exception {
+        Integer wrapMultiplier, int tableIndex) throws Exception {
       LOG.info("Running copier on table "+IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex));
       Copier copier = new Copier(
           IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true);
@@ -593,8 +594,7 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
       String outputDir = args[3];
       int numReducers = Integer.parseInt(args[4]);
       Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
-      Integer wrapMuplitplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
-
+      Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
       long expectedNumNodes = 0;
 
       if (numIterations < 0) {
@@ -604,7 +604,8 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
       for (int i = 0; i < numIterations; i++) {
         LOG.info("Starting iteration = " + i);
         LOG.info("Generating data");
-        runGenerator(numMappers, numNodes, outputDir, width, wrapMuplitplier);
+        // By default run no concurrent walkers for test with visibility
+        runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, 0);
         expectedNumNodes += numMappers * numNodes;
         // Copying wont work because expressions are not returned back to the
         // client
@@ -617,7 +618,7 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
         sleep(SLEEP_IN_MS);
         for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
           LOG.info("Deleting data on table with index: "+j);
-          runDelete(numMappers, numNodes, outputDir, width, wrapMuplitplier, j);
+          runDelete(numMappers, numNodes, outputDir, width, wrapMultiplier, j);
           sleep(SLEEP_IN_MS);
           LOG.info("Verifying common table after deleting");
           runVerify(outputDir, numReducers, expectedNumNodes, j);

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ad4a824/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java
index c6668ad..141b24d 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java
@@ -267,7 +267,8 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
       Generator generator = new Generator();
       generator.setConf(source.getConfiguration());
 
-      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier);
+      // Disable concurrent walkers for IntegrationTestReplication
+      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, 0);
       if (retCode > 0) {
         throw new RuntimeException("Generator failed with return code: " + retCode);
       }