You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2015/02/23 06:29:25 UTC
hbase git commit: HBASE-11842 Integration test for async wal
replication to secondary regions
Repository: hbase
Updated Branches:
refs/heads/master 7792dee0c -> 21b366afe
HBASE-11842 Integration test for async wal replication to secondary regions
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/21b366af
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/21b366af
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/21b366af
Branch: refs/heads/master
Commit: 21b366afe1fa73cfee5601db8c661049788e97d7
Parents: 7792dee
Author: Enis Soztutar <en...@apache.org>
Authored: Sun Feb 22 21:29:12 2015 -0800
Committer: Enis Soztutar <en...@apache.org>
Committed: Sun Feb 22 21:29:12 2015 -0800
----------------------------------------------------------------------
.../hadoop/hbase/IntegrationTestIngest.java | 35 ++-
.../IntegrationTestIngestStripeCompactions.java | 11 +-
...IntegrationTestRegionReplicaReplication.java | 231 +++++++++++++++++++
.../hbase/regionserver/MemStoreFlusher.java | 18 +-
.../replication/regionserver/MetricsSource.java | 2 +-
.../RegionReplicaReplicationEndpoint.java | 88 ++++---
.../regionserver/ReplicationSource.java | 5 +-
.../hadoop/hbase/util/CompressionTest.java | 5 +-
.../hadoop/hbase/HBaseTestingUtility.java | 40 +++-
...egionReplicaReplicationEndpointNoMaster.java | 1 +
.../hadoop/hbase/util/ConstantDelayQueue.java | 196 ++++++++++++++++
.../apache/hadoop/hbase/util/LoadTestTool.java | 83 ++++++-
.../hbase/util/MultiThreadedWriterBase.java | 7 +-
.../hadoop/hbase/util/RestartMetaTest.java | 6 +-
14 files changed, 674 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java
index c0c54b7..8495889 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
@@ -45,8 +46,8 @@ import com.google.common.collect.Sets;
public class IntegrationTestIngest extends IntegrationTestBase {
public static final char HIPHEN = '-';
private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
- private static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
- private static final long JUNIT_RUN_TIME = 10 * 60 * 1000;
+ protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
+ protected static final long JUNIT_RUN_TIME = 10 * 60 * 1000;
/** A soft limit on how long we should run */
protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
@@ -66,6 +67,7 @@ public class IntegrationTestIngest extends IntegrationTestBase {
protected LoadTestTool loadTool;
protected String[] LOAD_TEST_TOOL_INIT_ARGS = {
+ LoadTestTool.OPT_COLUMN_FAMILIES,
LoadTestTool.OPT_COMPRESSION,
LoadTestTool.OPT_DATA_BLOCK_ENCODING,
LoadTestTool.OPT_INMEMORY,
@@ -78,7 +80,7 @@ public class IntegrationTestIngest extends IntegrationTestBase {
public void setUpCluster() throws Exception {
util = getTestingUtil(getConf());
LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
- util.initializeCluster(SERVER_COUNT);
+ util.initializeCluster(getMinServerCount());
LOG.debug("Done initializing/checking cluster");
cluster = util.getHBaseClusterInterface();
deleteTableIfNecessary();
@@ -89,6 +91,10 @@ public class IntegrationTestIngest extends IntegrationTestBase {
initTable();
}
+ protected int getMinServerCount() {
+ return SERVER_COUNT;
+ }
+
protected void initTable() throws IOException {
int ret = loadTool.run(getArgsForLoadTestToolInitTable());
Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
@@ -125,7 +131,22 @@ public class IntegrationTestIngest extends IntegrationTestBase {
@Override
protected Set<String> getColumnFamilies() {
- return Sets.newHashSet(Bytes.toString(LoadTestTool.COLUMN_FAMILY));
+ Set<String> families = Sets.newHashSet();
+ String clazz = this.getClass().getSimpleName();
+ // parse conf for getting the column family names because LTT is not initialized yet.
+ String familiesString = getConf().get(
+ String.format("%s.%s", clazz, LoadTestTool.OPT_COLUMN_FAMILIES));
+ if (familiesString == null) {
+ for (byte[] family : LoadTestTool.DEFAULT_COLUMN_FAMILIES) {
+ families.add(Bytes.toString(family));
+ }
+ } else {
+ for (String family : familiesString.split(",")) {
+ families.add(family);
+ }
+ }
+
+ return families;
}
private void deleteTableIfNecessary() throws IOException {
@@ -206,6 +227,8 @@ public class IntegrationTestIngest extends IntegrationTestBase {
List<String> args = new ArrayList<String>();
args.add("-tn");
args.add(getTablename().getNameAsString());
+ args.add("-families");
+ args.add(getColumnFamiliesAsString());
args.add(mode);
args.add(modeSpecificArg);
args.add("-start_key");
@@ -217,6 +240,10 @@ public class IntegrationTestIngest extends IntegrationTestBase {
return args.toArray(new String[args.size()]);
}
+ private String getColumnFamiliesAsString() {
+ return StringUtils.join(",", getColumnFamilies());
+ }
+
/** Estimates a data size based on the cluster size */
protected long getNumKeys(long keysPerServer)
throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestStripeCompactions.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestStripeCompactions.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestStripeCompactions.java
index ebf159e..d64fbb0 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestStripeCompactions.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestStripeCompactions.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.LoadTestTool;
+import org.apache.hadoop.util.ToolRunner;
import org.junit.experimental.categories.Category;
/**
@@ -39,7 +41,14 @@ public class IntegrationTestIngestStripeCompactions extends IntegrationTestInges
HTableDescriptor htd = new HTableDescriptor(getTablename());
htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "100");
- HColumnDescriptor hcd = new HColumnDescriptor(LoadTestTool.COLUMN_FAMILY);
+ HColumnDescriptor hcd = new HColumnDescriptor(LoadTestTool.DEFAULT_COLUMN_FAMILY);
HBaseTestingUtility.createPreSplitLoadTestTable(util.getConfiguration(), htd, hcd);
}
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ IntegrationTestingUtility.setUseDistributedCluster(conf);
+ int ret = ToolRunner.run(conf, new IntegrationTestIngestStripeCompactions(), args);
+ System.exit(ret);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java
new file mode 100644
index 0000000..30da5c0
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.testclassification.IntegrationTests;
+import org.apache.hadoop.hbase.util.ConstantDelayQueue;
+import org.apache.hadoop.hbase.util.LoadTestTool;
+import org.apache.hadoop.hbase.util.MultiThreadedUpdater;
+import org.apache.hadoop.hbase.util.MultiThreadedWriter;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration test for testing async wal replication to secondary region replicas. Sets up a table
+ * with given region replication (default 2), and uses LoadTestTool client writer, updater and
+ * reader threads for writes and reads and verification. It uses a delay queue with a given delay
+ * ("read_delay_ms", default 5000ms) between the writer/updater and reader threads to make the
+ * written items available to readers. This means that a reader will only start reading from a row
+ * written by the writer / updater after 5secs has passed. The reader thread performs the reads from
+ * the given region replica id (default 1). Async wal replication has to finish
+ * with the replication of the edits before read_delay_ms to the given region replica id so that
+ * the read and verify will not fail.
+ *
+ * The job will run for <b>at least</b> given runtime (default 10min) by running a concurrent
+ * writer and reader workload followed by a concurrent updater and reader workload for
+ * num_keys_per_server.
+ *<p>
+ * Example usage:
+ * <pre>
+ * hbase org.apache.hadoop.hbase.IntegrationTestRegionReplicaReplication
+ * -DIntegrationTestRegionReplicaReplication.num_keys_per_server=10000
+ * -Dhbase.IntegrationTestRegionReplicaReplication.runtime=600000
+ * -DIntegrationTestRegionReplicaReplication.read_delay_ms=5000
+ * -DIntegrationTestRegionReplicaReplication.region_replication=3
+ * -DIntegrationTestRegionReplicaReplication.region_replica_id=2
+ * -DIntegrationTestRegionReplicaReplication.num_read_threads=100
+ * -DIntegrationTestRegionReplicaReplication.num_write_threads=100
+ * </pre>
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestRegionReplicaReplication extends IntegrationTestIngest {
+
+ private static final String TEST_NAME
+ = IntegrationTestRegionReplicaReplication.class.getSimpleName();
+
+ private static final String OPT_READ_DELAY_MS = "read_delay_ms";
+
+ private static final int DEFAULT_REGION_REPLICATION = 2;
+ private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
+ private static final String[] DEFAULT_COLUMN_FAMILIES = new String[] {"f1", "f2", "f3"};
+
+ @Override
+ protected int getMinServerCount() {
+ return SERVER_COUNT;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ conf.setIfUnset(
+ String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_REGION_REPLICATION),
+ String.valueOf(DEFAULT_REGION_REPLICATION));
+
+ conf.setIfUnset(
+ String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_COLUMN_FAMILIES),
+ StringUtils.join(",", DEFAULT_COLUMN_FAMILIES));
+
+ conf.setBoolean("hbase.table.sanity.checks", true);
+
+ // enable async wal replication to region replicas for unit tests
+ conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
+ conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+
+ conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024L * 1024 * 4); // flush every 4 MB
+ conf.setInt("hbase.hstore.blockingStoreFiles", 100);
+
+ super.setConf(conf);
+ }
+
+ @Override
+ @Test
+ public void testIngest() throws Exception {
+ runIngestTest(JUNIT_RUN_TIME, 25000, 10, 1024, 10, 20);
+ }
+
+ @Override
+ protected void startMonkey() throws Exception {
+ // TODO: disabled for now
+ }
+
+ /**
+ * This extends MultiThreadedWriter to add a configurable delay to the keys written by the writer
+ * threads to become available to the MultiThreadedReader threads. We add this delay because of
+ * the async nature of the wal replication to region replicas.
+ */
+ public static class DelayingMultiThreadedWriter extends MultiThreadedWriter {
+ private long delayMs;
+ public DelayingMultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
+ TableName tableName) throws IOException {
+ super(dataGen, conf, tableName);
+ }
+ @Override
+ protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
+ this.delayMs = conf.getLong(String.format("%s.%s",
+ IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000);
+ return new ConstantDelayQueue<Long>(TimeUnit.MILLISECONDS, delayMs);
+ }
+ }
+
+ /**
+ * This extends MultiThreadedUpdater to add a configurable delay to the keys updated by the
+ * updater threads to become available to the MultiThreadedReader threads. We add this delay
+ * because of the async nature of the wal replication to region replicas.
+ */
+ public static class DelayingMultiThreadedUpdater extends MultiThreadedUpdater {
+ private long delayMs;
+ public DelayingMultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
+ TableName tableName, double updatePercent) throws IOException {
+ super(dataGen, conf, tableName, updatePercent);
+ }
+ @Override
+ protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
+ this.delayMs = conf.getLong(String.format("%s.%s",
+ IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000);
+ return new ConstantDelayQueue<Long>(TimeUnit.MILLISECONDS, delayMs);
+ }
+ }
+
+ @Override
+ protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
+ int recordSize, int writeThreads, int readThreads) throws Exception {
+
+ LOG.info("Running ingest");
+ LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
+
+ // sleep for some time so that the cache for disabled tables does not interfere.
+ Threads.sleep(
+ getConf().getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs",
+ 5000) + 1000);
+
+ long start = System.currentTimeMillis();
+ String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
+ long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
+ long startKey = 0;
+
+ long numKeys = getNumKeys(keysPerServerPerIter);
+ while (System.currentTimeMillis() - start < 0.9 * runtime) {
+ LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
+ ((runtime - (System.currentTimeMillis() - start))/60000) + " min");
+
+ int verifyPercent = 100;
+ int updatePercent = 20;
+ int ret = -1;
+ int regionReplicaId = conf.getInt(String.format("%s.%s"
+ , TEST_NAME, LoadTestTool.OPT_REGION_REPLICA_ID), 1);
+
+ // we will run writers and readers at the same time.
+ List<String> args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys));
+ args.add("-write");
+ args.add(String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads));
+ args.add("-" + LoadTestTool.OPT_MULTIPUT);
+ args.add("-writer");
+ args.add(DelayingMultiThreadedWriter.class.getName()); // inject writer class
+ args.add("-read");
+ args.add(String.format("%d:%d", verifyPercent, readThreads));
+ args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID);
+ args.add(String.valueOf(regionReplicaId));
+
+ ret = loadTool.run(args.toArray(new String[args.size()]));
+ if (0 != ret) {
+ String errorMsg = "Load failed with error code " + ret;
+ LOG.error(errorMsg);
+ Assert.fail(errorMsg);
+ }
+
+ args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys));
+ args.add("-update");
+ args.add(String.format("%s:%s:1", updatePercent, writeThreads));
+ args.add("-updater");
+ args.add(DelayingMultiThreadedUpdater.class.getName()); // inject updater class
+ args.add("-read");
+ args.add(String.format("%d:%d", verifyPercent, readThreads));
+ args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID);
+ args.add(String.valueOf(regionReplicaId));
+
+ ret = loadTool.run(args.toArray(new String[args.size()]));
+ if (0 != ret) {
+ String errorMsg = "Load failed with error code " + ret;
+ LOG.error(errorMsg);
+ Assert.fail(errorMsg);
+ }
+ startKey += numKeys;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ IntegrationTestingUtility.setUseDistributedCluster(conf);
+ int ret = ToolRunner.run(conf, new IntegrationTestRegionReplicaReplication(), args);
+ System.exit(ret);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index e5ad590..6268b78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import static org.apache.hadoop.util.StringUtils.humanReadableInt;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
@@ -105,9 +106,9 @@ class MemStoreFlusher implements FlushRequester {
long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
- this.globalMemStoreLimitLowMarkPercent =
+ this.globalMemStoreLimitLowMarkPercent =
HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
- this.globalMemStoreLimitLowMark =
+ this.globalMemStoreLimitLowMark =
(long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
@@ -178,7 +179,11 @@ class MemStoreFlusher implements FlushRequester {
Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
- LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
+ LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
+ + "Total Memstore size="
+ + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
+ + ", Region memstore size="
+ + humanReadableInt(regionToFlush.memstoreSize.get()));
flushedOne = flushRegion(regionToFlush, true, true);
if (!flushedOne) {
LOG.info("Excluding unflushable region " + regionToFlush +
@@ -292,6 +297,7 @@ class MemStoreFlusher implements FlushRequester {
getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
}
+ @Override
public void requestFlush(HRegion r, boolean forceFlushAllStores) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
@@ -304,6 +310,7 @@ class MemStoreFlusher implements FlushRequester {
}
}
+ @Override
public void requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
@@ -591,6 +598,7 @@ class MemStoreFlusher implements FlushRequester {
* Register a MemstoreFlushListener
* @param listener
*/
+ @Override
public void registerFlushRequestListener(final FlushRequestListener listener) {
this.flushRequestListeners.add(listener);
}
@@ -600,6 +608,7 @@ class MemStoreFlusher implements FlushRequester {
* @param listener
* @return true when passed listener is unregistered successfully.
*/
+ @Override
public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
return this.flushRequestListeners.remove(listener);
}
@@ -608,9 +617,10 @@ class MemStoreFlusher implements FlushRequester {
* Sets the global memstore limit to a new size.
* @param globalMemStoreSize
*/
+ @Override
public void setGlobalMemstoreLimit(long globalMemStoreSize) {
this.globalMemStoreLimit = globalMemStoreSize;
- this.globalMemStoreLimitLowMark =
+ this.globalMemStoreLimitLowMark =
(long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
reclaimMemStoreMemory();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 21296a0..04c3d2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -107,7 +107,7 @@ public class MetricsSource {
*
* @param delta the number filtered.
*/
- private void incrLogEditsFiltered(long delta) {
+ public void incrLogEditsFiltered(long delta) {
singleSourceSource.incrLogEditsFiltered(delta);
globalSourceSource.incrLogEditsFiltered(delta);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index fc19603..b38a0e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -79,8 +79,8 @@ import com.google.common.cache.CacheBuilder;
import com.google.protobuf.ServiceException;
/**
- * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint
- * which receives the WAL edits from the WAL, and sends the edits to replicas
+ * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint
+ * which receives the WAL edits from the WAL, and sends the edits to replicas
* of regions.
*/
@InterfaceAudience.Private
@@ -232,6 +232,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
entryBuffers.appendEntry(entry);
}
outputSink.flush(); // make sure everything is flushed
+ ctx.getMetrics().incrLogEditsFiltered(
+ outputSink.getSkippedEditsCounter().getAndSet(0));
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -341,24 +343,58 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
List<Entry> entries) throws IOException {
if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
- sink.getSkippedEditsCounter().incrementAndGet();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
+ + " is cached as a disabled or dropped table");
+ }
+ sink.getSkippedEditsCounter().addAndGet(entries.size());
return;
}
- // get the replicas of the primary region
+ // If the table is disabled or dropped, we should not replay the entries, and we can skip
+ // replaying them. However, we might not know whether the table is disabled until we
+ // invalidate the cache and check from meta
RegionLocations locations = null;
- try {
- locations = getRegionLocations(connection, tableName, row, true, 0);
+ boolean useCache = true;
+ while (true) {
+ // get the replicas of the primary region
+ try {
+ locations = getRegionLocations(connection, tableName, row, useCache, 0);
- if (locations == null) {
- throw new HBaseIOException("Cannot locate locations for "
- + tableName + ", row:" + Bytes.toStringBinary(row));
+ if (locations == null) {
+ throw new HBaseIOException("Cannot locate locations for "
+ + tableName + ", row:" + Bytes.toStringBinary(row));
+ }
+ } catch (TableNotFoundException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
+ + " is dropped. Adding table to cache.");
+ }
+ disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
+ // skip this entry
+ sink.getSkippedEditsCounter().addAndGet(entries.size());
+ return;
}
- } catch (TableNotFoundException e) {
- disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
- // skip this entry
- sink.getSkippedEditsCounter().addAndGet(entries.size());
- return;
+
+ // check whether we should still replay this entry. If the regions are changed, or the
+ // entry is not coming from the primary region, filter it out.
+ HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
+ if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
+ encodedRegionName)) {
+ if (useCache) {
+ useCache = false;
+ continue; // this will retry location lookup
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
+ + " because located region region " + primaryLocation.getRegionInfo().getEncodedName()
+ + " is different than the original region " + Bytes.toStringBinary(encodedRegionName)
+ + " from WALEdit");
+ }
+ sink.getSkippedEditsCounter().addAndGet(entries.size());
+ return;
+ }
+ break;
}
if (locations.size() == 1) {
@@ -366,17 +402,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
}
ArrayList<Future<ReplicateWALEntryResponse>> tasks
- = new ArrayList<Future<ReplicateWALEntryResponse>>(2);
-
- // check whether we should still replay this entry. If the regions are changed, or the
- // entry is not coming form the primary region, filter it out.
- HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
- if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
- encodedRegionName)) {
- sink.getSkippedEditsCounter().addAndGet(entries.size());
- return;
- }
-
+ = new ArrayList<Future<ReplicateWALEntryResponse>>(locations.size() - 1);
// All passed entries should belong to one region because it is coming from the EntryBuffers
// split per region. But the regions might split and merge (unlike log recovery case).
@@ -413,6 +439,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
// check whether the table is dropped or disabled which might cause
// SocketTimeoutException, or RetriesExhaustedException or similar if we get IOE.
if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
+ + " because received exception for dropped or disabled table", cause);
+ }
disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
if (!tasksCancelled) {
sink.getSkippedEditsCounter().addAndGet(entries.size());
@@ -490,6 +520,12 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
initialEncodedRegionName)) {
skip = true;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
+ + " because located region region " + location.getRegionInfo().getEncodedName()
+ + " is different than the original region "
+ + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
+ }
return null;
}
@@ -504,7 +540,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
throws IOException {
if (entries.isEmpty() || skip) {
- skippedEntries.incrementAndGet();
+ skippedEntries.addAndGet(entries.size());
return ReplicateWALEntryResponse.newBuilder().build();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 714080f..794a3e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -691,8 +691,10 @@ public class ReplicationSource extends Thread
}
replicateContext.setEntries(entries).setSize(currentSize);
+ long startTimeNs = System.nanoTime();
// send the edits to the endpoint. Will block until the edits are shipped and acknowledged
boolean replicated = replicationEndpoint.replicate(replicateContext);
+ long endTimeNs = System.nanoTime();
if (!replicated) {
continue;
@@ -713,7 +715,8 @@ public class ReplicationSource extends Thread
this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
if (LOG.isTraceEnabled()) {
LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
- + this.totalReplicatedOperations + " operations");
+ + this.totalReplicatedOperations + " operations in " +
+ ((endTimeNs - startTimeNs)/1000000) + " ms");
}
break;
} catch (Exception ex) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
index 5ec13f4..cdef12f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.io.compress.Compression;
@@ -87,7 +88,7 @@ public class CompressionTest {
return ; // already passed test, dont do it again.
} else {
// failed.
- throw new IOException("Compression algorithm '" + algo.getName() + "'" +
+ throw new DoNotRetryIOException("Compression algorithm '" + algo.getName() + "'" +
" previously failed test.");
}
}
@@ -98,7 +99,7 @@ public class CompressionTest {
compressionTestResults[algo.ordinal()] = true; // passes
} catch (Throwable t) {
compressionTestResults[algo.ordinal()] = false; // failure
- throw new IOException(t);
+ throw new DoNotRetryIOException(t);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 1a377fc..c1897cf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -3577,6 +3577,29 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
* @return the number of regions the table was split into
*/
public static int createPreSplitLoadTestTable(Configuration conf,
+ TableName tableName, byte[][] columnFamilies, Algorithm compression,
+ DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
+ Durability durability)
+ throws IOException {
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.setDurability(durability);
+ desc.setRegionReplication(regionReplication);
+ HColumnDescriptor[] hcds = new HColumnDescriptor[columnFamilies.length];
+ for (int i = 0; i < columnFamilies.length; i++) {
+ HColumnDescriptor hcd = new HColumnDescriptor(columnFamilies[i]);
+ hcd.setDataBlockEncoding(dataBlockEncoding);
+ hcd.setCompressionType(compression);
+ hcds[i] = hcd;
+ }
+ return createPreSplitLoadTestTable(conf, desc, hcds, numRegionsPerServer);
+ }
+
+ /**
+ * Creates a pre-split table for load testing. If the table already exists,
+ * logs a warning and continues.
+ * @return the number of regions the table was split into
+ */
+ public static int createPreSplitLoadTestTable(Configuration conf,
HTableDescriptor desc, HColumnDescriptor hcd) throws IOException {
return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
}
@@ -3588,8 +3611,21 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
*/
public static int createPreSplitLoadTestTable(Configuration conf,
HTableDescriptor desc, HColumnDescriptor hcd, int numRegionsPerServer) throws IOException {
- if (!desc.hasFamily(hcd.getName())) {
- desc.addFamily(hcd);
+ return createPreSplitLoadTestTable(conf, desc, new HColumnDescriptor[] {hcd},
+ numRegionsPerServer);
+ }
+
+ /**
+ * Creates a pre-split table for load testing. If the table already exists,
+ * logs a warning and continues.
+ * @return the number of regions the table was split into
+ */
+ public static int createPreSplitLoadTestTable(Configuration conf,
+ HTableDescriptor desc, HColumnDescriptor[] hcds, int numRegionsPerServer) throws IOException {
+ for (HColumnDescriptor hcd : hcds) {
+ if (!desc.hasFamily(hcd.getName())) {
+ desc.addFamily(hcd);
+ }
}
int totalNumberOfRegions = 0;
http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index a191bdd..2326301 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -242,6 +242,7 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
+ when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
replicator.init(context);
replicator.start();
http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/test/java/org/apache/hadoop/hbase/util/ConstantDelayQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/ConstantDelayQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/ConstantDelayQueue.java
new file mode 100644
index 0000000..73ce71a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/ConstantDelayQueue.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link BlockingQueue} that releases each element only after a constant delay has elapsed
+ * since the element was inserted. Every element is wrapped together with its expiry time and
+ * stored in a backing {@link DelayQueue}, which supplies the ordering and blocking behaviour.
+ * <p>
+ * Only insertion, retrieval, {@link #size()}, {@link #isEmpty()}, {@link #clear()} and
+ * {@link #remainingCapacity()} are implemented. The iteration, bulk, search and drain operations
+ * throw {@link UnsupportedOperationException}.
+ * @param <E> type of elements
+ */
+@InterfaceAudience.Private
+public class ConstantDelayQueue<E> implements BlockingQueue<E> {
+
+  /**
+   * Pairs an element with the time, in milliseconds as reported by
+   * {@link EnvironmentEdgeManager#currentTime()}, at which it becomes available.
+   */
+  private static final class DelayedElement<T> implements Delayed {
+    final T element;
+    final long end;
+
+    public DelayedElement(T element, long delayMs) {
+      this.element = element;
+      this.end = EnvironmentEdgeManager.currentTime() + delayMs;
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+      if (o instanceof DelayedElement) {
+        // Both expiry times were computed from the same clock, so compare them directly
+        // instead of sampling the clock twice through getDelay().
+        long otherEnd = ((DelayedElement<?>) o).end;
+        return end == otherEnd ? 0 : (end < otherEnd ? -1 : 1);
+      }
+      long cmp = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
+      return cmp == 0 ? 0 : (cmp < 0 ? -1 : 1);
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      // Read the same clock the constructor used to compute 'end'. Using
+      // System.currentTimeMillis() here would skew the remaining delay whenever
+      // EnvironmentEdgeManager is not backed by the system clock.
+      return unit.convert(end - EnvironmentEdgeManager.currentTime(), TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /** Delay, in milliseconds, applied to every inserted element. */
+  private final long delayMs;
+
+  // backing DelayQueue
+  private final DelayQueue<DelayedElement<E>> queue = new DelayQueue<DelayedElement<E>>();
+
+  /**
+   * @param timeUnit unit in which {@code delay} is expressed
+   * @param delay how long an element stays unavailable after insertion; converted to
+   *          milliseconds, so finer-grained values are truncated
+   */
+  public ConstantDelayQueue(TimeUnit timeUnit, long delay) {
+    this.delayMs = TimeUnit.MILLISECONDS.convert(delay, timeUnit);
+  }
+
+  /** Wraps {@code e} with an expiry time of "now plus the configured delay". */
+  private DelayedElement<E> wrap(E e) {
+    return new DelayedElement<E>(e, delayMs);
+  }
+
+  /** Returns the wrapped element, or {@code null} when the backing queue returned no wrapper. */
+  private static <T> T unwrap(DelayedElement<T> delayed) {
+    return delayed == null ? null : delayed.element;
+  }
+
+  /**
+   * Removes and returns the head. As with {@link DelayQueue}, this throws
+   * {@link java.util.NoSuchElementException} when no element has expired yet.
+   */
+  @Override
+  public E remove() {
+    return unwrap(queue.remove());
+  }
+
+  /** Removes and returns the head, or returns {@code null} when no element has expired yet. */
+  @Override
+  public E poll() {
+    return unwrap(queue.poll());
+  }
+
+  /**
+   * Returns the head without removing it. As with {@link DelayQueue}, the head is returned even
+   * if its delay has not expired; throws {@link java.util.NoSuchElementException} when empty.
+   */
+  @Override
+  public E element() {
+    return unwrap(queue.element());
+  }
+
+  /**
+   * Returns the head without removing it, or {@code null} when the queue is empty. As with
+   * {@link DelayQueue}, the head is returned even if its delay has not expired.
+   */
+  @Override
+  public E peek() {
+    return unwrap(queue.peek());
+  }
+
+  /** Returns the number of queued elements, counting both expired and unexpired ones. */
+  @Override
+  public int size() {
+    return queue.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return queue.isEmpty();
+  }
+
+  @Override
+  public Iterator<E> iterator() {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public Object[] toArray() {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public <T> T[] toArray(T[] a) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> c) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends E> c) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> c) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> c) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public void clear() {
+    queue.clear();
+  }
+
+  @Override
+  public boolean add(E e) {
+    return queue.add(wrap(e));
+  }
+
+  @Override
+  public boolean offer(E e) {
+    return queue.offer(wrap(e));
+  }
+
+  @Override
+  public void put(E e) throws InterruptedException {
+    queue.put(wrap(e));
+  }
+
+  @Override
+  public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
+    return queue.offer(wrap(e), timeout, unit);
+  }
+
+  /** Blocks until the head element's delay has expired, then removes and returns it. */
+  @Override
+  public E take() throws InterruptedException {
+    return unwrap(queue.take());
+  }
+
+  /**
+   * Waits up to the given timeout for an element whose delay has expired.
+   * @return the expired head, or {@code null} if the timeout elapsed first
+   */
+  @Override
+  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+    return unwrap(queue.poll(timeout, unit));
+  }
+
+  @Override
+  public int remainingCapacity() {
+    return queue.remainingCapacity();
+  }
+
+  @Override
+  public boolean remove(Object o) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public boolean contains(Object o) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public int drainTo(Collection<? super E> c) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+
+  @Override
+  public int drainTo(Collection<? super E> c, int maxElements) {
+    throw new UnsupportedOperationException(); // not implemented yet
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
index 90e07b3..6d64bc6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
@@ -74,14 +74,17 @@ public class LoadTestTool extends AbstractHBaseTool {
/** Table name for the test */
private TableName tableName;
+ /** Column families for the test */
+ private byte[][] families;
+
/** Table name to use of not overridden on the command line */
protected static final String DEFAULT_TABLE_NAME = "cluster_test";
/** Column family used by the test */
- public static byte[] COLUMN_FAMILY = Bytes.toBytes("test_cf");
+ public static byte[] DEFAULT_COLUMN_FAMILY = Bytes.toBytes("test_cf");
/** Column families used by the test */
- protected static final byte[][] COLUMN_FAMILIES = { COLUMN_FAMILY };
+ public static final byte[][] DEFAULT_COLUMN_FAMILIES = { DEFAULT_COLUMN_FAMILY };
/** The default data size if not specified */
protected static final int DEFAULT_DATA_SIZE = 64;
@@ -130,18 +133,25 @@ public class LoadTestTool extends AbstractHBaseTool {
public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
+ " Any args for this class can be passed as colon separated after class name";
+ public static final String OPT_WRITER = "writer";
+ public static final String OPT_WRITER_USAGE = "The class for executing the write requests";
+
+ public static final String OPT_UPDATER = "updater";
+ public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";
+
public static final String OPT_READER = "reader";
public static final String OPT_READER_USAGE = "The class for executing the read requests";
protected static final String OPT_KEY_WINDOW = "key_window";
protected static final String OPT_WRITE = "write";
protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
- protected static final String OPT_MULTIPUT = "multiput";
+ public static final String OPT_MULTIPUT = "multiput";
public static final String OPT_MULTIGET = "multiget_batchsize";
protected static final String OPT_NUM_KEYS = "num_keys";
protected static final String OPT_READ = "read";
protected static final String OPT_START_KEY = "start_key";
public static final String OPT_TABLE_NAME = "tn";
+ public static final String OPT_COLUMN_FAMILIES = "families";
protected static final String OPT_ZK_QUORUM = "zk";
protected static final String OPT_ZK_PARENT_NODE = "zk_root";
protected static final String OPT_SKIP_INIT = "skip_init";
@@ -245,6 +255,10 @@ public class LoadTestTool extends AbstractHBaseTool {
return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
}
+ public byte[][] getColumnFamilies() {
+ return families;
+ }
+
/**
* Apply column family options such as Bloom filters, compression, and data
* block encoding.
@@ -298,6 +312,7 @@ public class LoadTestTool extends AbstractHBaseTool {
"without port numbers");
addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
+ addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma");
addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
addOptWithArg(OPT_READ, OPT_USAGE_READ);
addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
@@ -320,6 +335,8 @@ public class LoadTestTool extends AbstractHBaseTool {
"separate updates for every column in a row");
addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
+ addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
+ addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
addOptWithArg(OPT_READER, OPT_READER_USAGE);
addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
@@ -352,6 +369,16 @@ public class LoadTestTool extends AbstractHBaseTool {
tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME,
DEFAULT_TABLE_NAME));
+ if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
+ String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
+ families = new byte[list.length][];
+ for (int i = 0; i < list.length; i++) {
+ families[i] = Bytes.toBytes(list[i]);
+ }
+ } else {
+ families = DEFAULT_COLUMN_FAMILIES;
+ }
+
isWrite = cmd.hasOption(OPT_WRITE);
isRead = cmd.hasOption(OPT_READ);
isUpdate = cmd.hasOption(OPT_UPDATE);
@@ -503,9 +530,9 @@ public class LoadTestTool extends AbstractHBaseTool {
}
HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
- COLUMN_FAMILY, compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer,
+ getColumnFamilies(), compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer,
regionReplication, durability);
- applyColumnFamilyOptions(tableName, COLUMN_FAMILIES);
+ applyColumnFamilyOptions(tableName, getColumnFamilies());
}
@Override
@@ -570,7 +597,7 @@ public class LoadTestTool extends AbstractHBaseTool {
} else {
// Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
- minColsPerKey, maxColsPerKey, COLUMN_FAMILY);
+ minColsPerKey, maxColsPerKey, families);
}
if (userOwner != null) {
@@ -603,7 +630,14 @@ public class LoadTestTool extends AbstractHBaseTool {
if (userOwner != null) {
writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
} else {
- writerThreads = new MultiThreadedWriter(dataGen, conf, tableName);
+ String writerClass = null;
+ if (cmd.hasOption(OPT_WRITER)) {
+ writerClass = cmd.getOptionValue(OPT_WRITER);
+ } else {
+ writerClass = MultiThreadedWriter.class.getCanonicalName();
+ }
+
+ writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
}
writerThreads.setMultiPut(isMultiPut);
}
@@ -613,7 +647,13 @@ public class LoadTestTool extends AbstractHBaseTool {
updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
userOwner, userNames);
} else {
- updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent);
+ String updaterClass = null;
+ if (cmd.hasOption(OPT_UPDATER)) {
+ updaterClass = cmd.getOptionValue(OPT_UPDATER);
+ } else {
+ updaterClass = MultiThreadedUpdater.class.getCanonicalName();
+ }
+ updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
}
updaterThreads.setBatchUpdate(isBatchUpdate);
updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
@@ -700,7 +740,32 @@ public class LoadTestTool extends AbstractHBaseTool {
Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class,
byte[][].class);
return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
- minColsPerKey, maxColsPerKey, COLUMN_FAMILIES);
+ minColsPerKey, maxColsPerKey, families);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName
+ , LoadTestDataGenerator dataGen) throws IOException {
+ try {
+ Class<?> clazz = Class.forName(clazzName);
+ Constructor<?> constructor = clazz.getConstructor(
+ LoadTestDataGenerator.class, Configuration.class, TableName.class);
+ return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName
+ , LoadTestDataGenerator dataGen) throws IOException {
+ try {
+ Class<?> clazz = Class.forName(clazzName);
+ Constructor<?> constructor = clazz.getConstructor(
+ LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
+ return (MultiThreadedUpdater) constructor.newInstance(
+ dataGen, conf, tableName, updatePercent);
} catch (Exception e) {
throw new IOException(e);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
index 9eb0c93..d4e6d80 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
@@ -46,7 +46,7 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
* {@link #wroteUpToKey}, the maximum key in the contiguous range of keys
* being inserted/updated. This queue is supposed to stay small.
*/
- protected BlockingQueue<Long> wroteKeys = new ArrayBlockingQueue<Long>(10000);
+ protected BlockingQueue<Long> wroteKeys;
/**
* This is the current key to be inserted/updated by any thread. Each thread does an
@@ -75,6 +75,11 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, String actionLetter) throws IOException {
super(dataGen, conf, tableName, actionLetter);
+ this.wroteKeys = createWriteKeysQueue(conf);
+ }
+
+ protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
+ return new ArrayBlockingQueue<Long>(10000);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/21b366af/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java
index b0a17a9..6beb2e6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -81,7 +80,8 @@ public class RestartMetaTest extends AbstractHBaseTool {
// start the writers
LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(
- minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, LoadTestTool.COLUMN_FAMILY);
+ minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey,
+ LoadTestTool.DEFAULT_COLUMN_FAMILY);
MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
writer.setMultiPut(true);
writer.start(startKey, endKey, numThreads);
@@ -101,7 +101,7 @@ public class RestartMetaTest extends AbstractHBaseTool {
// create tables if needed
HBaseTestingUtility.createPreSplitLoadTestTable(conf, TABLE_NAME,
- LoadTestTool.COLUMN_FAMILY, Compression.Algorithm.NONE,
+ LoadTestTool.DEFAULT_COLUMN_FAMILY, Compression.Algorithm.NONE,
DataBlockEncoding.NONE);
LOG.debug("Loading data....\n\n");