Posted to commits@hbase.apache.org by en...@apache.org on 2014/06/28 02:31:11 UTC
[25/49] git commit: HBASE-10572 Create an IntegrationTest for region replicas
HBASE-10572 Create an IntegrationTest for region replicas
git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1576465 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/759cfb83
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/759cfb83
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/759cfb83
Branch: refs/heads/master
Commit: 759cfb83af84e97a6ad30aad811c03b42f99e233
Parents: 55a7c87
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Mar 11 18:37:14 2014 +0000
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Jun 27 16:39:38 2014 -0700
----------------------------------------------------------------------
.../hadoop/hbase/IntegrationTestBase.java | 15 +-
.../hadoop/hbase/IntegrationTestIngest.java | 71 +++-
.../RestartRandomRsExceptMetaAction.java | 42 +++
.../actions/RollingBatchRestartRsAction.java | 50 ++-
.../RollingBatchRestartRsExceptMetaAction.java | 43 +++
.../hbase/chaos/factories/MonkeyFactory.java | 6 +-
.../factories/ServerKillingMonkeyFactory.java | 61 ++++
...stTimeBoundedRequestsWithRegionReplicas.java | 345 +++++++++++++++++++
.../hadoop/hbase/HBaseTestingUtility.java | 35 +-
.../apache/hadoop/hbase/util/LoadTestTool.java | 76 +++-
.../hadoop/hbase/util/MultiThreadedReader.java | 54 ++-
.../hbase/util/MultiThreadedReaderWithACL.java | 5 +-
12 files changed, 730 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java
index 77bdd68..85bc5db 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java
@@ -133,14 +133,22 @@ public abstract class IntegrationTestBase extends AbstractHBaseTool {
util = getTestingUtil(getConf());
MonkeyFactory fact = MonkeyFactory.getFactory(monkeyToUse);
if (fact == null) {
- // Run with no monkey in distributed context, with real monkey in local test context.
- fact = MonkeyFactory.getFactory(
- util.isDistributedCluster() ? MonkeyFactory.CALM : MonkeyFactory.SLOW_DETERMINISTIC);
+ fact = getDefaultMonkeyFactory();
}
monkey = fact.setUtil(util)
.setTableName(getTablename())
.setProperties(monkeyProps)
.setColumnFamilies(getColumnFamilies()).build();
+ startMonkey();
+ }
+
+ protected MonkeyFactory getDefaultMonkeyFactory() {
+ // Run with no monkey in distributed context, with real monkey in local test context.
+ return MonkeyFactory.getFactory(
+ util.isDistributedCluster() ? MonkeyFactory.CALM : MonkeyFactory.SLOW_DETERMINISTIC);
+ }
+
+ protected void startMonkey() throws Exception {
monkey.start();
}
@@ -159,6 +167,7 @@ public abstract class IntegrationTestBase extends AbstractHBaseTool {
if (this.util == null) {
if (conf == null) {
this.util = new IntegrationTestingUtility();
+ this.setConf(util.getConfiguration());
} else {
this.util = new IntegrationTestingUtility(conf);
}
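The change above turns monkey setup into a template method: the base class keeps the setup flow, while getDefaultMonkeyFactory() and startMonkey() become hooks that subclasses can override (as the region-replica test added later in this commit does). A minimal standalone sketch of that pattern, with invented class names and no HBase dependencies:

public class TemplateHookExample {
  abstract static class Base {
    // Fixed flow, mirroring setUpMonkey(): pick a factory, then start.
    final void setUp() {
      System.out.println("factory: " + defaultFactory());
      startWorker();
    }
    // Hooks a subclass may override.
    String defaultFactory() { return "calm"; }
    void startWorker() { System.out.println("worker started"); }
  }

  static class DelayedStart extends Base {
    @Override String defaultFactory() { return "serverKilling"; }
    @Override void startWorker() { /* deliberately start later instead */ }
  }

  public static void main(String[] args) {
    new DelayedStart().setUp();
  }
}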
http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java
index 920a659..f71ff20 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java
@@ -42,21 +42,39 @@ import com.google.common.collect.Sets;
@Category(IntegrationTests.class)
public class IntegrationTestIngest extends IntegrationTestBase {
public static final char HIPHEN = '-';
- private static final int SERVER_COUNT = 4; // number of slaves for the smallest cluster
+ private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
private static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
private static final long JUNIT_RUN_TIME = 10 * 60 * 1000;
/** A soft limit on how long we should run */
- private static final String RUN_TIME_KEY = "hbase.%s.runtime";
+ protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
+
+ protected static final String NUM_KEYS_PER_SERVER_KEY = "num_keys_per_server";
+ protected static final long DEFAULT_NUM_KEYS_PER_SERVER = 2500;
+
+ protected static final String NUM_WRITE_THREADS_KEY = "num_write_threads";
+ protected static final int DEFAULT_NUM_WRITE_THREADS = 20;
+
+ protected static final String NUM_READ_THREADS_KEY = "num_read_threads";
+ protected static final int DEFAULT_NUM_READ_THREADS = 20;
protected static final Log LOG = LogFactory.getLog(IntegrationTestIngest.class);
protected IntegrationTestingUtility util;
protected HBaseCluster cluster;
protected LoadTestTool loadTool;
+ protected String[] LOAD_TEST_TOOL_INIT_ARGS = {
+ LoadTestTool.OPT_COMPRESSION,
+ LoadTestTool.OPT_DATA_BLOCK_ENCODING,
+ LoadTestTool.OPT_INMEMORY,
+ LoadTestTool.OPT_ENCRYPTION,
+ LoadTestTool.OPT_NUM_REGIONS_PER_SERVER,
+ LoadTestTool.OPT_REGION_REPLICATION,
+ };
+
@Override
public void setUpCluster() throws Exception {
- util = getTestingUtil(null);
+ util = getTestingUtil(getConf());
LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
util.initializeCluster(SERVER_COUNT);
LOG.debug("Done initializing/checking cluster");
@@ -70,7 +88,7 @@ public class IntegrationTestIngest extends IntegrationTestBase {
}
protected void initTable() throws IOException {
- int ret = loadTool.run(new String[] { "-tn", getTablename(), "-init_only" });
+ int ret = loadTool.run(getArgsForLoadTestToolInitTable());
Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
}
@@ -82,16 +100,24 @@ public class IntegrationTestIngest extends IntegrationTestBase {
@Test
public void testIngest() throws Exception {
- runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10);
+ runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10, 20);
}
- private void internalRunIngestTest(long runTime) throws Exception {
- runIngestTest(runTime, 2500, 10, 1024, 10);
+ protected void internalRunIngestTest(long runTime) throws Exception {
+ String clazz = this.getClass().getSimpleName();
+ long numKeysPerServer = conf.getLong(String.format("%s.%s", clazz, NUM_KEYS_PER_SERVER_KEY),
+ DEFAULT_NUM_KEYS_PER_SERVER);
+ int numWriteThreads = conf.getInt(
+ String.format("%s.%s", clazz, NUM_WRITE_THREADS_KEY), DEFAULT_NUM_WRITE_THREADS);
+ int numReadThreads = conf.getInt(
+ String.format("%s.%s", clazz, NUM_READ_THREADS_KEY), DEFAULT_NUM_READ_THREADS);
+ runIngestTest(runTime, numKeysPerServer, 10, 1024, numWriteThreads, numReadThreads);
}
@Override
public String getTablename() {
- return this.getClass().getSimpleName();
+ String clazz = this.getClass().getSimpleName();
+ return conf.get(String.format("%s.%s", clazz, LoadTestTool.OPT_TABLE_NAME), clazz);
}
@Override
@@ -104,8 +130,10 @@ public class IntegrationTestIngest extends IntegrationTestBase {
util.deleteTable(Bytes.toBytes(getTablename()));
}
}
- protected void runIngestTest(long defaultRunTime, int keysPerServerPerIter, int colsPerKey,
- int recordSize, int writeThreads) throws Exception {
+
+ protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
+ int recordSize, int writeThreads, int readThreads) throws Exception {
+
LOG.info("Running ingest");
LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
@@ -136,7 +164,8 @@ public class IntegrationTestIngest extends IntegrationTestBase {
Assert.fail(errorMsg);
}
- ret = loadTool.run(getArgsForLoadTestTool("-read", "100:20", startKey, numKeys));
+ ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads)
+ , startKey, numKeys));
if (0 != ret) {
String errorMsg = "Verification failed with error code " + ret;
LOG.error(errorMsg);
@@ -146,6 +175,23 @@ public class IntegrationTestIngest extends IntegrationTestBase {
}
}
+ protected String[] getArgsForLoadTestToolInitTable() {
+ List<String> args = new ArrayList<String>();
+ args.add("-tn");
+ args.add(getTablename());
+ // pass all remaining args from conf with keys <test class name>.<load test tool arg>
+ String clazz = this.getClass().getSimpleName();
+ for (String arg : LOAD_TEST_TOOL_INIT_ARGS) {
+ String val = conf.get(String.format("%s.%s", clazz, arg));
+ if (val != null) {
+ args.add("-" + arg);
+ args.add(val);
+ }
+ }
+ args.add("-init_only");
+ return args.toArray(new String[args.size()]);
+ }
+
protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
long numKeys) {
List<String> args = new ArrayList<String>();
@@ -158,11 +204,12 @@ public class IntegrationTestIngest extends IntegrationTestBase {
args.add("-num_keys");
args.add(String.valueOf(numKeys));
args.add("-skip_init");
+
return args.toArray(new String[args.size()]);
}
/** Estimates a data size based on the cluster size */
- private long getNumKeys(int keysPerServer)
+ protected long getNumKeys(long keysPerServer)
throws IOException {
int numRegionServers = cluster.getClusterStatus().getServersSize();
return keysPerServer * numRegionServers;
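The new keys above are looked up as "<test class simple name>.<key>", so they can be supplied on the command line as, e.g., -DIntegrationTestIngest.num_keys_per_server=10000. A small sketch of that lookup convention against a plain Hadoop Configuration (the value set here is invented for illustration):

import org.apache.hadoop.conf.Configuration;

public class ConfKeyConventionExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Normally arrives via -DIntegrationTestIngest.num_keys_per_server=10000
    conf.setLong("IntegrationTestIngest.num_keys_per_server", 10000);

    String clazz = "IntegrationTestIngest"; // this.getClass().getSimpleName()
    long numKeysPerServer = conf.getLong(
        String.format("%s.%s", clazz, "num_keys_per_server"), 2500);
    System.out.println("keys per server = " + numKeysPerServer); // 10000
  }
}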
http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RestartRandomRsExceptMetaAction.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RestartRandomRsExceptMetaAction.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RestartRandomRsExceptMetaAction.java
new file mode 100644
index 0000000..b78144a
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RestartRandomRsExceptMetaAction.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos.actions;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
+
+public class RestartRandomRsExceptMetaAction extends RestartRandomRsAction {
+ public RestartRandomRsExceptMetaAction(long sleepTime) {
+ super(sleepTime);
+ }
+
+ @Override
+ public void perform() throws Exception {
+ int tries = 10;
+
+ while (tries-- > 0 && getCurrentServers().length > 1) {
+ ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers());
+ ServerName metaServer = cluster.getServerHoldingMeta();
+ if (server != null && !server.equals(metaServer)) {
+ restartRs(server, sleepTime);
+ break;
+ }
+ }
+ }
+}
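For illustration, the bounded retry-and-skip loop above in miniature, with plain strings standing in for ServerName (server names invented; in the real action the meta holder comes from cluster.getServerHoldingMeta()):

import java.util.Random;

public class SkipMetaExample {
  public static void main(String[] args) {
    String[] servers = {"rs1", "rs2", "rs3"};
    String metaServer = "rs2";
    Random rnd = new Random();

    int tries = 10;
    while (tries-- > 0 && servers.length > 1) {
      String server = servers[rnd.nextInt(servers.length)];
      if (!server.equals(metaServer)) {
        System.out.println("would restart " + server);
        break; // restarted one non-meta server; done
      }
      // picked the meta holder; try again (up to 10 times)
    }
  }
}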
http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsAction.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsAction.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsAction.java
index 6df10cb..7530383 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsAction.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsAction.java
@@ -32,37 +32,57 @@ import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
/**
* Restarts a ratio of the regionservers in a rolling fashion. At each step, either kills a
- * server, or starts one, sleeping randomly (0-sleepTime) in between steps.
+ * server, or starts one, sleeping randomly (0-sleepTime) in between steps. The parameter maxDeadServers
+ * limits the maximum number of servers that can be down at the same time during rolling restarts.
*/
public class RollingBatchRestartRsAction extends BatchRestartRsAction {
private static Log LOG = LogFactory.getLog(RollingBatchRestartRsAction.class);
protected int maxDeadServers; // maximum number of dead servers at any given time. Defaults to 5
public RollingBatchRestartRsAction(long sleepTime, float ratio) {
+ this(sleepTime, ratio, 5);
+ }
+
+ public RollingBatchRestartRsAction(long sleepTime, float ratio, int maxDeadServers) {
super(sleepTime, ratio);
+ this.maxDeadServers = maxDeadServers;
+ }
+
+ enum KillOrStart {
+ KILL,
+ START
}
@Override
public void perform() throws Exception {
LOG.info(String.format("Performing action: Rolling batch restarting %d%% of region servers",
(int)(ratio * 100)));
- List<ServerName> selectedServers = PolicyBasedChaosMonkey.selectRandomItems(getCurrentServers(),
- ratio);
+ List<ServerName> selectedServers = selectServers();
Queue<ServerName> serversToBeKilled = new LinkedList<ServerName>(selectedServers);
Queue<ServerName> deadServers = new LinkedList<ServerName>();
- //
+ // loop while there are servers to be killed or dead servers to be restarted
while (!serversToBeKilled.isEmpty() || !deadServers.isEmpty()) {
- boolean action = true; //action true = kill server, false = start server
+ KillOrStart action = KillOrStart.KILL;
- if (serversToBeKilled.isEmpty() || deadServers.isEmpty()) {
- action = deadServers.isEmpty();
+ if (serversToBeKilled.isEmpty()) { // no more servers to kill
+ action = KillOrStart.START;
+ } else if (deadServers.isEmpty()) {
+ action = KillOrStart.KILL; // no more servers to start
+ } else if (deadServers.size() >= maxDeadServers) {
+ // we have too many dead servers. Don't kill any more
+ action = KillOrStart.START;
} else {
- action = RandomUtils.nextBoolean();
+ // do a coin toss
+ action = RandomUtils.nextBoolean() ? KillOrStart.KILL : KillOrStart.START;
}
- if (action) {
- ServerName server = serversToBeKilled.remove();
+ ServerName server;
+
+ switch (action) {
+ case KILL:
+ server = serversToBeKilled.remove();
try {
killRs(server);
} catch (org.apache.hadoop.util.Shell.ExitCodeException e) {
@@ -71,21 +91,27 @@ public class RollingBatchRestartRsAction extends BatchRestartRsAction {
LOG.info("Problem killing but presume successful; code=" + e.getExitCode(), e);
}
deadServers.add(server);
- } else {
+ break;
+ case START:
try {
- ServerName server = deadServers.remove();
+ server = deadServers.remove();
startRs(server);
} catch (org.apache.hadoop.util.Shell.ExitCodeException e) {
// The start may fail, but better to just keep going though we may lose a server.
//
LOG.info("Problem starting, will retry; code=" + e.getExitCode(), e);
}
+ break;
}
sleep(RandomUtils.nextInt((int)sleepTime));
}
}
+ protected List<ServerName> selectServers() throws IOException {
+ return PolicyBasedChaosMonkey.selectRandomItems(getCurrentServers(), ratio);
+ }
+
/**
* Small test to ensure the class basically works.
* @param args
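A standalone simulation of the kill/start scheduling introduced above, showing how maxDeadServers caps the number of concurrently dead servers (plain strings stand in for ServerName; no cluster required):

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;

public class RollingRestartSim {
  enum KillOrStart { KILL, START }

  public static void main(String[] args) {
    Queue<String> toKill = new LinkedList<String>();
    for (int i = 0; i < 6; i++) toKill.add("rs" + i);
    Queue<String> dead = new LinkedList<String>();
    int maxDeadServers = 2;
    Random rnd = new Random();

    while (!toKill.isEmpty() || !dead.isEmpty()) {
      KillOrStart action;
      if (toKill.isEmpty()) {
        action = KillOrStart.START;          // nothing left to kill
      } else if (dead.isEmpty()) {
        action = KillOrStart.KILL;           // nothing to restart yet
      } else if (dead.size() >= maxDeadServers) {
        action = KillOrStart.START;          // cap concurrent dead servers
      } else {
        action = rnd.nextBoolean() ? KillOrStart.KILL : KillOrStart.START;
      }
      switch (action) {
        case KILL:
          String killed = toKill.remove();
          dead.add(killed);
          System.out.println("kill  " + killed + "  dead=" + dead);
          break;
        case START:
          String started = dead.remove();
          System.out.println("start " + started + "  dead=" + dead);
          break;
      }
    }
  }
}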
http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsExceptMetaAction.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsExceptMetaAction.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsExceptMetaAction.java
new file mode 100644
index 0000000..f03b8ec
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsExceptMetaAction.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos.actions;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * Same as in {@link RollingBatchRestartRsAction} except that this action
+ * does not restart the region server holding the META table.
+ */
+public class RollingBatchRestartRsExceptMetaAction extends RollingBatchRestartRsAction {
+
+ public RollingBatchRestartRsExceptMetaAction(long sleepTime, float ratio, int maxDeadServers) {
+ super(sleepTime, ratio, maxDeadServers);
+ }
+
+ @Override
+ protected List<ServerName> selectServers() throws java.io.IOException {
+ ServerName metaServer = cluster.getServerHoldingMeta();
+ List<ServerName> servers = super.selectServers();
+ servers.remove(metaServer);
+ return servers;
+ };
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
index 4f3824b..8fb1859 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
@@ -22,10 +22,11 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.chaos.monkies.ChaosMonkey;
+import com.google.common.collect.ImmutableMap;
+
/**
* Base class of the factory that will create a ChaosMonkey.
*/
@@ -60,16 +61,17 @@ public abstract class MonkeyFactory {
public abstract ChaosMonkey build();
-
public static final String CALM = "calm";
// TODO: the name has become a misnomer since the default (not-slow) monkey has been removed
public static final String SLOW_DETERMINISTIC = "slowDeterministic";
public static final String UNBALANCE = "unbalance";
+ public static final String SERVER_KILLING = "serverKilling";
public static Map<String, MonkeyFactory> FACTORIES = ImmutableMap.<String,MonkeyFactory>builder()
.put(CALM, new CalmMonkeyFactory())
.put(SLOW_DETERMINISTIC, new SlowDeterministicMonkeyFactory())
.put(UNBALANCE, new UnbalanceMonkeyFactory())
+ .put(SERVER_KILLING, new ServerKillingMonkeyFactory())
.build();
public static MonkeyFactory getFactory(String factoryName) {
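With the registration above, the new factory is reachable by name. A minimal sketch of the lookup (assumes the hbase-it test classes are on the classpath):

import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;

public class FactoryLookupExample {
  public static void main(String[] args) {
    MonkeyFactory fact = MonkeyFactory.getFactory(MonkeyFactory.SERVER_KILLING);
    System.out.println(fact.getClass().getSimpleName()); // ServerKillingMonkeyFactory
  }
}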
http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/ServerKillingMonkeyFactory.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/ServerKillingMonkeyFactory.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/ServerKillingMonkeyFactory.java
new file mode 100644
index 0000000..02b5914
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/ServerKillingMonkeyFactory.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos.factories;
+
+import org.apache.hadoop.hbase.chaos.actions.Action;
+import org.apache.hadoop.hbase.chaos.actions.DumpClusterStatusAction;
+import org.apache.hadoop.hbase.chaos.actions.ForceBalancerAction;
+import org.apache.hadoop.hbase.chaos.actions.RestartActiveMasterAction;
+import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
+import org.apache.hadoop.hbase.chaos.actions.RollingBatchRestartRsExceptMetaAction;
+import org.apache.hadoop.hbase.chaos.monkies.ChaosMonkey;
+import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
+import org.apache.hadoop.hbase.chaos.policies.CompositeSequentialPolicy;
+import org.apache.hadoop.hbase.chaos.policies.DoActionsOncePolicy;
+import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
+
+/**
+ * Creates ChaosMonkeys for doing server restart actions, but not
+ * flush / compact / snapshot kind of actions.
+ */
+public class ServerKillingMonkeyFactory extends MonkeyFactory {
+
+ @Override
+ public ChaosMonkey build() {
+
+ // Destructive actions to mess things around. Cannot run batch restart
+ Action[] actions1 = new Action[] {
+ new RestartRandomRsExceptMetaAction(60000),
+ new RestartActiveMasterAction(5000),
+ new RollingBatchRestartRsExceptMetaAction(5000, 1.0f, 2), //only allow 2 servers to be dead
+ new ForceBalancerAction()
+ };
+
+ // Action to log more info for debugging
+ Action[] actions2 = new Action[] {
+ new DumpClusterStatusAction()
+ };
+
+ return new PolicyBasedChaosMonkey(util,
+ new CompositeSequentialPolicy(
+ new DoActionsOncePolicy(60 * 1000, actions1),
+ new PeriodicRandomActionPolicy(60 * 1000, actions1)),
+ new PeriodicRandomActionPolicy(60 * 1000, actions2));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java
new file mode 100644
index 0000000..9825ea7
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.IntegrationTestIngest;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.IntegrationTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
+import org.apache.hadoop.hbase.client.Consistency;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
+import org.apache.hadoop.hbase.util.LoadTestTool;
+import org.apache.hadoop.hbase.util.MultiThreadedReader;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Assert;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+/**
+ * An IntegrationTest for doing reads with a timeout, to a read-only table with region
+ * replicas. ChaosMonkey is run which kills the region servers and master, but ensures
+ * that the meta region server is not killed, and at most 2 region servers are dead at any
+ * point in time. The expected behavior is that all reads with stale mode true will return
+ * before the timeout (5 sec by default). The test fails if the read requests do not finish
+ * in time.
+ *
+ * <p> This test uses LoadTestTool to read and write the data from a single client but
+ * multiple threads. The data is written first, then we allow the region replicas to catch
+ * up. Then we start the reader threads doing get requests with stale mode true. Chaos Monkey is
+ * started after some delay (20 sec by default) after the reader threads are started so that
+ * there is enough time to fully cache meta.
+ *
+ * These parameters (and some other parameters from LoadTestTool) can be used to
+ * control behavior; the values given are the defaults:
+ * <pre>
+ * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_regions_per_server=5
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.get_timeout_ms=5000
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_keys_per_server=2500
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.region_replication=3
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=20
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=20
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.chaos_monkey_delay=20000
+ * </pre>
+ * Use this test with "serverKilling" ChaosMonkey. Sample usage:
+ * <pre>
+ * hbase org.apache.hadoop.hbase.test.IntegrationTestTimeBoundedRequestsWithRegionReplicas
+ * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=40
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=40
+ * -Dhbase.ipc.client.allowsInterrupt=true --monkey serverKilling
+ * </pre>
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends IntegrationTestIngest {
+
+ private static final Log LOG = LogFactory.getLog(
+ IntegrationTestTimeBoundedRequestsWithRegionReplicas.class);
+
+ private static final String TEST_NAME
+ = IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName();
+
+ protected static final long DEFAULT_GET_TIMEOUT = 5000; // 5 sec
+ protected static final String GET_TIMEOUT_KEY = "get_timeout_ms";
+
+ protected static final long DEFAUL_CHAOS_MONKEY_DELAY = 20 * 1000; // 20 sec
+ protected static final String CHAOS_MONKEY_DELAY_KEY = "chaos_monkey_delay";
+
+ protected static final int DEFAULT_REGION_REPLICATION = 3;
+
+ @Override
+ protected void startMonkey() throws Exception {
+ // we do not want to start the monkey at the start of the test.
+ }
+
+ @Override
+ protected MonkeyFactory getDefaultMonkeyFactory() {
+ return MonkeyFactory.getFactory(MonkeyFactory.CALM);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ // default replication for this test is 3
+ String clazz = this.getClass().getSimpleName();
+ conf.setIfUnset(String.format("%s.%s", clazz, LoadTestTool.OPT_REGION_REPLICATION),
+ Integer.toString(DEFAULT_REGION_REPLICATION));
+ }
+
+ protected void writeData(int colsPerKey, int recordSize, int writeThreads,
+ long startKey, long numKeys) throws IOException {
+ int ret = loadTool.run(getArgsForLoadTestTool("-write",
+ String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
+ if (0 != ret) {
+ String errorMsg = "Load failed with error code " + ret;
+ LOG.error(errorMsg);
+ Assert.fail(errorMsg);
+ }
+ }
+
+ @Override
+ protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
+ int recordSize, int writeThreads, int readThreads) throws Exception {
+ LOG.info("Cluster size:"+
+ util.getHBaseClusterInterface().getClusterStatus().getServersSize());
+
+ long start = System.currentTimeMillis();
+ String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
+ long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
+ long startKey = 0;
+
+ long numKeys = getNumKeys(keysPerServerPerIter);
+
+
+ // write data once
+ LOG.info("Writing some data to the table");
+ writeData(colsPerKey, recordSize, writeThreads, startKey, numKeys);
+
+ // flush the table
+ LOG.info("Flushing the table");
+ HBaseAdmin admin = util.getHBaseAdmin();
+ admin.flush(getTablename());
+
+ // re-open the regions to make sure that the replicas are up to date
+ long refreshTime = conf.getLong(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
+ if (refreshTime > 0 && refreshTime <= 10000) {
+ LOG.info("Sleeping " + refreshTime + "ms to ensure that the data is replicated");
+ Threads.sleep(refreshTime);
+ } else {
+ LOG.info("Reopening the table");
+ admin.disableTable(getTablename());
+ admin.enableTable(getTablename());
+ }
+
+ // We should only start the ChaosMonkey after the readers are started and have cached
+ // all of the region locations. Because the meta is not replicated, the timebounded reads
+ // will time out if the meta server is killed.
+ // We will start the chaos monkey after the configured delay, and since the readers are
+ // reading random keys, that should be enough time to cache every region entry.
+ long chaosMonkeyDelay = conf.getLong(String.format("%s.%s", TEST_NAME, CHAOS_MONKEY_DELAY_KEY)
+ , DEFAUL_CHAOS_MONKEY_DELAY);
+ ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ LOG.info(String.format("ChaosMonkey delay is : %d seconds. Will start %s " +
+ "ChaosMonkey after delay", chaosMonkeyDelay / 1000, monkeyToUse));
+ ScheduledFuture<?> result = executorService.schedule(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LOG.info("Starting ChaosMonkey");
+ monkey.start();
+ monkey.waitForStop();
+ } catch (Exception e) {
+ LOG.warn(StringUtils.stringifyException(e));
+ }
+
+ }
+ }, chaosMonkeyDelay, TimeUnit.MILLISECONDS);
+
+ // set the intended run time for the reader. The reader will do read requests
+ // to random keys for this amount of time.
+ long remainingTime = runtime - (System.currentTimeMillis() - start);
+ LOG.info("Reading random keys from the table for " + remainingTime/60000 + " min");
+ this.conf.setLong(
+ String.format(RUN_TIME_KEY, TimeBoundedMultiThreadedReader.class.getSimpleName())
+ , remainingTime); // load tool shares the same conf
+
+ // now start the readers which will run for configured run time
+ try {
+ int ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads)
+ , startKey, numKeys));
+ if (0 != ret) {
+ String errorMsg = "Verification failed with error code " + ret;
+ LOG.error(errorMsg);
+ Assert.fail(errorMsg);
+ }
+ } finally {
+ if (result != null) result.cancel(false);
+ monkey.stop("Stopping the test");
+ monkey.waitForStop();
+ executorService.shutdown();
+ }
+ }
+
+ @Override
+ protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
+ long numKeys) {
+ List<String> args = Lists.newArrayList(super.getArgsForLoadTestTool(
+ mode, modeSpecificArg, startKey, numKeys));
+ args.add("-reader");
+ args.add(TimeBoundedMultiThreadedReader.class.getName());
+ return args.toArray(new String[args.size()]);
+ }
+
+ public static class TimeBoundedMultiThreadedReader extends MultiThreadedReader {
+ protected long timeoutNano;
+ protected AtomicLong timedOutReads = new AtomicLong();
+ protected long runTime;
+ protected Thread timeoutThread;
+
+ public TimeBoundedMultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
+ TableName tableName, double verifyPercent) {
+ super(dataGen, conf, tableName, verifyPercent);
+ long timeoutMs = conf.getLong(
+ String.format("%s.%s", TEST_NAME, GET_TIMEOUT_KEY), DEFAULT_GET_TIMEOUT);
+ timeoutNano = timeoutMs * 1000000;
+ LOG.info("Timeout for gets: " + timeoutMs);
+ String runTimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
+ this.runTime = conf.getLong(runTimeKey, -1);
+ if (this.runTime <= 0) {
+ throw new IllegalArgumentException("Please configure " + runTimeKey);
+ }
+ }
+
+ @Override
+ public void waitForFinish() {
+ try {
+ this.timeoutThread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ this.aborted = true;
+ super.waitForFinish();
+ }
+
+ @Override
+ protected String progressInfo() {
+ StringBuilder builder = new StringBuilder(super.progressInfo());
+ appendToStatus(builder, "get_timeouts", timedOutReads.get());
+ return builder.toString();
+ }
+
+ @Override
+ public void start(long startKey, long endKey, int numThreads) throws IOException {
+ super.start(startKey, endKey, numThreads);
+ this.timeoutThread = new TimeoutThread(this.runTime);
+ this.timeoutThread.start();
+ }
+
+ @Override
+ protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
+ return new TimeBoundedMultiThreadedReaderThread(readerId);
+ }
+
+ private class TimeoutThread extends Thread {
+ long timeout;
+ long reportInterval = 60000;
+ public TimeoutThread(long timeout) {
+ this.timeout = timeout;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ long rem = Math.min(timeout, reportInterval);
+ if (rem <= 0) {
+ break;
+ }
+ LOG.info("Remaining execution time:" + timeout / 60000 + " min");
+ Threads.sleep(rem);
+ timeout -= rem;
+ }
+ }
+ }
+
+ public class TimeBoundedMultiThreadedReaderThread
+ extends MultiThreadedReader.HBaseReaderThread {
+
+ public TimeBoundedMultiThreadedReaderThread(int readerId) throws IOException {
+ super(readerId);
+ }
+
+ @Override
+ protected Get createGet(long keyToRead) throws IOException {
+ Get get = super.createGet(keyToRead);
+ get.setConsistency(Consistency.TIMELINE);
+ return get;
+ }
+
+ @Override
+ protected long getNextKeyToRead() {
+ // always read a random key, assuming that the writer has finished writing all keys
+ long key = startKey + Math.abs(RandomUtils.nextLong())
+ % (endKey - startKey);
+ return key;
+ }
+
+ @Override
+ protected void verifyResultsAndUpdateMetrics(boolean verify, String rowKey, long elapsedNano,
+ Result result, HTable table, boolean isNullExpected) throws IOException {
+ super.verifyResultsAndUpdateMetrics(verify, rowKey, elapsedNano, result, table, isNullExpected);
+ // we do not actually time out and cancel the reads after the timeout. We just wait for the
+ // RPC to complete, but if the request took longer than the timeout, we treat that as an error.
+ if (elapsedNano > timeoutNano) {
+ timedOutReads.incrementAndGet();
+ numReadFailures.addAndGet(1); // fail the test
+ }
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ IntegrationTestingUtility.setUseDistributedCluster(conf);
+ int ret = ToolRunner.run(conf, new IntegrationTestTimeBoundedRequestsWithRegionReplicas(), args);
+ System.exit(ret);
+ }
+}
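The delayed ChaosMonkey start above follows a common schedule-then-cancel pattern: schedule the disruptive work after a delay, run the timed workload, then cancel in a finally block. A standalone sketch of just that pattern (delays shortened; the printed message stands in for monkey.start()):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class DelayedStartExample {
  public static void main(String[] args) throws Exception {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    // Schedule the disruptive work to begin only after the readers warm up.
    ScheduledFuture<?> task = executor.schedule(new Runnable() {
      @Override
      public void run() {
        System.out.println("monkey would start now");
      }
    }, 2, TimeUnit.SECONDS);

    try {
      Thread.sleep(3000); // stand-in for the timed read workload
    } finally {
      task.cancel(false); // harmless if the task already ran
      executor.shutdown();
    }
  }
}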
http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index e56b8d0..79bda27 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -957,7 +957,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
miniClusterRunning = false;
LOG.info("Minicluster is down");
}
-
+
/**
* @return True if we removed the test dirs
* @throws IOException
@@ -3191,11 +3191,27 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
public static int createPreSplitLoadTestTable(Configuration conf,
TableName tableName, byte[] columnFamily, Algorithm compression,
DataBlockEncoding dataBlockEncoding) throws IOException {
+ return createPreSplitLoadTestTable(conf, tableName,
+ columnFamily, compression, dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1,
+ Durability.USE_DEFAULT);
+ }
+ /**
+ * Creates a pre-split table for load testing. If the table already exists,
+ * logs a warning and continues.
+ * @return the number of regions the table was split into
+ */
+ public static int createPreSplitLoadTestTable(Configuration conf,
+ TableName tableName, byte[] columnFamily, Algorithm compression,
+ DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
+ Durability durability)
+ throws IOException {
HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.setDurability(durability);
+ desc.setRegionReplication(regionReplication);
HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
hcd.setDataBlockEncoding(dataBlockEncoding);
hcd.setCompressionType(compression);
- return createPreSplitLoadTestTable(conf, desc, hcd);
+ return createPreSplitLoadTestTable(conf, desc, hcd, numRegionsPerServer);
}
/**
@@ -3205,6 +3221,16 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
*/
public static int createPreSplitLoadTestTable(Configuration conf,
HTableDescriptor desc, HColumnDescriptor hcd) throws IOException {
+ return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
+ }
+
+ /**
+ * Creates a pre-split table for load testing. If the table already exists,
+ * logs a warning and continues.
+ * @return the number of regions the table was split into
+ */
+ public static int createPreSplitLoadTestTable(Configuration conf,
+ HTableDescriptor desc, HColumnDescriptor hcd, int numRegionsPerServer) throws IOException {
if (!desc.hasFamily(hcd.getName())) {
desc.addFamily(hcd);
}
@@ -3220,11 +3246,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
throw new IllegalStateException("No live regionservers");
}
- int regionsPerServer = conf.getInt(REGIONS_PER_SERVER_KEY, DEFAULT_REGIONS_PER_SERVER);
- totalNumberOfRegions = numberOfServers * regionsPerServer;
+ totalNumberOfRegions = numberOfServers * numRegionsPerServer;
LOG.info("Number of live regionservers: " + numberOfServers + ", " +
"pre-splitting table into " + totalNumberOfRegions + " regions " +
- "(default regions per server: " + regionsPerServer + ")");
+ "(regions per server: " + numRegionsPerServer + ")");
byte[][] splits = new RegionSplitter.HexStringSplit().split(
totalNumberOfRegions);
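A sketch of calling the new overload above to create a pre-split table with region replication (table and family names invented; assumes a running cluster reachable through the configuration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.Bytes;

public class PreSplitExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int regions = HBaseTestingUtility.createPreSplitLoadTestTable(conf,
        TableName.valueOf("replica_test"), Bytes.toBytes("cf"),
        Compression.Algorithm.NONE, DataBlockEncoding.NONE,
        5,  // regions per server
        3,  // region replication
        Durability.USE_DEFAULT);
    System.out.println("table split into " + regions + " regions");
  }
}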
http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
index a893317..34980c4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
@@ -110,10 +110,11 @@ public class LoadTestTool extends AbstractHBaseTool {
+ "compression) to use for data blocks in the test column family, "
+ "one of " + Arrays.toString(DataBlockEncoding.values()) + ".";
- private static final String OPT_BLOOM = "bloom";
- private static final String OPT_COMPRESSION = "compression";
- private static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
+ public static final String OPT_BLOOM = "bloom";
+ public static final String OPT_COMPRESSION = "compression";
+ public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";
+
public static final String OPT_DATA_BLOCK_ENCODING =
HColumnDescriptor.DATA_BLOCK_ENCODING.toLowerCase();
@@ -125,6 +126,9 @@ public class LoadTestTool extends AbstractHBaseTool {
public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
+ " Any args for this class can be passed as colon separated after class name";
+ public static final String OPT_READER = "reader";
+ public static final String OPT_READER_USAGE = "The class for executing the read requests";
+
protected static final String OPT_KEY_WINDOW = "key_window";
protected static final String OPT_WRITE = "write";
protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
@@ -132,7 +136,7 @@ public class LoadTestTool extends AbstractHBaseTool {
protected static final String OPT_NUM_KEYS = "num_keys";
protected static final String OPT_READ = "read";
protected static final String OPT_START_KEY = "start_key";
- protected static final String OPT_TABLE_NAME = "tn";
+ public static final String OPT_TABLE_NAME = "tn";
protected static final String OPT_ZK_QUORUM = "zk";
protected static final String OPT_ZK_PARENT_NODE = "zk_root";
protected static final String OPT_SKIP_INIT = "skip_init";
@@ -142,11 +146,20 @@ public class LoadTestTool extends AbstractHBaseTool {
protected static final String OPT_BATCHUPDATE = "batchupdate";
protected static final String OPT_UPDATE = "update";
- protected static final String OPT_ENCRYPTION = "encryption";
+ public static final String OPT_ENCRYPTION = "encryption";
protected static final String OPT_ENCRYPTION_USAGE =
"Enables transparent encryption on the test table, one of " +
Arrays.toString(Encryption.getSupportedCiphers());
+ public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
+ protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE
+ = "Desired number of regions per region server. Defaults to 5.";
+ protected static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;
+
+ public static final String OPT_REGION_REPLICATION = "region_replication";
+ protected static final String OPT_REGION_REPLICATION_USAGE =
+ "Desired number of replicas per region";
+
protected static final long DEFAULT_START_KEY = 0;
/** This will be removed as we factor out the dependency on command line */
@@ -195,6 +208,9 @@ public class LoadTestTool extends AbstractHBaseTool {
//This file is used to read authentication information in secure clusters.
private String authnFileName;
+ private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
+ private int regionReplication = -1; // not set
+
// TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
// console tool itself should only be used from console.
protected boolean isSkipInit = false;
@@ -292,6 +308,7 @@ public class LoadTestTool extends AbstractHBaseTool {
"separate updates for every column in a row");
addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
+ addOptWithArg(OPT_READER, OPT_READER_USAGE);
addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
addOptWithArg(OPT_START_KEY, "The first key to read/write " +
@@ -311,6 +328,8 @@ public class LoadTestTool extends AbstractHBaseTool {
addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
+ addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
+ addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
}
@Override
@@ -421,13 +440,16 @@ public class LoadTestTool extends AbstractHBaseTool {
if (cmd.hasOption(NUM_TABLES)) {
numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
}
- regionsPerServer = HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER;
- if (cmd.hasOption(OPT_REGIONS_PER_SERVER)) {
- regionsPerServer = parseInt(cmd.getOptionValue(OPT_REGIONS_PER_SERVER), 1,
- Integer.MAX_VALUE);
- conf.setInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY, regionsPerServer);
+
+ numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
+ if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) {
+ numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER));
+ }
+
+ regionReplication = 1;
+ if (cmd.hasOption(OPT_REGION_REPLICATION)) {
+ regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
}
- System.out.println("Regions per server: " + regionsPerServer);
}
private void parseColumnFamilyOptions(CommandLine cmd) {
@@ -451,14 +473,14 @@ public class LoadTestTool extends AbstractHBaseTool {
}
public void initTestTable() throws IOException {
- HTableDescriptor desc = new HTableDescriptor(tableName);
+ Durability durability = Durability.USE_DEFAULT;
if (deferredLogFlush) {
- desc.setDurability(Durability.ASYNC_WAL);
+ durability = Durability.ASYNC_WAL;
}
- HColumnDescriptor hcd = new HColumnDescriptor(COLUMN_FAMILY);
- hcd.setDataBlockEncoding(dataBlockEncodingAlgo);
- hcd.setCompressionType(compressAlgo);
- HBaseTestingUtility.createPreSplitLoadTestTable(conf, desc, hcd);
+
+ HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
+ COLUMN_FAMILY, compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer,
+ regionReplication, durability);
applyColumnFamilyOptions(tableName, COLUMN_FAMILIES);
}
@@ -588,7 +610,13 @@ public class LoadTestTool extends AbstractHBaseTool {
readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent,
userNames);
} else {
- readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
+ String readerClass = null;
+ if (cmd.hasOption(OPT_READER)) {
+ readerClass = cmd.getOptionValue(OPT_READER);
+ } else {
+ readerClass = MultiThreadedReader.class.getCanonicalName();
+ }
+ readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
}
readerThreads.setMaxErrors(maxReadErrors);
readerThreads.setKeyWindow(keyWindow);
@@ -662,6 +690,18 @@ public class LoadTestTool extends AbstractHBaseTool {
}
}
+ private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName
+ , LoadTestDataGenerator dataGen) throws IOException {
+ try {
+ Class<?> clazz = Class.forName(clazzName);
+ Constructor<?> constructor = clazz.getConstructor(
+ LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
+ return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
public static byte[] generateData(final Random r, int length) {
byte [] b = new byte [length];
int i = 0;
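getMultiThreadedReaderInstance above looks up a constructor by its parameter types and wraps any reflection failure in an IOException. The same pattern in a self-contained sketch (the Widget class is invented for illustration):

import java.io.IOException;
import java.lang.reflect.Constructor;

public class ReflectiveFactoryExample {
  public static class Widget {
    public Widget(String name, double weight) {
      System.out.println("built " + name + " (" + weight + ")");
    }
  }

  static Object build(String clazzName) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      // Constructor resolved by parameter types, as in the LoadTestTool change.
      Constructor<?> ctor = clazz.getConstructor(String.class, double.class);
      return ctor.newInstance("demo", 1.5);
    } catch (Exception e) {
      throw new IOException(e); // surface reflection failures uniformly
    }
  }

  public static void main(String[] args) throws IOException {
    build("ReflectiveFactoryExample$Widget");
  }
}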
http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
index 0edeea7..b0d44fd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
@@ -39,7 +39,7 @@ public class MultiThreadedReader extends MultiThreadedAction
protected Set<HBaseReaderThread> readers = new HashSet<HBaseReaderThread>();
private final double verifyPercent;
- private volatile boolean aborted;
+ protected volatile boolean aborted;
protected MultiThreadedWriterBase writer = null;
@@ -104,11 +104,15 @@ public class MultiThreadedReader extends MultiThreadedAction
protected void addReaderThreads(int numThreads) throws IOException {
for (int i = 0; i < numThreads; ++i) {
- HBaseReaderThread reader = new HBaseReaderThread(i);
+ HBaseReaderThread reader = createReaderThread(i);
readers.add(reader);
}
}
+ protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
+ return new HBaseReaderThread(readerId);
+ }
+
public class HBaseReaderThread extends Thread {
protected final int readerId;
protected final HTable table;
@@ -122,6 +126,8 @@ public class MultiThreadedReader extends MultiThreadedAction
/** If we are ahead of the writer and reading a random key. */
private boolean readingRandomKey;
+ private boolean printExceptionTrace = true;
+
/**
* @param readerId only the keys with this remainder from division by
* {@link #numThreads} will be read by this thread
@@ -204,7 +210,7 @@ public class MultiThreadedReader extends MultiThreadedAction
return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
}
- private long getNextKeyToRead() {
+ protected long getNextKeyToRead() {
readingRandomKey = false;
if (writer == null || curKey <= maxKeyWeCanRead()) {
return curKey++;
@@ -235,6 +241,24 @@ public class MultiThreadedReader extends MultiThreadedAction
}
private Get readKey(long keyToRead) {
+ Get get = null;
+ try {
+ get = createGet(keyToRead);
+ queryKey(get, RandomUtils.nextInt(100) < verifyPercent, keyToRead);
+ } catch (IOException e) {
+ numReadFailures.addAndGet(1);
+ LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
+ + ", time from start: "
+ + (System.currentTimeMillis() - startTimeMs) + " ms");
+ if (printExceptionTrace) {
+ LOG.warn(e);
+ printExceptionTrace = false;
+ }
+ }
+ return get;
+ }
+
+ protected Get createGet(long keyToRead) throws IOException {
Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
String cfsString = "";
byte[][] columnFamilies = dataGenerator.getColumnFamilies();
@@ -247,18 +271,9 @@ public class MultiThreadedReader extends MultiThreadedAction
cfsString += "[" + Bytes.toStringBinary(cf) + "]";
}
}
-
- try {
- get = dataGenerator.beforeGet(keyToRead, get);
- if (verbose) {
- LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
- }
- queryKey(get, RandomUtils.nextInt(100) < verifyPercent, keyToRead);
- } catch (IOException e) {
- numReadFailures.addAndGet(1);
- LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
- + ", time from start: "
- + (System.currentTimeMillis() - startTimeMs) + " ms");
+ get = dataGenerator.beforeGet(keyToRead, get);
+ if (verbose) {
+ LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
}
return get;
}
@@ -267,15 +282,16 @@ public class MultiThreadedReader extends MultiThreadedAction
String rowKey = Bytes.toString(get.getRow());
// read the data
- long start = System.currentTimeMillis();
+ long start = System.nanoTime();
Result result = table.get(get);
- getResultMetricUpdation(verify, rowKey, start, result, table, false);
+ long end = System.nanoTime();
+ verifyResultsAndUpdateMetrics(verify, rowKey, end - start, result, table, false);
}
- protected void getResultMetricUpdation(boolean verify, String rowKey, long start,
+ protected void verifyResultsAndUpdateMetrics(boolean verify, String rowKey, long elapsedNano,
Result result, HTable table, boolean isNullExpected)
throws IOException {
- totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
+ totalOpTimeMs.addAndGet(elapsedNano / 1000000);
numKeys.addAndGet(1);
if (!result.isEmpty()) {
if (verify) {
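The timing change above measures each get with System.nanoTime() and converts to milliseconds only when recording, which keeps the raw elapsedNano value usable for the timeout comparison in the timebounded reader. In miniature:

public class NanoTimingExample {
  public static void main(String[] args) throws InterruptedException {
    long start = System.nanoTime();
    Thread.sleep(25); // stand-in for table.get(get)
    long elapsedNano = System.nanoTime() - start;

    long timeoutNano = 5000L * 1000000; // 5000 ms, as in the timebounded test
    System.out.println("elapsed ms = " + elapsedNano / 1000000);
    System.out.println("timed out  = " + (elapsedNano > timeoutNano));
  }
}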
http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java
index 99b4f1d..df59547 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java
@@ -89,7 +89,7 @@ public class MultiThreadedReaderWithACL extends MultiThreadedReader {
final String rowKey = Bytes.toString(get.getRow());
// read the data
- final long start = System.currentTimeMillis();
+ final long start = System.nanoTime();
PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
@@ -109,7 +109,8 @@ public class MultiThreadedReaderWithACL extends MultiThreadedReader {
}
boolean isNullExpected = ((((int) keyToRead % specialPermCellInsertionFactor)) == 0);
LOG.info("Read happening from ACL " + isNullExpected);
- getResultMetricUpdation(verify, rowKey, start, result, localTable, isNullExpected);
+ long end = System.nanoTime();
+ verifyResultsAndUpdateMetrics(verify, rowKey, end - start, result, localTable, isNullExpected);
} catch (IOException e) {
recordFailure(keyToRead);
}