You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2013/08/03 20:43:19 UTC
svn commit: r1510057 - in /hbase/branches/0.95/hbase-server/src:
main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
Author: jeffreyz
Date: Sat Aug 3 18:43:19 2013
New Revision: 1510057
URL: http://svn.apache.org/r1510057
Log:
HBASE-8816: Add support for loading multiple tables into LoadTestTool
Modified:
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java?rev=1510057&r1=1510056&r2=1510057&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java Sat Aug 3 18:43:19 2013
@@ -41,8 +41,8 @@ import org.apache.hadoop.util.ToolRunner
@InterfaceAudience.Private
public abstract class AbstractHBaseTool implements Tool {
- private static final int EXIT_SUCCESS = 0;
- private static final int EXIT_FAILURE = 1;
+ protected static final int EXIT_SUCCESS = 0;
+ protected static final int EXIT_FAILURE = 1;
private static final String SHORT_HELP_OPTION = "h";
private static final String LONG_HELP_OPTION = "help";
@@ -54,6 +54,8 @@ public abstract class AbstractHBaseTool
protected Configuration conf = null;
private static final Set<String> requiredOptions = new TreeSet<String>();
+
+ /** Raw arguments handed to the tool; assigned only after parseArgs succeeds. */
+ protected String[] cmdLineArgs = null;
/**
* Override this to add command-line options using {@link #addOptWithArg}
@@ -90,6 +92,7 @@ public abstract class AbstractHBaseTool
try {
// parse the command line arguments
cmd = parseArgs(args);
+ cmdLineArgs = args;
} catch (ParseException e) {
LOG.error("Error when parsing command-line arguemnts", e);
printUsage();
@@ -125,14 +128,14 @@ public abstract class AbstractHBaseTool
return success;
}
- private CommandLine parseArgs(String[] args) throws ParseException {
+ /** Registers the help option and subclass options (via addOptions), then parses {@code args}. */
+ protected CommandLine parseArgs(String[] args) throws ParseException {
options.addOption(SHORT_HELP_OPTION, LONG_HELP_OPTION, false, "Show usage");
addOptions();
CommandLineParser parser = new BasicParser();
return parser.parse(options, args);
}
- private void printUsage() {
+ protected void printUsage() {
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.setWidth(80);
String usageHeader = "Options:";
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java?rev=1510057&r1=1510056&r2=1510057&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java Sat Aug 3 18:43:19 2013
@@ -17,11 +17,17 @@
package org.apache.hadoop.hbase.util;
import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@@ -32,6 +38,7 @@ import org.apache.hadoop.hbase.io.compre
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
+import org.apache.hadoop.util.ToolRunner;
/**
* A command-line utility that reads, writes, and verifies data. Unlike
@@ -102,6 +109,7 @@ public class LoadTestTool extends Abstra
protected static final String OPT_ZK_QUORUM = "zk";
protected static final String OPT_SKIP_INIT = "skip_init";
protected static final String OPT_INIT_ONLY = "init_only";
+ protected static final String NUM_TABLES = "num_tables";
protected static final long DEFAULT_START_KEY = 0;
@@ -128,10 +136,12 @@ public class LoadTestTool extends Abstra
protected boolean isMultiPut;
// Reader options
- protected int numReaderThreads = DEFAULT_NUM_THREADS;
- protected int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
- protected int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
- protected int verifyPercent;
+ private int numReaderThreads = DEFAULT_NUM_THREADS;
+ private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
+ private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
+ private int verifyPercent;
+
+ private int numTables = 1;
// TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
// console tool itself should only be used from console.
@@ -223,6 +233,11 @@ public class LoadTestTool extends Abstra
DEFAULT_START_KEY + ".");
addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table "
+ "already exists");
+
+ addOptWithArg(NUM_TABLES,
+ "A positive integer number. When a number n is specified, load test "
+ + "tool will load n tables in parallel. -tn parameter value becomes "
+ + "table name prefix. Each table name is in format <tn>_1...<tn>_n");
}
@Override
@@ -308,6 +323,11 @@ public class LoadTestTool extends Abstra
System.out.println("Percent of keys to verify: " + verifyPercent);
System.out.println("Reader threads: " + numReaderThreads);
}
+
+ numTables = 1;
+ if(cmd.hasOption(NUM_TABLES)) {
+ numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
+ }
}
private void parseColumnFamilyOptions(CommandLine cmd) {
@@ -339,6 +359,14 @@ public class LoadTestTool extends Abstra
@Override
protected int doWork() throws IOException {
+ // A single table is loaded in this thread; more than one fans out to worker threads.
+ return numTables > 1 ? parallelLoadTables() : loadTable();
+ }
+
+ protected int loadTable() throws IOException {
if (cmd.hasOption(OPT_ZK_QUORUM)) {
conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
}
@@ -399,11 +427,112 @@ public class LoadTestTool extends Abstra
success = success && readerThreads.getNumReadErrors() == 0
&& readerThreads.getNumReadFailures() == 0;
}
- return success ? 0 : 1;
+ return success ? EXIT_SUCCESS : EXIT_FAILURE;
}
-
+
public static void main(String[] args) {
new LoadTestTool().doStaticMain(args);
}
+ /**
+ * When NUM_TABLES is specified, starts one worker thread per table; each
+ * worker runs its own LoadTestTool instance against a single table. Table
+ * names have the form <tn>_<index>; for example, "-tn test -num_tables 2"
+ * loads tables "test_1" and "test_2".
+ *
+ * @return EXIT_SUCCESS once every worker has finished without error
+ * @throws IOException if a worker failed with an IOException, or if waiting
+ * for the workers is interrupted
+ */
+ private int parallelLoadTables()
+ throws IOException {
+ // create new command args; always work on a copy so cmdLineArgs is never mutated
+ String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
+ String[] newArgs = null;
+ if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
+ // prepend "-tn <default name>" so every worker has a table-name slot to rewrite
+ newArgs = new String[cmdLineArgs.length + 2];
+ newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
+ newArgs[1] = tableName;
+ System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
+ } else {
+ newArgs = cmdLineArgs.clone();
+ }
+
+ int tableNameValueIndex = -1;
+ for (int j = 0; j < newArgs.length - 1; j++) {
+ // match option names exactly and skip over their values, so a value that
+ // merely ends in "tn" or "num_tables" is never mistaken for the option
+ if (isOption(newArgs[j], OPT_TABLE_NAME)) {
+ tableNameValueIndex = ++j;
+ } else if (isOption(newArgs[j], NUM_TABLES)) {
+ // change NUM_TABLES to 1 so that each worker loads one table
+ newArgs[++j] = "1";
+ }
+ }
+ if (tableNameValueIndex < 0) {
+ throw new IllegalStateException("Cannot locate the value of -" + OPT_TABLE_NAME);
+ }
+
+ // starting to load multiple tables
+ List<WorkerThread> workers = new ArrayList<WorkerThread>();
+ for (int i = 0; i < numTables; i++) {
+ String[] workerArgs = newArgs.clone();
+ workerArgs[tableNameValueIndex] = tableName + "_" + (i+1);
+ WorkerThread worker = new WorkerThread(i, workerArgs);
+ workers.add(worker);
+ LOG.info(worker + " starting");
+ worker.start();
+ }
+
+ // wait for all workers finish
+ LOG.info("Waiting for worker threads to finish");
+ for (WorkerThread t : workers) {
+ try {
+ t.join();
+ } catch (InterruptedException ie) {
+ // restore the interrupt status before converting to an IOException
+ Thread.currentThread().interrupt();
+ IOException iie = new InterruptedIOException();
+ iie.initCause(ie);
+ throw iie;
+ }
+ checkForErrors();
+ }
+
+ return EXIT_SUCCESS;
+ }
+
+ /** Returns true if {@code arg} is exactly the option {@code opt}, written as "-opt" or "--opt". */
+ private static boolean isOption(String arg, String opt) {
+ return ("-" + opt).equals(arg) || ("--" + opt).equals(arg);
+ }
+
+ // The first exception thrown by any worker thread is stored here; later
+ // ones are logged by the worker but not stored (see workerThreadError).
+ protected final AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+
+ /** Records {@code t} as the worker failure unless an earlier one is already recorded. */
+ private void workerThreadError(Throwable t) {
+ thrown.compareAndSet(null, t);
+ }
+
+ /**
+ * Check for errors in the worker threads. If one was recorded, rethrow it:
+ * IOExceptions and RuntimeExceptions as-is, anything else wrapped in a
+ * RuntimeException.
+ */
+ private void checkForErrors() throws IOException {
+ Throwable error = this.thrown.get();
+ if (error == null) {
+ return;
+ }
+ if (error instanceof IOException) {
+ throw (IOException) error;
+ }
+ if (error instanceof RuntimeException) {
+ throw (RuntimeException) error;
+ }
+ throw new RuntimeException(error);
+ }
+
+ /**
+ * Runs a separate LoadTestTool instance against one table, using a copy of
+ * this tool's configuration so settings not present in the args (e.g. -D
+ * overrides already consumed by ToolRunner) still reach the worker.
+ */
+ class WorkerThread extends Thread {
+ private final String[] workerArgs;
+
+ WorkerThread(int i, String[] args) {
+ super("WorkerThread-" + i);
+ workerArgs = args;
+ }
+
+ @Override
+ public void run() {
+ try {
+ int ret = ToolRunner.run(HBaseConfiguration.create(conf), new LoadTestTool(), workerArgs);
+ if (ret != 0) {
+ throw new RuntimeException("LoadTestTool exited with non-zero return code " + ret);
+ }
+ } catch (Exception ex) {
+ LOG.error("Error in worker thread", ex);
+ workerThreadError(ex);
+ }
+ }
+ }
}