You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2015/02/23 06:29:25 UTC

hbase git commit: HBASE-11842 Integration test for async wal replication to secondary regions

Repository: hbase
Updated Branches:
  refs/heads/master 7792dee0c -> 21b366afe


HBASE-11842 Integration test for async wal replication to secondary regions


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/21b366af
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/21b366af
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/21b366af

Branch: refs/heads/master
Commit: 21b366afe1fa73cfee5601db8c661049788e97d7
Parents: 7792dee
Author: Enis Soztutar <en...@apache.org>
Authored: Sun Feb 22 21:29:12 2015 -0800
Committer: Enis Soztutar <en...@apache.org>
Committed: Sun Feb 22 21:29:12 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/IntegrationTestIngest.java     |  35 ++-
 .../IntegrationTestIngestStripeCompactions.java |  11 +-
 ...IntegrationTestRegionReplicaReplication.java | 231 +++++++++++++++++++
 .../hbase/regionserver/MemStoreFlusher.java     |  18 +-
 .../replication/regionserver/MetricsSource.java |   2 +-
 .../RegionReplicaReplicationEndpoint.java       |  88 ++++---
 .../regionserver/ReplicationSource.java         |   5 +-
 .../hadoop/hbase/util/CompressionTest.java      |   5 +-
 .../hadoop/hbase/HBaseTestingUtility.java       |  40 +++-
 ...egionReplicaReplicationEndpointNoMaster.java |   1 +
 .../hadoop/hbase/util/ConstantDelayQueue.java   | 196 ++++++++++++++++
 .../apache/hadoop/hbase/util/LoadTestTool.java  |  83 ++++++-
 .../hbase/util/MultiThreadedWriterBase.java     |   7 +-
 .../hadoop/hbase/util/RestartMetaTest.java      |   6 +-
 14 files changed, 674 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java
index c0c54b7..8495889 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.LoadTestTool;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Assert;
 import org.junit.Test;
@@ -45,8 +46,8 @@ import com.google.common.collect.Sets;
 public class IntegrationTestIngest extends IntegrationTestBase {
   public static final char HIPHEN = '-';
   private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
-  private static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
-  private static final long JUNIT_RUN_TIME = 10 * 60 * 1000;
+  protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
+  protected static final long JUNIT_RUN_TIME = 10 * 60 * 1000;
 
   /** A soft limit on how long we should run */
   protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
@@ -66,6 +67,7 @@ public class IntegrationTestIngest extends IntegrationTestBase {
   protected LoadTestTool loadTool;
 
   protected String[] LOAD_TEST_TOOL_INIT_ARGS = {
+      LoadTestTool.OPT_COLUMN_FAMILIES,
       LoadTestTool.OPT_COMPRESSION,
       LoadTestTool.OPT_DATA_BLOCK_ENCODING,
       LoadTestTool.OPT_INMEMORY,
@@ -78,7 +80,7 @@ public class IntegrationTestIngest extends IntegrationTestBase {
   public void setUpCluster() throws Exception {
     util = getTestingUtil(getConf());
     LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
-    util.initializeCluster(SERVER_COUNT);
+    util.initializeCluster(getMinServerCount());
     LOG.debug("Done initializing/checking cluster");
     cluster = util.getHBaseClusterInterface();
     deleteTableIfNecessary();
@@ -89,6 +91,10 @@ public class IntegrationTestIngest extends IntegrationTestBase {
     initTable();
   }
 
+  protected int getMinServerCount() {
+    return SERVER_COUNT;
+  }
+
   protected void initTable() throws IOException {
     int ret = loadTool.run(getArgsForLoadTestToolInitTable());
     Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
@@ -125,7 +131,22 @@ public class IntegrationTestIngest extends IntegrationTestBase {
 
   @Override
   protected Set<String> getColumnFamilies() {
-    return Sets.newHashSet(Bytes.toString(LoadTestTool.COLUMN_FAMILY));
+    Set<String> families = Sets.newHashSet();
+    String clazz = this.getClass().getSimpleName();
+    // parse conf for getting the column family names because LTT is not initialized yet.
+    String familiesString = getConf().get(
+      String.format("%s.%s", clazz, LoadTestTool.OPT_COLUMN_FAMILIES));
+    if (familiesString == null) {
+      for (byte[] family : LoadTestTool.DEFAULT_COLUMN_FAMILIES) {
+        families.add(Bytes.toString(family));
+      }
+    } else {
+       for (String family : familiesString.split(",")) {
+         families.add(family);
+       }
+    }
+
+    return families;
   }
 
   private void deleteTableIfNecessary() throws IOException {
@@ -206,6 +227,8 @@ public class IntegrationTestIngest extends IntegrationTestBase {
     List<String> args = new ArrayList<String>();
     args.add("-tn");
     args.add(getTablename().getNameAsString());
+    args.add("-families");
+    args.add(getColumnFamiliesAsString());
     args.add(mode);
     args.add(modeSpecificArg);
     args.add("-start_key");
@@ -217,6 +240,10 @@ public class IntegrationTestIngest extends IntegrationTestBase {
     return args.toArray(new String[args.size()]);
   }
 
+  private String getColumnFamiliesAsString() {
+    return StringUtils.join(",", getColumnFamilies());
+  }
+
   /** Estimates a data size based on the cluster size */
   protected long getNumKeys(long keysPerServer)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestStripeCompactions.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestStripeCompactions.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestStripeCompactions.java
index ebf159e..d64fbb0 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestStripeCompactions.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestStripeCompactions.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.hbase;
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreEngine;
 import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.util.LoadTestTool;
+import org.apache.hadoop.util.ToolRunner;
 import org.junit.experimental.categories.Category;
 
 /**
@@ -39,7 +41,14 @@ public class IntegrationTestIngestStripeCompactions extends IntegrationTestInges
     HTableDescriptor htd = new HTableDescriptor(getTablename());
     htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
     htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "100");
-    HColumnDescriptor hcd = new HColumnDescriptor(LoadTestTool.COLUMN_FAMILY);
+    HColumnDescriptor hcd = new HColumnDescriptor(LoadTestTool.DEFAULT_COLUMN_FAMILY);
     HBaseTestingUtility.createPreSplitLoadTestTable(util.getConfiguration(), htd, hcd);
   }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    IntegrationTestingUtility.setUseDistributedCluster(conf);
+    int ret = ToolRunner.run(conf, new IntegrationTestIngestStripeCompactions(), args);
+    System.exit(ret);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java
new file mode 100644
index 0000000..30da5c0
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.testclassification.IntegrationTests;
+import org.apache.hadoop.hbase.util.ConstantDelayQueue;
+import org.apache.hadoop.hbase.util.LoadTestTool;
+import org.apache.hadoop.hbase.util.MultiThreadedUpdater;
+import org.apache.hadoop.hbase.util.MultiThreadedWriter;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration test for testing async wal replication to secondary region replicas. Sets up a table
+ * with given region replication (default 2), and uses LoadTestTool client writer, updater and
+ * reader threads for writes and reads and verification. It uses a delay queue with a given delay
+ * ("read_delay_ms", default 5000ms) between the writer/updater and reader threads to make the
+ * written items available to readers. This means that a reader will only start reading from a row
+ * written by the writer / updater after the delay (5 secs by default) has passed. The reader
+ * threads read from the given region replica id (default 1). Async wal replication has to finish
+ * replicating the edits to the given region replica id before read_delay_ms elapses so that
+ * the read and verify will not fail.
+ *
+ * The job will run for <b>at least</b> given runtime (default 10min) by running a concurrent
+ * writer and reader workload followed by a concurrent updater and reader workload for
+ * num_keys_per_server.
+ *<p>
+ * Example usage:
+ * <pre>
+ * hbase org.apache.hadoop.hbase.IntegrationTestRegionReplicaReplication
+ * -DIntegrationTestRegionReplicaReplication.num_keys_per_server=10000
+ * -Dhbase.IntegrationTestRegionReplicaReplication.runtime=600000
+ * -DIntegrationTestRegionReplicaReplication.read_delay_ms=5000
+ * -DIntegrationTestRegionReplicaReplication.region_replication=3
+ * -DIntegrationTestRegionReplicaReplication.region_replica_id=2
+ * -DIntegrationTestRegionReplicaReplication.num_read_threads=100
+ * -DIntegrationTestRegionReplicaReplication.num_write_threads=100
+ * </pre>
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestRegionReplicaReplication extends IntegrationTestIngest {
+
+  private static final String TEST_NAME
+    = IntegrationTestRegionReplicaReplication.class.getSimpleName();
+
+  private static final String OPT_READ_DELAY_MS = "read_delay_ms";
+
+  private static final int DEFAULT_REGION_REPLICATION = 2;
+  private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
+  private static final String[] DEFAULT_COLUMN_FAMILIES = new String[] {"f1", "f2", "f3"};
+
+  @Override
+  protected int getMinServerCount() {
+    return SERVER_COUNT;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    conf.setIfUnset(
+      String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_REGION_REPLICATION),
+      String.valueOf(DEFAULT_REGION_REPLICATION));
+
+    conf.setIfUnset(
+      String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_COLUMN_FAMILIES),
+      StringUtils.join(",", DEFAULT_COLUMN_FAMILIES));
+
+    conf.setBoolean("hbase.table.sanity.checks", true);
+
+    // enable async wal replication to region replicas for unit tests
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024L * 1024 * 4); // flush every 4 MB
+    conf.setInt("hbase.hstore.blockingStoreFiles", 100);
+
+    super.setConf(conf);
+  }
+
+  @Override
+  @Test
+  public void testIngest() throws Exception {
+    runIngestTest(JUNIT_RUN_TIME, 25000, 10, 1024, 10, 20);
+  }
+
+  @Override
+  protected void startMonkey() throws Exception {
+    // TODO: disabled for now
+  }
+
+  /**
+   * This extends MultiThreadedWriter to add a configurable delay to the keys written by the writer
+   * threads to become available to the MultiThreadedReader threads. We add this delay because of
+   * the async nature of the wal replication to region replicas.
+   */
+  public static class DelayingMultiThreadedWriter extends MultiThreadedWriter {
+    private long delayMs;
+    public DelayingMultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
+        TableName tableName) throws IOException {
+      super(dataGen, conf, tableName);
+    }
+    @Override
+    protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
+      this.delayMs = conf.getLong(String.format("%s.%s",
+        IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000);
+      return new ConstantDelayQueue<Long>(TimeUnit.MILLISECONDS, delayMs);
+    }
+  }
+
+  /**
+   * This extends MultiThreadedUpdater to add a configurable delay to the keys written by the
+   * updater threads to become available to the MultiThreadedReader threads. We add this delay
+   * because of the async nature of the wal replication to region replicas.
+   */
+  public static class DelayingMultiThreadedUpdater extends MultiThreadedUpdater {
+    private long delayMs;
+    public DelayingMultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
+        TableName tableName, double updatePercent) throws IOException {
+      super(dataGen, conf, tableName, updatePercent);
+    }
+    @Override
+    protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
+      this.delayMs = conf.getLong(String.format("%s.%s",
+        IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000);
+      return new ConstantDelayQueue<Long>(TimeUnit.MILLISECONDS, delayMs);
+    }
+  }
+
+  @Override
+  protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
+      int recordSize, int writeThreads, int readThreads) throws Exception {
+
+    LOG.info("Running ingest");
+    LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
+
+    // sleep for some time so that the cache for disabled tables does not interfere.
+    Threads.sleep(
+      getConf().getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs",
+        5000) + 1000);
+
+    long start = System.currentTimeMillis();
+    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
+    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
+    long startKey = 0;
+
+    long numKeys = getNumKeys(keysPerServerPerIter);
+    while (System.currentTimeMillis() - start < 0.9 * runtime) {
+      LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
+          ((runtime - (System.currentTimeMillis() - start))/60000) + " min");
+
+      int verifyPercent = 100;
+      int updatePercent = 20;
+      int ret = -1;
+      int regionReplicaId = conf.getInt(String.format("%s.%s"
+        , TEST_NAME, LoadTestTool.OPT_REGION_REPLICA_ID), 1);
+
+      // we will run writers and readers at the same time.
+      List<String> args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys));
+      args.add("-write");
+      args.add(String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads));
+      args.add("-" + LoadTestTool.OPT_MULTIPUT);
+      args.add("-writer");
+      args.add(DelayingMultiThreadedWriter.class.getName()); // inject writer class
+      args.add("-read");
+      args.add(String.format("%d:%d", verifyPercent, readThreads));
+      args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID);
+      args.add(String.valueOf(regionReplicaId));
+
+      ret = loadTool.run(args.toArray(new String[args.size()]));
+      if (0 != ret) {
+        String errorMsg = "Load failed with error code " + ret;
+        LOG.error(errorMsg);
+        Assert.fail(errorMsg);
+      }
+
+      args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys));
+      args.add("-update");
+      args.add(String.format("%s:%s:1", updatePercent, writeThreads));
+      args.add("-updater");
+      args.add(DelayingMultiThreadedUpdater.class.getName()); // inject updater class
+      args.add("-read");
+      args.add(String.format("%d:%d", verifyPercent, readThreads));
+      args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID);
+      args.add(String.valueOf(regionReplicaId));
+
+      ret = loadTool.run(args.toArray(new String[args.size()]));
+      if (0 != ret) {
+        String errorMsg = "Load failed with error code " + ret;
+        LOG.error(errorMsg);
+        Assert.fail(errorMsg);
+      }
+      startKey += numKeys;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    IntegrationTestingUtility.setUseDistributedCluster(conf);
+    int ret = ToolRunner.run(conf, new IntegrationTestRegionReplicaReplication(), args);
+    System.exit(ret);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index e5ad590..6268b78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.apache.hadoop.util.StringUtils.humanReadableInt;
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.management.ManagementFactory;
@@ -105,9 +106,9 @@ class MemStoreFlusher implements FlushRequester {
     long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
     float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
     this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
-    this.globalMemStoreLimitLowMarkPercent = 
+    this.globalMemStoreLimitLowMarkPercent =
         HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
-    this.globalMemStoreLimitLowMark = 
+    this.globalMemStoreLimitLowMark =
         (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
 
     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
@@ -178,7 +179,11 @@ class MemStoreFlusher implements FlushRequester {
 
       Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
 
-      LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
+      LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
+          + "Total Memstore size="
+          + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
+          + ", Region memstore size="
+          + humanReadableInt(regionToFlush.memstoreSize.get()));
       flushedOne = flushRegion(regionToFlush, true, true);
       if (!flushedOne) {
         LOG.info("Excluding unflushable region " + regionToFlush +
@@ -292,6 +297,7 @@ class MemStoreFlusher implements FlushRequester {
       getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
   }
 
+  @Override
   public void requestFlush(HRegion r, boolean forceFlushAllStores) {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
@@ -304,6 +310,7 @@ class MemStoreFlusher implements FlushRequester {
     }
   }
 
+  @Override
   public void requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
@@ -591,6 +598,7 @@ class MemStoreFlusher implements FlushRequester {
    * Register a MemstoreFlushListener
    * @param listener
    */
+  @Override
   public void registerFlushRequestListener(final FlushRequestListener listener) {
     this.flushRequestListeners.add(listener);
   }
@@ -600,6 +608,7 @@ class MemStoreFlusher implements FlushRequester {
    * @param listener
    * @return true when passed listener is unregistered successfully.
    */
+  @Override
   public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
     return this.flushRequestListeners.remove(listener);
   }
@@ -608,9 +617,10 @@ class MemStoreFlusher implements FlushRequester {
    * Sets the global memstore limit to a new size.
    * @param globalMemStoreSize
    */
+  @Override
   public void setGlobalMemstoreLimit(long globalMemStoreSize) {
     this.globalMemStoreLimit = globalMemStoreSize;
-    this.globalMemStoreLimitLowMark = 
+    this.globalMemStoreLimitLowMark =
         (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
     reclaimMemStoreMemory();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 21296a0..04c3d2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -107,7 +107,7 @@ public class MetricsSource {
    *
    * @param delta the number filtered.
    */
-  private void incrLogEditsFiltered(long delta) {
+  public void incrLogEditsFiltered(long delta) {
     singleSourceSource.incrLogEditsFiltered(delta);
     globalSourceSource.incrLogEditsFiltered(delta);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index fc19603..b38a0e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -79,8 +79,8 @@ import com.google.common.cache.CacheBuilder;
 import com.google.protobuf.ServiceException;
 
 /**
- * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint 
- * which receives the WAL edits from the WAL, and sends the edits to replicas 
+ * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint
+ * which receives the WAL edits from the WAL, and sends the edits to replicas
  * of regions.
  */
 @InterfaceAudience.Private
@@ -232,6 +232,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
           entryBuffers.appendEntry(entry);
         }
         outputSink.flush(); // make sure everything is flushed
+        ctx.getMetrics().incrLogEditsFiltered(
+          outputSink.getSkippedEditsCounter().getAndSet(0));
         return true;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
@@ -341,24 +343,58 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         List<Entry> entries) throws IOException {
 
       if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
-        sink.getSkippedEditsCounter().incrementAndGet();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
+            + " is cached as a disabled or dropped table");
+        }
+        sink.getSkippedEditsCounter().addAndGet(entries.size());
         return;
       }
 
-      // get the replicas of the primary region
+      // If the table is disabled or dropped, we should not replay the entries, and we can skip
+      // replaying them. However, we might not know whether the table is disabled until we
+      // invalidate the cache and check from meta
       RegionLocations locations = null;
-      try {
-        locations = getRegionLocations(connection, tableName, row, true, 0);
+      boolean useCache = true;
+      while (true) {
+        // get the replicas of the primary region
+        try {
+          locations = getRegionLocations(connection, tableName, row, useCache, 0);
 
-        if (locations == null) {
-          throw new HBaseIOException("Cannot locate locations for "
-              + tableName + ", row:" + Bytes.toStringBinary(row));
+          if (locations == null) {
+            throw new HBaseIOException("Cannot locate locations for "
+                + tableName + ", row:" + Bytes.toStringBinary(row));
+          }
+        } catch (TableNotFoundException e) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
+              + " is dropped. Adding table to cache.");
+          }
+          disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
+          // skip this entry
+          sink.getSkippedEditsCounter().addAndGet(entries.size());
+          return;
         }
-      } catch (TableNotFoundException e) {
-        disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
-        // skip this entry
-        sink.getSkippedEditsCounter().addAndGet(entries.size());
-        return;
+
+        // check whether we should still replay this entry. If the regions are changed, or the
+        // entry is not coming from the primary region, filter it out.
+        HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
+        if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
+          encodedRegionName)) {
+          if (useCache) {
+            useCache = false;
+            continue; // this will retry location lookup
+          }
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
+              + " because located region region " + primaryLocation.getRegionInfo().getEncodedName()
+              + " is different than the original region " + Bytes.toStringBinary(encodedRegionName)
+              + " from WALEdit");
+          }
+          sink.getSkippedEditsCounter().addAndGet(entries.size());
+          return;
+        }
+        break;
       }
 
       if (locations.size() == 1) {
@@ -366,17 +402,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       }
 
       ArrayList<Future<ReplicateWALEntryResponse>> tasks
-        = new ArrayList<Future<ReplicateWALEntryResponse>>(2);
-
-      // check whether we should still replay this entry. If the regions are changed, or the
-      // entry is not coming form the primary region, filter it out.
-      HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
-      if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
-        encodedRegionName)) {
-        sink.getSkippedEditsCounter().addAndGet(entries.size());
-        return;
-      }
-
+        = new ArrayList<Future<ReplicateWALEntryResponse>>(locations.size() - 1);
 
       // All passed entries should belong to one region because it is coming from the EntryBuffers
       // split per region. But the regions might split and merge (unlike log recovery case).
@@ -413,6 +439,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
             // check whether the table is dropped or disabled which might cause
             // SocketTimeoutException, or RetriesExhaustedException or similar if we get IOE.
             if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
+                  + " because received exception for dropped or disabled table", cause);
+              }
               disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
               if (!tasksCancelled) {
                 sink.getSkippedEditsCounter().addAndGet(entries.size());
@@ -490,6 +520,12 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
         initialEncodedRegionName)) {
         skip = true;
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
+            + " because located region region " + location.getRegionInfo().getEncodedName()
+            + " is different than the original region "
+            + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
+        }
         return null;
       }
 
@@ -504,7 +540,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
         throws IOException {
       if (entries.isEmpty() || skip) {
-        skippedEntries.incrementAndGet();
+        skippedEntries.addAndGet(entries.size());
         return ReplicateWALEntryResponse.newBuilder().build();
       }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 714080f..794a3e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -691,8 +691,10 @@ public class ReplicationSource extends Thread
         }
         replicateContext.setEntries(entries).setSize(currentSize);
 
+        long startTimeNs = System.nanoTime();
         // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
         boolean replicated = replicationEndpoint.replicate(replicateContext);
+        long endTimeNs = System.nanoTime();
 
         if (!replicated) {
           continue;
@@ -713,7 +715,8 @@ public class ReplicationSource extends Thread
         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
         if (LOG.isTraceEnabled()) {
           LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
-              + this.totalReplicatedOperations + " operations");
+              + this.totalReplicatedOperations + " operations in " +
+              ((endTimeNs - startTimeNs)/1000000) + " ms");
         }
         break;
       } catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
index 5ec13f4..cdef12f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -87,7 +88,7 @@ public class CompressionTest {
         return ; // already passed test, dont do it again.
       } else {
         // failed.
-        throw new IOException("Compression algorithm '" + algo.getName() + "'" +
+        throw new DoNotRetryIOException("Compression algorithm '" + algo.getName() + "'" +
         " previously failed test.");
       }
     }
@@ -98,7 +99,7 @@ public class CompressionTest {
       compressionTestResults[algo.ordinal()] = true; // passes
     } catch (Throwable t) {
       compressionTestResults[algo.ordinal()] = false; // failure
-      throw new IOException(t);
+      throw new DoNotRetryIOException(t);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 1a377fc..c1897cf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -3577,6 +3577,29 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    * @return the number of regions the table was split into
    */
   public static int createPreSplitLoadTestTable(Configuration conf,
+      TableName tableName, byte[][] columnFamilies, Algorithm compression,
+      DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
+      Durability durability) throws IOException {
+    // Table-wide settings live on the table descriptor.
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.setDurability(durability);
+    desc.setRegionReplication(regionReplication);
+    // Every column family shares the same encoding and compression.
+    HColumnDescriptor[] hcds = new HColumnDescriptor[columnFamilies.length];
+    for (int i = 0; i < hcds.length; i++) {
+      hcds[i] = new HColumnDescriptor(columnFamilies[i]);
+      hcds[i].setDataBlockEncoding(dataBlockEncoding);
+      hcds[i].setCompressionType(compression);
+    }
+    return createPreSplitLoadTestTable(conf, desc, hcds, numRegionsPerServer);
+  }
+
+  /**
+   * Creates a pre-split table for load testing. If the table already exists,
+   * logs a warning and continues.
+   * @return the number of regions the table was split into
+   */
+  public static int createPreSplitLoadTestTable(Configuration conf,
       HTableDescriptor desc, HColumnDescriptor hcd) throws IOException {
     return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
   }
@@ -3588,8 +3611,21 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    */
   public static int createPreSplitLoadTestTable(Configuration conf,
       HTableDescriptor desc, HColumnDescriptor hcd, int numRegionsPerServer) throws IOException {
-    if (!desc.hasFamily(hcd.getName())) {
-      desc.addFamily(hcd);
+    return createPreSplitLoadTestTable(conf, desc, new HColumnDescriptor[] {hcd},
+        numRegionsPerServer);
+  }
+
+  /**
+   * Creates a pre-split table for load testing. If the table already exists,
+   * logs a warning and continues.
+   * @return the number of regions the table was split into
+   */
+  public static int createPreSplitLoadTestTable(Configuration conf,
+      HTableDescriptor desc, HColumnDescriptor[] hcds, int numRegionsPerServer) throws IOException {
+    for (HColumnDescriptor hcd : hcds) {
+      if (!desc.hasFamily(hcd.getName())) {
+        desc.addFamily(hcd);
+      }
     }
 
     int totalNumberOfRegions = 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index a191bdd..2326301 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -242,6 +242,7 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
 
     ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
     when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
+    when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
 
     replicator.init(context);
     replicator.start();

http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/test/java/org/apache/hadoop/hbase/util/ConstantDelayQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/ConstantDelayQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/ConstantDelayQueue.java
new file mode 100644
index 0000000..73ce71a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/ConstantDelayQueue.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A blocking queue that hands out each element only after a constant delay has passed since the
+ * element was inserted. Elements are wrapped in a {@link Delayed} holder and kept in a backing
+ * {@link DelayQueue}.
+ * <p>
+ * Only insertion, head retrieval, {@link #size()}, {@link #isEmpty()}, {@link #clear()} and
+ * {@link #remainingCapacity()} are implemented. Iteration, array conversion, bulk operations,
+ * {@link #remove(Object)}, {@link #contains(Object)} and both {@code drainTo} variants throw
+ * {@link UnsupportedOperationException}.
+ * @param <E> type of elements
+ */
+@InterfaceAudience.Private
+public class ConstantDelayQueue<E> implements BlockingQueue<E> {
+
+  /** Pairs an element with the clock time, in milliseconds, at which it may be handed out. */
+  private static final class DelayedElement<T> implements Delayed {
+    private final T element;
+    private final long end;
+
+    public DelayedElement(T element, long delayMs) {
+      this.element = element;
+      this.end = EnvironmentEdgeManager.currentTime() + delayMs;
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+      long cmp;
+      if (o instanceof DelayedElement) {
+        // Deadlines are fixed at creation, so compare them directly; reading the clock once per
+        // operand could make two elements with the same deadline compare as unequal.
+        cmp = end - ((DelayedElement<?>) o).end;
+      } else {
+        cmp = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
+      }
+      return cmp == 0 ? 0 : (cmp < 0 ? -1 : 1);
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      // Use the clock that produced 'end'; it is not guaranteed to agree with
+      // System.currentTimeMillis(), and mixing the two would skew the remaining delay.
+      return unit.convert(end - EnvironmentEdgeManager.currentTime(), TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /** Delay applied to every element, in milliseconds. */
+  private final long delayMs;
+
+  /** Backing store holding the wrapped elements. */
+  private final DelayQueue<DelayedElement<E>> queue = new DelayQueue<DelayedElement<E>>();
+
+  /**
+   * @param timeUnit unit of {@code delay}
+   * @param delay how long each element is held back after it is inserted
+   */
+  public ConstantDelayQueue(TimeUnit timeUnit, long delay) {
+    this.delayMs = TimeUnit.MILLISECONDS.convert(delay, timeUnit);
+  }
+
+  /** Wraps {@code e} in a holder whose delay starts counting now. */
+  private DelayedElement<E> wrap(E e) {
+    return new DelayedElement<E>(e, delayMs);
+  }
+
+  /** Extracts the payload; a {@code null} holder (nothing available) maps to {@code null}. */
+  private E unwrap(DelayedElement<E> el) {
+    return el == null ? null : el.element;
+  }
+
+  @Override
+  public E remove() {
+    // The backing remove() throws NoSuchElementException rather than returning null.
+    return queue.remove().element;
+  }
+
+  @Override
+  public E poll() {
+    return unwrap(queue.poll());
+  }
+
+  @Override
+  public E element() {
+    // The backing element() throws NoSuchElementException rather than returning null.
+    return queue.element().element;
+  }
+
+  @Override
+  public E peek() {
+    // Like DelayQueue#peek(), this may return a head whose delay has not elapsed yet.
+    return unwrap(queue.peek());
+  }
+
+  @Override
+  public int size() {
+    return queue.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return queue.isEmpty();
+  }
+
+  @Override
+  public Iterator<E> iterator() {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public Object[] toArray() {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public <T> T[] toArray(T[] a) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> c) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends E> c) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> c) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> c) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public void clear() {
+    queue.clear();
+  }
+
+  @Override
+  public boolean add(E e) {
+    return queue.add(wrap(e));
+  }
+
+  @Override
+  public boolean offer(E e) {
+    return queue.offer(wrap(e));
+  }
+
+  @Override
+  public void put(E e) throws InterruptedException {
+    queue.put(wrap(e));
+  }
+
+  @Override
+  public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
+    return queue.offer(wrap(e), timeout, unit);
+  }
+
+  @Override
+  public E take() throws InterruptedException {
+    // The backing take() blocks until an element's delay has elapsed; it never returns null.
+    return queue.take().element;
+  }
+
+  @Override
+  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+    return unwrap(queue.poll(timeout, unit));
+  }
+
+  @Override
+  public int remainingCapacity() {
+    return queue.remainingCapacity();
+  }
+
+  @Override
+  public boolean remove(Object o) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public boolean contains(Object o) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public int drainTo(Collection<? super E> c) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public int drainTo(Collection<? super E> c, int maxElements) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
index 90e07b3..6d64bc6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
@@ -74,14 +74,17 @@ public class LoadTestTool extends AbstractHBaseTool {
   /** Table name for the test */
   private TableName tableName;
 
+  /** Column families for the test */
+  private byte[][] families;
+
   /** Table name to use of not overridden on the command line */
   protected static final String DEFAULT_TABLE_NAME = "cluster_test";
 
   /** Column family used by the test */
-  public static byte[] COLUMN_FAMILY = Bytes.toBytes("test_cf");
+  public static byte[] DEFAULT_COLUMN_FAMILY = Bytes.toBytes("test_cf");
 
   /** Column families used by the test */
-  protected static final byte[][] COLUMN_FAMILIES = { COLUMN_FAMILY };
+  public static final byte[][] DEFAULT_COLUMN_FAMILIES = { DEFAULT_COLUMN_FAMILY };
 
   /** The default data size if not specified */
   protected static final int DEFAULT_DATA_SIZE = 64;
@@ -130,18 +133,25 @@ public class LoadTestTool extends AbstractHBaseTool {
   public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
       + " Any args for this class can be passed as colon separated after class name";
 
+  public static final String OPT_WRITER = "writer";
+  public static final String OPT_WRITER_USAGE = "The class for executing the write requests";
+
+  public static final String OPT_UPDATER = "updater";
+  public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";
+
   public static final String OPT_READER = "reader";
   public static final String OPT_READER_USAGE = "The class for executing the read requests";
 
   protected static final String OPT_KEY_WINDOW = "key_window";
   protected static final String OPT_WRITE = "write";
   protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
-  protected static final String OPT_MULTIPUT = "multiput";
+  public static final String OPT_MULTIPUT = "multiput";
   public static final String OPT_MULTIGET = "multiget_batchsize";
   protected static final String OPT_NUM_KEYS = "num_keys";
   protected static final String OPT_READ = "read";
   protected static final String OPT_START_KEY = "start_key";
   public static final String OPT_TABLE_NAME = "tn";
+  public static final String OPT_COLUMN_FAMILIES = "families";
   protected static final String OPT_ZK_QUORUM = "zk";
   protected static final String OPT_ZK_PARENT_NODE = "zk_root";
   protected static final String OPT_SKIP_INIT = "skip_init";
@@ -245,6 +255,10 @@ public class LoadTestTool extends AbstractHBaseTool {
     return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
   }
 
+  public byte[][] getColumnFamilies() {
+    return families; // set from OPT_COLUMN_FAMILIES if given, else DEFAULT_COLUMN_FAMILIES
+  }
+
   /**
    * Apply column family options such as Bloom filters, compression, and data
    * block encoding.
@@ -298,6 +312,7 @@ public class LoadTestTool extends AbstractHBaseTool {
         "without port numbers");
     addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
     addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
+    addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma");
     addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
     addOptWithArg(OPT_READ, OPT_USAGE_READ);
     addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
@@ -320,6 +335,8 @@ public class LoadTestTool extends AbstractHBaseTool {
         "separate updates for every column in a row");
     addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
     addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
+    addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
+    addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
     addOptWithArg(OPT_READER, OPT_READER_USAGE);
 
     addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
@@ -352,6 +369,16 @@ public class LoadTestTool extends AbstractHBaseTool {
     tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME,
         DEFAULT_TABLE_NAME));
 
+    if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
+      String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
+      families = new byte[list.length][];
+      for (int i = 0; i < list.length; i++) {
+        families[i] = Bytes.toBytes(list[i]);
+      }
+    } else {
+      families = DEFAULT_COLUMN_FAMILIES;
+    }
+
     isWrite = cmd.hasOption(OPT_WRITE);
     isRead = cmd.hasOption(OPT_READ);
     isUpdate = cmd.hasOption(OPT_UPDATE);
@@ -503,9 +530,9 @@ public class LoadTestTool extends AbstractHBaseTool {
     }
 
     HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
-        COLUMN_FAMILY, compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer,
+      getColumnFamilies(), compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer,
         regionReplication, durability);
-    applyColumnFamilyOptions(tableName, COLUMN_FAMILIES);
+    applyColumnFamilyOptions(tableName, getColumnFamilies());
   }
 
   @Override
@@ -570,7 +597,7 @@ public class LoadTestTool extends AbstractHBaseTool {
     } else {
       // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
       dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
-          minColsPerKey, maxColsPerKey, COLUMN_FAMILY);
+          minColsPerKey, maxColsPerKey, families);
     }
 
     if (userOwner != null) {
@@ -603,7 +630,14 @@ public class LoadTestTool extends AbstractHBaseTool {
       if (userOwner != null) {
         writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
       } else {
-        writerThreads = new MultiThreadedWriter(dataGen, conf, tableName);
+        String writerClass = null;
+        if (cmd.hasOption(OPT_WRITER)) {
+          writerClass = cmd.getOptionValue(OPT_WRITER);
+        } else {
+          writerClass = MultiThreadedWriter.class.getCanonicalName();
+        }
+
+        writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
       }
       writerThreads.setMultiPut(isMultiPut);
     }
@@ -613,7 +647,13 @@ public class LoadTestTool extends AbstractHBaseTool {
         updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
             userOwner, userNames);
       } else {
-        updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent);
+        String updaterClass = null;
+        if (cmd.hasOption(OPT_UPDATER)) {
+          updaterClass = cmd.getOptionValue(OPT_UPDATER);
+        } else {
+          updaterClass = MultiThreadedUpdater.class.getCanonicalName();
+        }
+        updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
       }
       updaterThreads.setBatchUpdate(isBatchUpdate);
       updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
@@ -700,7 +740,32 @@ public class LoadTestTool extends AbstractHBaseTool {
       Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class,
           byte[][].class);
       return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
-          minColsPerKey, maxColsPerKey, COLUMN_FAMILIES);
+          minColsPerKey, maxColsPerKey, families);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Reflectively creates the writer {@code clazzName}; any failure is wrapped in IOException. */
+  private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName,
+      LoadTestDataGenerator dataGen) throws IOException {
+    try {
+      Constructor<?> ctor = Class.forName(clazzName).getConstructor(
+        LoadTestDataGenerator.class, Configuration.class, TableName.class);
+      return (MultiThreadedWriter) ctor.newInstance(dataGen, conf, tableName);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName
+      , LoadTestDataGenerator dataGen) throws IOException {
+    try {
+      Class<?> clazz = Class.forName(clazzName);
+      Constructor<?> constructor = clazz.getConstructor(
+        LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
+      return (MultiThreadedUpdater) constructor.newInstance(
+        dataGen, conf, tableName, updatePercent);
     } catch (Exception e) {
       throw new IOException(e);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
index 9eb0c93..d4e6d80 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
@@ -46,7 +46,7 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
    * {@link #wroteUpToKey}, the maximum key in the contiguous range of keys
    * being inserted/updated. This queue is supposed to stay small.
    */
-  protected BlockingQueue<Long> wroteKeys = new ArrayBlockingQueue<Long>(10000);
+  protected BlockingQueue<Long> wroteKeys;
 
   /**
    * This is the current key to be inserted/updated by any thread. Each thread does an
@@ -75,6 +75,11 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
   public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
       TableName tableName, String actionLetter) throws IOException {
     super(dataGen, conf, tableName, actionLetter);
+    this.wroteKeys = createWriteKeysQueue(conf); // overridable; subclass fields not yet set
+  }
+
+  protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
+    return new ArrayBlockingQueue<Long>(10000); // default ignores conf; capacity 10000
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java
index b0a17a9..6beb2e6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -81,7 +80,8 @@ public class RestartMetaTest extends AbstractHBaseTool {
 
     // start the writers
     LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(
-      minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, LoadTestTool.COLUMN_FAMILY);
+      minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey,
+      LoadTestTool.DEFAULT_COLUMN_FAMILY);
     MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
     writer.setMultiPut(true);
     writer.start(startKey, endKey, numThreads);
@@ -101,7 +101,7 @@ public class RestartMetaTest extends AbstractHBaseTool {
 
     // create tables if needed
     HBaseTestingUtility.createPreSplitLoadTestTable(conf, TABLE_NAME,
-        LoadTestTool.COLUMN_FAMILY, Compression.Algorithm.NONE,
+        LoadTestTool.DEFAULT_COLUMN_FAMILY, Compression.Algorithm.NONE,
         DataBlockEncoding.NONE);
 
     LOG.debug("Loading data....\n\n");