You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/11/22 06:24:09 UTC
hbase git commit: HBASE-19311 Promote TestAcidGuarantees to
LargeTests and start mini cluster once to make it faster
Repository: hbase
Updated Branches:
refs/heads/master 3b2b22b5f -> 1a4037b9d
HBASE-19311 Promote TestAcidGuarantees to LargeTests and start mini cluster once to make it faster
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1a4037b9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1a4037b9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1a4037b9
Branch: refs/heads/master
Commit: 1a4037b9dc1efbfacd81bb38770f3ee42cf5545a
Parents: 3b2b22b
Author: zhangduo <zh...@apache.org>
Authored: Tue Nov 21 21:18:54 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Nov 22 14:10:10 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/util/AbstractHBaseTool.java | 10 +-
.../hbase/IntegrationTestAcidGuarantees.java | 60 ++-
.../hadoop/hbase/AcidGuaranteesTestTool.java | 415 ++++++++++++++++
.../apache/hadoop/hbase/TestAcidGuarantees.java | 498 +++----------------
4 files changed, 510 insertions(+), 473 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a4037b9/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
index b808d3e..e301c1f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
@@ -231,6 +231,14 @@ public abstract class AbstractHBaseTool implements Tool, Configurable {
}
}
+ public long getOptionAsLong(CommandLine cmd, String opt, int defaultValue) {
+ if (cmd.hasOption(opt)) {
+ return Long.parseLong(cmd.getOptionValue(opt));
+ } else {
+ return defaultValue;
+ }
+ }
+
public double getOptionAsDouble(CommandLine cmd, String opt, double defaultValue) {
if (cmd.hasOption(opt)) {
return Double.parseDouble(cmd.getOptionValue(opt));
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a4037b9/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java
index e1c17a4..3c1e6ad 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java
@@ -17,26 +17,30 @@
*/
package org.apache.hadoop.hbase;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.FAMILY_A;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.FAMILY_B;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.FAMILY_C;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.TABLE_NAME;
+
+import java.util.Set;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
-import org.apache.hadoop.hbase.regionserver.MemStoreCompactor;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.util.Set;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
/**
* This Integration Test verifies acid guarantees across column families by frequently writing
* values to rows with multiple column families and concurrently reading entire rows that expect all
* column families.
- *
* <p>
* Sample usage:
+ *
* <pre>
* hbase org.apache.hadoop.hbase.IntegrationTestAcidGuarantees -Dmillis=10000 -DnumWriters=50
* -DnumGetters=2 -DnumScanners=2 -DnumUniqueRows=5
@@ -47,19 +51,11 @@ public class IntegrationTestAcidGuarantees extends IntegrationTestBase {
private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
// The unit test version.
- TestAcidGuarantees tag;
+ AcidGuaranteesTestTool tool;
@Override
public int runTestFromCommandLine() throws Exception {
- Configuration c = getConf();
- int millis = c.getInt("millis", 5000);
- int numWriters = c.getInt("numWriters", 50);
- int numGetters = c.getInt("numGetters", 2);
- int numScanners = c.getInt("numScanners", 2);
- int numUniqueRows = c.getInt("numUniqueRows", 3);
- boolean useMob = c.getBoolean("useMob",false);
- tag.runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true, useMob);
- return 0;
+ return tool.run(new String[0]);
}
@Override
@@ -68,50 +64,50 @@ public class IntegrationTestAcidGuarantees extends IntegrationTestBase {
util = getTestingUtil(getConf());
util.initializeCluster(SERVER_COUNT);
conf = getConf();
- conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024));
+ conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128 * 1024));
// prevent aggressive region split
conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
- ConstantSizeRegionSplitPolicy.class.getName());
- this.setConf(util.getConfiguration());
+ ConstantSizeRegionSplitPolicy.class.getName());
- // replace the HBaseTestingUtility in the unit test with the integration test's
- // IntegrationTestingUtility
- tag = new TestAcidGuarantees(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
- tag.setHBaseTestingUtil(util);
+ tool = new AcidGuaranteesTestTool();
+ tool.setConf(getConf());
}
@Override
public TableName getTablename() {
- return TestAcidGuarantees.TABLE_NAME;
+ return TABLE_NAME;
}
@Override
protected Set<String> getColumnFamilies() {
- return Sets.newHashSet(Bytes.toString(TestAcidGuarantees.FAMILY_A),
- Bytes.toString(TestAcidGuarantees.FAMILY_B),
- Bytes.toString(TestAcidGuarantees.FAMILY_C));
+ return Sets.newHashSet(Bytes.toString(FAMILY_A), Bytes.toString(FAMILY_B),
+ Bytes.toString(FAMILY_C));
}
- // ***** Actual integration tests
+ private void runTestAtomicity(long millisToRun, int numWriters, int numGetters, int numScanners,
+ int numUniqueRows) throws Exception {
+ tool.run(new String[] { "-millis", String.valueOf(millisToRun), "-numWriters",
+ String.valueOf(numWriters), "-numGetters", String.valueOf(numGetters), "-numScanners",
+ String.valueOf(numScanners), "-numUniqueRows", String.valueOf(numUniqueRows) });
+ }
+ // ***** Actual integration tests
@Test
public void testGetAtomicity() throws Exception {
- tag.runTestAtomicity(20000, 4, 4, 0, 3);
+ runTestAtomicity(20000, 4, 4, 0, 3);
}
@Test
public void testScanAtomicity() throws Exception {
- tag.runTestAtomicity(20000, 3, 0, 2, 3);
+ runTestAtomicity(20000, 3, 0, 2, 3);
}
@Test
public void testMixedAtomicity() throws Exception {
- tag.runTestAtomicity(20000, 4, 2, 2, 3);
+ runTestAtomicity(20000, 4, 2, 2, 3);
}
-
// **** Command line hook
-
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
IntegrationTestingUtility.setUseDistributedCluster(conf);
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a4037b9/hbase-server/src/test/java/org/apache/hadoop/hbase/AcidGuaranteesTestTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/AcidGuaranteesTestTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/AcidGuaranteesTestTool.java
new file mode 100644
index 0000000..5e00e8c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/AcidGuaranteesTestTool.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
+/**
+ * A test tool that uses multiple threads to read and write multifamily rows into a table, verifying
+ * that reads never see partially-complete writes
+ */
+@InterfaceAudience.Private
+public class AcidGuaranteesTestTool extends AbstractHBaseTool {
+
+ private static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
+
+ public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
+ public static final byte[] FAMILY_A = Bytes.toBytes("A");
+ public static final byte[] FAMILY_B = Bytes.toBytes("B");
+ public static final byte[] FAMILY_C = Bytes.toBytes("C");
+ public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
+
+ public static final byte[][] FAMILIES = new byte[][] { FAMILY_A, FAMILY_B, FAMILY_C };
+
+ public static int NUM_COLS_TO_CHECK = 50;
+
+ private ExecutorService sharedPool;
+
+ private long millisToRun;
+ private int numWriters;
+ private int numGetters;
+ private int numScanners;
+ private int numUniqueRows;
+ private boolean crazyFlush;
+ private boolean useMob;
+
+ private ExecutorService createThreadPool() {
+ int maxThreads = 256;
+ int coreThreads = 128;
+
+ long keepAliveTime = 60;
+ BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(
+ maxThreads * HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
+
+ ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime,
+ TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(toString() + "-shared"));
+ tpe.allowCoreThreadTimeOut(true);
+ return tpe;
+ }
+
+ @Override
+ protected void addOptions() {
+ addOptWithArg("millis", "time limit in milliseconds");
+ addOptWithArg("numWriters", "number of write threads");
+ addOptWithArg("numGetters", "number of get threads");
+ addOptWithArg("numScanners", "number of scan threads");
+ addOptWithArg("numUniqueRows", "number of unique rows to test");
+ addOptNoArg("crazyFlush",
+ "if specified we will flush continuously otherwise will flush every minute");
+ addOptNoArg("useMob", "if specified we will enable mob on the first column family");
+ }
+
+ @Override
+ protected void processOptions(CommandLine cmd) {
+ millisToRun = getOptionAsLong(cmd, "millis", 5000);
+ numWriters = getOptionAsInt(cmd, "numWriters", 50);
+ numGetters = getOptionAsInt(cmd, "numGetters", 2);
+ numScanners = getOptionAsInt(cmd, "numScanners", 2);
+ numUniqueRows = getOptionAsInt(cmd, "numUniqueRows", 3);
+ crazyFlush = cmd.hasOption("crazyFlush");
+ useMob = cmd.hasOption("useMob");
+ }
+
+ @Override
+ protected int doWork() throws Exception {
+ sharedPool = createThreadPool();
+ try (Connection conn = ConnectionFactory.createConnection(getConf())) {
+ runTestAtomicity(conn.getAdmin());
+ } finally {
+ sharedPool.shutdown();
+ }
+ return 0;
+ }
+
+ /**
+ * Thread that does random full-row writes into a table.
+ */
+ public static class AtomicityWriter extends RepeatingTestThread {
+ Random rand = new Random();
+ byte data[] = new byte[10];
+ byte[][] targetRows;
+ byte[][] targetFamilies;
+ Connection connection;
+ Table table;
+ AtomicLong numWritten = new AtomicLong();
+
+ public AtomicityWriter(TestContext ctx, byte[][] targetRows, byte[][] targetFamilies,
+ ExecutorService pool) throws IOException {
+ super(ctx);
+ this.targetRows = targetRows;
+ this.targetFamilies = targetFamilies;
+ connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
+ table = connection.getTable(TABLE_NAME);
+ }
+
+ public void doAnAction() throws Exception {
+ // Pick a random row to write into
+ byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
+ Put p = new Put(targetRow);
+ rand.nextBytes(data);
+
+ for (byte[] family : targetFamilies) {
+ for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+ byte qualifier[] = Bytes.toBytes("col" + i);
+ p.addColumn(family, qualifier, data);
+ }
+ }
+ table.put(p);
+ numWritten.getAndIncrement();
+ }
+
+ @Override
+ public void workDone() throws IOException {
+ try {
+ table.close();
+ } finally {
+ connection.close();
+ }
+ }
+ }
+
+ /**
+ * Thread that does single-row reads in a table, looking for partially completed rows.
+ */
+ public static class AtomicGetReader extends RepeatingTestThread {
+ byte[] targetRow;
+ byte[][] targetFamilies;
+ Connection connection;
+ Table table;
+ int numVerified = 0;
+ AtomicLong numRead = new AtomicLong();
+
+ public AtomicGetReader(TestContext ctx, byte[] targetRow, byte[][] targetFamilies,
+ ExecutorService pool) throws IOException {
+ super(ctx);
+ this.targetRow = targetRow;
+ this.targetFamilies = targetFamilies;
+ connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
+ table = connection.getTable(TABLE_NAME);
+ }
+
+ public void doAnAction() throws Exception {
+ Get g = new Get(targetRow);
+ Result res = table.get(g);
+ byte[] gotValue = null;
+ if (res.getRow() == null) {
+ // Trying to verify but we didn't find the row - the writing
+ // thread probably just hasn't started writing yet, so we can
+ // ignore this action
+ return;
+ }
+
+ for (byte[] family : targetFamilies) {
+ for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+ byte qualifier[] = Bytes.toBytes("col" + i);
+ byte thisValue[] = res.getValue(family, qualifier);
+ if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
+ gotFailure(gotValue, res);
+ }
+ numVerified++;
+ gotValue = thisValue;
+ }
+ }
+ numRead.getAndIncrement();
+ }
+
+ @Override
+ public void workDone() throws IOException {
+ try {
+ table.close();
+ } finally {
+ connection.close();
+ }
+ }
+
+ private void gotFailure(byte[] expected, Result res) {
+ StringBuilder msg = new StringBuilder();
+ msg.append("Failed after ").append(numVerified).append("!");
+ msg.append("Expected=").append(Bytes.toStringBinary(expected));
+ msg.append("Got:\n");
+ for (Cell kv : res.listCells()) {
+ msg.append(kv.toString());
+ msg.append(" val= ");
+ msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
+ msg.append("\n");
+ }
+ throw new RuntimeException(msg.toString());
+ }
+ }
+
+ /**
+ * Thread that does full scans of the table looking for any partially completed rows.
+ */
+ public static class AtomicScanReader extends RepeatingTestThread {
+ byte[][] targetFamilies;
+ Table table;
+ Connection connection;
+ AtomicLong numScans = new AtomicLong();
+ AtomicLong numRowsScanned = new AtomicLong();
+
+ public AtomicScanReader(TestContext ctx, byte[][] targetFamilies, ExecutorService pool)
+ throws IOException {
+ super(ctx);
+ this.targetFamilies = targetFamilies;
+ connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
+ table = connection.getTable(TABLE_NAME);
+ }
+
+ public void doAnAction() throws Exception {
+ Scan s = new Scan();
+ for (byte[] family : targetFamilies) {
+ s.addFamily(family);
+ }
+ ResultScanner scanner = table.getScanner(s);
+
+ for (Result res : scanner) {
+ byte[] gotValue = null;
+
+ for (byte[] family : targetFamilies) {
+ for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+ byte qualifier[] = Bytes.toBytes("col" + i);
+ byte thisValue[] = res.getValue(family, qualifier);
+ if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
+ gotFailure(gotValue, res);
+ }
+ gotValue = thisValue;
+ }
+ }
+ numRowsScanned.getAndIncrement();
+ }
+ numScans.getAndIncrement();
+ }
+
+ @Override
+ public void workDone() throws IOException {
+ try {
+ table.close();
+ } finally {
+ connection.close();
+ }
+ }
+
+ private void gotFailure(byte[] expected, Result res) {
+ StringBuilder msg = new StringBuilder();
+ msg.append("Failed after ").append(numRowsScanned).append("!");
+ msg.append("Expected=").append(Bytes.toStringBinary(expected));
+ msg.append("Got:\n");
+ for (Cell kv : res.listCells()) {
+ msg.append(kv.toString());
+ msg.append(" val= ");
+ msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
+ msg.append("\n");
+ }
+ throw new RuntimeException(msg.toString());
+ }
+ }
+
+ private void createTableIfMissing(Admin admin, boolean useMob) throws IOException {
+ if (!admin.tableExists(TABLE_NAME)) {
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
+ Stream.of(FAMILIES).map(ColumnFamilyDescriptorBuilder::of)
+ .forEachOrdered(builder::addColumnFamily);
+ admin.createTable(builder.build());
+ }
+ ColumnFamilyDescriptor cfd = admin.getDescriptor(TABLE_NAME).getColumnFamilies()[0];
+ if (cfd.isMobEnabled() != useMob) {
+ admin.modifyColumnFamily(TABLE_NAME, ColumnFamilyDescriptorBuilder.newBuilder(cfd)
+ .setMobEnabled(useMob).setMobThreshold(4).build());
+ }
+ }
+
+ private void runTestAtomicity(Admin admin) throws Exception {
+ createTableIfMissing(admin, useMob);
+ TestContext ctx = new TestContext(conf);
+
+ byte rows[][] = new byte[numUniqueRows][];
+ for (int i = 0; i < numUniqueRows; i++) {
+ rows[i] = Bytes.toBytes("test_row_" + i);
+ }
+
+ List<AtomicityWriter> writers = Lists.newArrayList();
+ for (int i = 0; i < numWriters; i++) {
+ AtomicityWriter writer = new AtomicityWriter(ctx, rows, FAMILIES, sharedPool);
+ writers.add(writer);
+ ctx.addThread(writer);
+ }
+ // Add a flusher
+ ctx.addThread(new RepeatingTestThread(ctx) {
+ public void doAnAction() throws Exception {
+ try {
+ admin.flush(TABLE_NAME);
+ } catch (IOException ioe) {
+ LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
+ }
+ // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
+ // we would flush as often as possible. On a running cluster, this isn't practical:
+ // (1) we will cause a lot of load due to all the flushing and compacting
+ // (2) we cannot change the flushing/compacting related Configuration options to try to
+ // alleviate this
+ // (3) it is an unrealistic workload, since no one would actually flush that often.
+ // Therefore, let's flush every minute to have more flushes than usual, but not overload
+ // the running cluster.
+ if (!crazyFlush) {
+ Thread.sleep(60000);
+ }
+ }
+ });
+
+ List<AtomicGetReader> getters = Lists.newArrayList();
+ for (int i = 0; i < numGetters; i++) {
+ AtomicGetReader getter =
+ new AtomicGetReader(ctx, rows[i % numUniqueRows], FAMILIES, sharedPool);
+ getters.add(getter);
+ ctx.addThread(getter);
+ }
+
+ List<AtomicScanReader> scanners = Lists.newArrayList();
+ for (int i = 0; i < numScanners; i++) {
+ AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, sharedPool);
+ scanners.add(scanner);
+ ctx.addThread(scanner);
+ }
+
+ ctx.startThreads();
+ ctx.waitFor(millisToRun);
+ ctx.stop();
+
+ LOG.info("Finished test. Writers:");
+ for (AtomicityWriter writer : writers) {
+ LOG.info(" wrote " + writer.numWritten.get());
+ }
+ LOG.info("Readers:");
+ for (AtomicGetReader reader : getters) {
+ LOG.info(" read " + reader.numRead.get());
+ }
+ LOG.info("Scanners:");
+ for (AtomicScanReader scanner : scanners) {
+ LOG.info(" scanned " + scanner.numScans.get());
+ LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
+ }
+ }
+
+ public static void main(String[] args) {
+ Configuration c = HBaseConfiguration.create();
+ int status;
+ try {
+ AcidGuaranteesTestTool test = new AcidGuaranteesTestTool();
+ status = ToolRunner.run(c, test, args);
+ } catch (Exception e) {
+ LOG.error("Exiting due to error", e);
+ status = -1;
+ }
+ System.exit(status);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a4037b9/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
index 9e845ad..e42794e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
@@ -18,434 +18,103 @@
*/
package org.apache.hadoop.hbase;
-import java.io.IOException;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.FAMILIES;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.TABLE_NAME;
+
import java.util.List;
-import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
-import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
/**
- * Test case that uses multiple threads to read and write multifamily rows
- * into a table, verifying that reads never see partially-complete writes.
- *
- * This can run as a junit test, or with a main() function which runs against
- * a real cluster (eg for testing with failures, region movement, etc)
+ * Test case that uses multiple threads to read and write multifamily rows into a table, verifying
+ * that reads never see partially-complete writes. This can run as a junit test, or with a main()
+ * function which runs against a real cluster (eg for testing with failures, region movement, etc)
*/
-@Category({FlakeyTests.class, MediumTests.class})
+@Category({ FlakeyTests.class, LargeTests.class })
@RunWith(Parameterized.class)
-public class TestAcidGuarantees implements Tool {
- @Parameterized.Parameters
+public class TestAcidGuarantees {
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ @Parameterized.Parameters(name = "{index}: compType={0}")
public static Object[] data() {
return new Object[] { "NONE", "BASIC", "EAGER" };
}
- protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
- public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
- public static final byte [] FAMILY_A = Bytes.toBytes("A");
- public static final byte [] FAMILY_B = Bytes.toBytes("B");
- public static final byte [] FAMILY_C = Bytes.toBytes("C");
- public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
-
- public static final byte[][] FAMILIES = new byte[][] {
- FAMILY_A, FAMILY_B, FAMILY_C };
- private HBaseTestingUtility util;
+ @Parameter
+ public String compType;
- public static int NUM_COLS_TO_CHECK = 50;
-
- // when run as main
- private Configuration conf;
- private ExecutorService sharedPool = null;
-
- private void createTableIfMissing(boolean useMob)
- throws IOException {
- try {
- util.createTable(TABLE_NAME, FAMILIES);
- } catch (TableExistsException tee) {
- }
+ private AcidGuaranteesTestTool tool = new AcidGuaranteesTestTool();
- if (useMob) {
- HTableDescriptor htd = new HTableDescriptor(util.getAdmin().getTableDescriptor(TABLE_NAME));
- HColumnDescriptor hcd = htd.getColumnFamilies()[0];
- // force mob enabled such that all data is mob data
- hcd.setMobEnabled(true);
- hcd.setMobThreshold(4);
- util.getAdmin().modifyColumnFamily(TABLE_NAME, hcd);
- }
- }
-
- public TestAcidGuarantees(String compType) {
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
// Set small flush size for minicluster so we exercise reseeking scanners
- Configuration conf = HBaseConfiguration.create();
+ Configuration conf = UTIL.getConfiguration();
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128 * 1024));
// prevent aggressive region split
conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
- ConstantSizeRegionSplitPolicy.class.getName());
+ ConstantSizeRegionSplitPolicy.class.getName());
conf.setInt("hfile.format.version", 3); // for mob tests
- conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, compType);
- if(MemoryCompactionPolicy.valueOf(compType) == MemoryCompactionPolicy.EAGER) {
- conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
- conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.9);
- }
- util = new HBaseTestingUtility(conf);
- sharedPool = createThreadPool();
+ UTIL.startMiniCluster(1);
}
- public void setHBaseTestingUtil(HBaseTestingUtility util) {
- this.util = util;
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
}
- private ExecutorService createThreadPool() {
-
- int maxThreads = 256;
- int coreThreads = 128;
-
- long keepAliveTime = 60;
- BlockingQueue<Runnable> workQueue =
- new LinkedBlockingQueue<Runnable>(maxThreads *
- HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
-
- ThreadPoolExecutor tpe = new ThreadPoolExecutor(
- coreThreads,
- maxThreads,
- keepAliveTime,
- TimeUnit.SECONDS,
- workQueue,
- Threads.newDaemonThreadFactory(toString() + "-shared"));
- tpe.allowCoreThreadTimeOut(true);
- return tpe;
- }
-
- public ExecutorService getSharedThreadPool() {
- return sharedPool;
- }
-
- /**
- * Thread that does random full-row writes into a table.
- */
- public static class AtomicityWriter extends RepeatingTestThread {
- Random rand = new Random();
- byte data[] = new byte[10];
- byte targetRows[][];
- byte targetFamilies[][];
- Connection connection;
- Table table;
- AtomicLong numWritten = new AtomicLong();
-
- public AtomicityWriter(TestContext ctx, byte targetRows[][],
- byte targetFamilies[][], ExecutorService pool) throws IOException {
- super(ctx);
- this.targetRows = targetRows;
- this.targetFamilies = targetFamilies;
- connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
- table = connection.getTable(TABLE_NAME);
- }
- public void doAnAction() throws Exception {
- // Pick a random row to write into
- byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
- Put p = new Put(targetRow);
- rand.nextBytes(data);
-
- for (byte[] family : targetFamilies) {
- for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
- byte qualifier[] = Bytes.toBytes("col" + i);
- p.addColumn(family, qualifier, data);
- }
- }
- table.put(p);
- numWritten.getAndIncrement();
- }
-
- @Override
- public void workDone() throws IOException {
- try {
- table.close();
- } finally {
- connection.close();
- }
- }
- }
-
- /**
- * Thread that does single-row reads in a table, looking for partially
- * completed rows.
- */
- public static class AtomicGetReader extends RepeatingTestThread {
- byte targetRow[];
- byte targetFamilies[][];
- Connection connection;
- Table table;
- int numVerified = 0;
- AtomicLong numRead = new AtomicLong();
-
- public AtomicGetReader(TestContext ctx, byte targetRow[],
- byte targetFamilies[][], ExecutorService pool) throws IOException {
- super(ctx);
- this.targetRow = targetRow;
- this.targetFamilies = targetFamilies;
- connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
- table = connection.getTable(TABLE_NAME);
- }
-
- public void doAnAction() throws Exception {
- Get g = new Get(targetRow);
- Result res = table.get(g);
- byte[] gotValue = null;
- if (res.getRow() == null) {
- // Trying to verify but we didn't find the row - the writing
- // thread probably just hasn't started writing yet, so we can
- // ignore this action
- return;
- }
-
- for (byte[] family : targetFamilies) {
- for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
- byte qualifier[] = Bytes.toBytes("col" + i);
- byte thisValue[] = res.getValue(family, qualifier);
- if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
- gotFailure(gotValue, res);
- }
- numVerified++;
- gotValue = thisValue;
- }
- }
- numRead.getAndIncrement();
- }
-
- @Override
- public void workDone() throws IOException {
- try {
- table.close();
- } finally {
- connection.close();
- }
- }
-
- private void gotFailure(byte[] expected, Result res) {
- StringBuilder msg = new StringBuilder();
- msg.append("Failed after ").append(numVerified).append("!");
- msg.append("Expected=").append(Bytes.toStringBinary(expected));
- msg.append("Got:\n");
- for (Cell kv : res.listCells()) {
- msg.append(kv.toString());
- msg.append(" val= ");
- msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
- msg.append("\n");
- }
- throw new RuntimeException(msg.toString());
+ @Before
+ public void setUp() throws Exception {
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME)
+ .setValue(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, compType);
+ if (MemoryCompactionPolicy.valueOf(compType) == MemoryCompactionPolicy.EAGER) {
+ builder.setValue(MemStoreLAB.USEMSLAB_KEY, "false");
+ builder.setValue(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, "0.9");
}
+ Stream.of(FAMILIES).map(ColumnFamilyDescriptorBuilder::of)
+ .forEachOrdered(builder::addColumnFamily);
+ UTIL.getAdmin().createTable(builder.build());
+ tool.setConf(UTIL.getConfiguration());
}
- /**
- * Thread that does full scans of the table looking for any partially completed
- * rows.
- */
- public static class AtomicScanReader extends RepeatingTestThread {
- byte targetFamilies[][];
- Table table;
- Connection connection;
- AtomicLong numScans = new AtomicLong();
- AtomicLong numRowsScanned = new AtomicLong();
-
- public AtomicScanReader(TestContext ctx,
- byte targetFamilies[][], ExecutorService pool) throws IOException {
- super(ctx);
- this.targetFamilies = targetFamilies;
- connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
- table = connection.getTable(TABLE_NAME);
- }
-
- public void doAnAction() throws Exception {
- Scan s = new Scan();
- for (byte[] family : targetFamilies) {
- s.addFamily(family);
- }
- ResultScanner scanner = table.getScanner(s);
-
- for (Result res : scanner) {
- byte[] gotValue = null;
-
- for (byte[] family : targetFamilies) {
- for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
- byte qualifier[] = Bytes.toBytes("col" + i);
- byte thisValue[] = res.getValue(family, qualifier);
- if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
- gotFailure(gotValue, res);
- }
- gotValue = thisValue;
- }
- }
- numRowsScanned.getAndIncrement();
- }
- numScans.getAndIncrement();
- }
-
- @Override
- public void workDone() throws IOException {
- try {
- table.close();
- } finally {
- connection.close();
- }
- }
-
- private void gotFailure(byte[] expected, Result res) {
- StringBuilder msg = new StringBuilder();
- msg.append("Failed after ").append(numRowsScanned).append("!");
- msg.append("Expected=").append(Bytes.toStringBinary(expected));
- msg.append("Got:\n");
- for (Cell kv : res.listCells()) {
- msg.append(kv.toString());
- msg.append(" val= ");
- msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
- msg.append("\n");
- }
- throw new RuntimeException(msg.toString());
- }
+ @After
+ public void tearDown() throws Exception {
+ UTIL.deleteTable(TABLE_NAME);
}
- public void runTestAtomicity(long millisToRun,
- int numWriters,
- int numGetters,
- int numScanners,
+ private void runTestAtomicity(long millisToRun, int numWriters, int numGetters, int numScanners,
int numUniqueRows) throws Exception {
runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false);
}
- public void runTestAtomicity(long millisToRun,
- int numWriters,
- int numGetters,
- int numScanners,
- int numUniqueRows,
- final boolean systemTest) throws Exception {
- runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, systemTest,
- false);
- }
-
- public void runTestAtomicity(long millisToRun,
- int numWriters,
- int numGetters,
- int numScanners,
- int numUniqueRows,
- final boolean systemTest,
- final boolean useMob) throws Exception {
-
- createTableIfMissing(useMob);
- // set the max threads to avoid java.lang.OutOfMemoryError: unable to create new native thread
- util.getConfiguration().setInt("hbase.hconnection.threads.max", 40);
- TestContext ctx = new TestContext(util.getConfiguration());
-
- byte rows[][] = new byte[numUniqueRows][];
- for (int i = 0; i < numUniqueRows; i++) {
- rows[i] = Bytes.toBytes("test_row_" + i);
- }
-
- List<AtomicityWriter> writers = Lists.newArrayList();
- for (int i = 0; i < numWriters; i++) {
- AtomicityWriter writer = new AtomicityWriter(
- ctx, rows, FAMILIES, getSharedThreadPool());
- writers.add(writer);
- ctx.addThread(writer);
- }
- // Add a flusher
- ctx.addThread(new RepeatingTestThread(ctx) {
- Admin admin = util.getAdmin();
- public void doAnAction() throws Exception {
- try {
- admin.flush(TABLE_NAME);
- } catch(IOException ioe) {
- LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
- }
- // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
- // we would flush as often as possible. On a running cluster, this isn't practical:
- // (1) we will cause a lot of load due to all the flushing and compacting
- // (2) we cannot change the flushing/compacting related Configuration options to try to
- // alleviate this
- // (3) it is an unrealistic workload, since no one would actually flush that often.
- // Therefore, let's flush every minute to have more flushes than usual, but not overload
- // the running cluster.
- if (systemTest) Thread.sleep(60000);
- }
- });
-
- List<AtomicGetReader> getters = Lists.newArrayList();
- for (int i = 0; i < numGetters; i++) {
- AtomicGetReader getter = new AtomicGetReader(
- ctx, rows[i % numUniqueRows], FAMILIES, getSharedThreadPool());
- getters.add(getter);
- ctx.addThread(getter);
- }
-
- List<AtomicScanReader> scanners = Lists.newArrayList();
- for (int i = 0; i < numScanners; i++) {
- AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, getSharedThreadPool());
- scanners.add(scanner);
- ctx.addThread(scanner);
- }
-
- ctx.startThreads();
- ctx.waitFor(millisToRun);
- ctx.stop();
-
- LOG.info("Finished test. Writers:");
- for (AtomicityWriter writer : writers) {
- LOG.info(" wrote " + writer.numWritten.get());
- }
- LOG.info("Readers:");
- for (AtomicGetReader reader : getters) {
- LOG.info(" read " + reader.numRead.get());
- }
- LOG.info("Scanners:");
- for (AtomicScanReader scanner : scanners) {
- LOG.info(" scanned " + scanner.numScans.get());
- LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
+ private void runTestAtomicity(long millisToRun, int numWriters, int numGetters, int numScanners,
+ int numUniqueRows, boolean useMob) throws Exception {
+ List<String> args = Lists.newArrayList("-millis", String.valueOf(millisToRun), "-numWriters",
+ String.valueOf(numWriters), "-numGetters", String.valueOf(numGetters), "-numScanners",
+ String.valueOf(numScanners), "-numUniqueRows", String.valueOf(numUniqueRows), "-crazyFlush");
+ if (useMob) {
+ args.add("-useMob");
}
- }
-
- @Before
- public void setUp() throws Exception {
- util.startMiniCluster(1);
- }
-
- @After
- public void tearDown() throws Exception {
- util.shutdownMiniCluster();
+ tool.run(args.toArray(new String[0]));
}
@Test
@@ -465,67 +134,16 @@ public class TestAcidGuarantees implements Tool {
@Test
public void testMobGetAtomicity() throws Exception {
- boolean systemTest = false;
- boolean useMob = true;
- runTestAtomicity(20000, 5, 5, 0, 3, systemTest, useMob);
+ runTestAtomicity(20000, 5, 5, 0, 3, true);
}
@Test
public void testMobScanAtomicity() throws Exception {
- boolean systemTest = false;
- boolean useMob = true;
- runTestAtomicity(20000, 5, 0, 5, 3, systemTest, useMob);
+ runTestAtomicity(20000, 5, 0, 5, 3, true);
}
@Test
public void testMobMixedAtomicity() throws Exception {
- boolean systemTest = false;
- boolean useMob = true;
- runTestAtomicity(20000, 5, 2, 2, 3, systemTest, useMob);
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // Tool interface
- ////////////////////////////////////////////////////////////////////////////
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration c) {
- this.conf = c;
- this.util = new HBaseTestingUtility(c);
+ runTestAtomicity(20000, 5, 2, 2, 3, true);
}
-
- @Override
- public int run(String[] arg0) throws Exception {
- Configuration c = getConf();
- int millis = c.getInt("millis", 5000);
- int numWriters = c.getInt("numWriters", 50);
- int numGetters = c.getInt("numGetters", 2);
- int numScanners = c.getInt("numScanners", 2);
- int numUniqueRows = c.getInt("numUniqueRows", 3);
- boolean useMob = c.getBoolean("useMob",false);
- assert useMob && c.getInt("hfile.format.version", 2) == 3 : "Mob runs must use hfile v3";
- runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true, useMob);
- return 0;
- }
-
- public static void main(String args[]) throws Exception {
- Configuration c = HBaseConfiguration.create();
- int status;
- try {
- TestAcidGuarantees test = new TestAcidGuarantees(CompactingMemStore
- .COMPACTING_MEMSTORE_TYPE_DEFAULT);
- status = ToolRunner.run(c, test, args);
- } catch (Exception e) {
- LOG.error("Exiting due to error", e);
- status = -1;
- }
- System.exit(status);
- }
-
-
}
-