You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/01/15 00:57:49 UTC
svn commit: r1433224 - in /hbase/trunk:
hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/
hbase-common/src/test/java/org/apache/hadoop/hbase/util/
hbase-it/src/test/java/org/apache/hadoop/hbase/
hbase-server/src/main/java/org/apache/hadoop/...
Author: tedyu
Date: Mon Jan 14 23:57:48 2013
New Revision: 1433224
URL: http://svn.apache.org/viewvc?rev=1433224&view=rev
Log:
HBASE-7383 create integration test for HBASE-5416 (improving scan performance for certain filters) (Sergey)
Added:
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java
hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java
Modified:
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestKVGenerator.java
hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java
Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java?rev=1433224&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java Mon Jan 14 23:57:48 2013
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util.test;
+
+import java.util.Set;
+
+/**
+ * A generator of random data (keys/cfs/columns/values) for load testing.
+ * Contains LoadTestKVGenerator as a matter of convenience...
+ */
+public abstract class LoadTestDataGenerator {
+ /** Shared value generator available to subclasses for producing and verifying values. */
+ protected final LoadTestKVGenerator kvGenerator;
+
+ /**
+ * Initializes the object.
+ * @param minValueSize minimum size of the value generated by
+ * {@link #generateValue(byte[], byte[], byte[])}.
+ * @param maxValueSize maximum size of the value generated by
+ * {@link #generateValue(byte[], byte[], byte[])}.
+ */
+ public LoadTestDataGenerator(int minValueSize, int maxValueSize) {
+ this.kvGenerator = new LoadTestKVGenerator(minValueSize, maxValueSize);
+ }
+
+ /**
+ * Generates a deterministic, unique hashed row key from a number. That way, the user can
+ * keep track of numbers, without messing with byte array and ensuring key distribution.
+ * @param keyBase Base number for a key, such as a loop counter.
+ * @return The deterministic, unique key bytes derived from {@code keyBase}.
+ */
+ public abstract byte[] getDeterministicUniqueKey(long keyBase);
+
+ /**
+ * Gets column families for the load test table.
+ * @return The array of byte[]s representing column family names.
+ */
+ public abstract byte[][] getColumnFamilies();
+
+ /**
+ * Generates an applicable set of columns to be used for a particular key and family.
+ * @param rowKey The row key to generate for.
+ * @param cf The column family name to generate for.
+ * @return The array of byte[]s representing column names.
+ */
+ public abstract byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf);
+
+ /**
+ * Generates a value to be used for a particular row/cf/column.
+ * @param rowKey The row key to generate for.
+ * @param cf The column family name to generate for.
+ * @param column The column name to generate for.
+ * @return The value to use.
+ */
+ public abstract byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column);
+
+ /**
+ * Checks that columns for a rowKey and cf are valid if generated via
+ * {@link #generateColumnsForCf(byte[], byte[])}
+ * @param rowKey The row key to verify for.
+ * @param cf The column family name to verify for.
+ * @param columnSet The column set (for example, encountered by read).
+ * @return True iff valid.
+ */
+ public abstract boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet);
+
+ /**
+ * Checks that value for a rowKey/cf/column is valid if generated via
+ * {@link #generateValue(byte[], byte[], byte[])}
+ * @param rowKey The row key to verify for.
+ * @param cf The column family name to verify for.
+ * @param column The column name to verify for.
+ * @param value The value (for example, encountered by read).
+ * @return True iff valid.
+ */
+ public abstract boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value);
+}
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestKVGenerator.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestKVGenerator.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestKVGenerator.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestKVGenerator.java Mon Jan 14 23:57:48 2013
@@ -16,6 +16,7 @@
*/
package org.apache.hadoop.hbase.util.test;
+import java.util.Map;
import java.util.Random;
import org.apache.hadoop.hbase.util.Bytes;
@@ -27,8 +28,6 @@ import org.apache.hadoop.hbase.util.MD5H
* hash. Values are generated by selecting value size in the configured range
* and generating a pseudo-random sequence of bytes seeded by key, column
* qualifier, and value size.
- * <p>
- * Not thread-safe, so a separate instance is needed for every writer thread/
*/
public class LoadTestKVGenerator {
@@ -49,13 +48,13 @@ public class LoadTestKVGenerator {
/**
* Verifies that the given byte array is the same as what would be generated
- * for the given row key and qualifier. We are assuming that the value size
- * is correct, and only verify the actual bytes. However, if the min/max
- * value sizes are set sufficiently high, an accidental match should be
+ * for the given seed strings (row/cf/column/...). We are assuming that the
+ * value size is correct, and only verify the actual bytes. However, if the
+ * min/max value sizes are set sufficiently high, an accidental match should be
* extremely improbable.
*/
- public static boolean verify(String rowKey, String qual, byte[] value) {
- byte[] expectedData = getValueForRowColumn(rowKey, qual, value.length);
+ public static boolean verify(byte[] value, byte[]... seedStrings) {
+ byte[] expectedData = getValueForRowColumn(value.length, seedStrings);
return Bytes.equals(expectedData, value);
}
@@ -74,27 +73,31 @@ public class LoadTestKVGenerator {
/**
* Generates a value for the given key index and column qualifier. Size is
* selected randomly in the configured range. The generated value depends
- * only on the combination of the key, qualifier, and the selected value
- * size. This allows to verify the actual value bytes when reading, as done
- * in {@link #verify(String, String, byte[])}.
+ * only on the combination of the strings passed (key/cf/column/...) and the selected
+ * value size. This allows to verify the actual value bytes when reading, as done
+ * in {@link #verify(byte[], byte[]...)}.
+ * This method is as thread-safe as Random class. It appears that the worst bug ever
+ * found with the latter is that multiple threads will get some duplicate values, which
+ * we don't care about.
*/
- public byte[] generateRandomSizeValue(long key, String qual) {
- String rowKey = md5PrefixedKey(key);
+ public byte[] generateRandomSizeValue(byte[]... seedStrings) {
int dataSize = minValueSize;
- if(minValueSize != maxValueSize){
+ if(minValueSize != maxValueSize) {
dataSize = minValueSize + randomForValueSize.nextInt(Math.abs(maxValueSize - minValueSize));
}
- return getValueForRowColumn(rowKey, qual, dataSize);
+ return getValueForRowColumn(dataSize, seedStrings);
}
/**
* Generates random bytes of the given size for the given row and column
* qualifier. The random seed is fully determined by these parameters.
*/
- private static byte[] getValueForRowColumn(String rowKey, String qual,
- int dataSize) {
- Random seededRandom = new Random(rowKey.hashCode() + qual.hashCode() +
- dataSize);
+ private static byte[] getValueForRowColumn(int dataSize, byte[]... seedStrings) {
+ long seed = dataSize;
+ for (byte[] str : seedStrings) {
+ seed += Bytes.toString(str).hashCode();
+ }
+ Random seededRandom = new Random(seed);
byte[] randomBytes = new byte[dataSize];
seededRandom.nextBytes(randomBytes);
return randomBytes;
Modified: hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java (original)
+++ hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java Mon Jan 14 23:57:48 2013
@@ -41,8 +41,8 @@ public class TestLoadTestKVGenerator {
@Test
public void testValueLength() {
for (int i = 0; i < 1000; ++i) {
- byte[] v = gen.generateRandomSizeValue(i,
- String.valueOf(rand.nextInt()));
+ byte[] v = gen.generateRandomSizeValue(Integer.toString(i).getBytes(),
+ String.valueOf(rand.nextInt()).getBytes());
assertTrue(MIN_LEN <= v.length);
assertTrue(v.length <= MAX_LEN);
}
@@ -52,12 +52,12 @@ public class TestLoadTestKVGenerator {
public void testVerification() {
for (int i = 0; i < 1000; ++i) {
for (int qualIndex = 0; qualIndex < 20; ++qualIndex) {
- String qual = String.valueOf(qualIndex);
- byte[] v = gen.generateRandomSizeValue(i, qual);
- String rowKey = LoadTestKVGenerator.md5PrefixedKey(i);
- assertTrue(LoadTestKVGenerator.verify(rowKey, qual, v));
+ byte[] qual = String.valueOf(qualIndex).getBytes();
+ byte[] rowKey = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
+ byte[] v = gen.generateRandomSizeValue(rowKey, qual);
+ assertTrue(LoadTestKVGenerator.verify(v, rowKey, qual));
v[0]++;
- assertFalse(LoadTestKVGenerator.verify(rowKey, qual, v));
+ assertFalse(LoadTestKVGenerator.verify(v, rowKey, qual));
}
}
}
Added: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java?rev=1433224&view=auto
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java (added)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java Mon Jan 14 23:57:48 2013
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.security.InvalidParameterException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.LoadTestTool;
+import org.apache.hadoop.hbase.util.MultiThreadedWriter;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
+import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Integration test that verifies lazy CF loading during scans by doing repeated scans
+ * with this feature while multiple threads are continuously writing values; and
+ * verifying the result.
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestLazyCfLoading {
+ private static final String TABLE_NAME = IntegrationTestLazyCfLoading.class.getSimpleName();
+ // Format template; the class simple name is substituted in (see testReadersAndWriters).
+ private static final String TIMEOUT_KEY = "hbase.%s.timeout";
+
+ /** A soft test timeout; duration of the test, as such, depends on number of keys to put. */
+ private static final int DEFAULT_TIMEOUT_MINUTES = 10;
+
+ private static final int NUM_SERVERS = 1;
+ /** Set regions per server low to ensure splits happen during test */
+ private static final int REGIONS_PER_SERVER = 3;
+ private static final int KEYS_TO_WRITE_PER_SERVER = 20000;
+ private static final int WRITER_THREADS = 10;
+ private static final int WAIT_BETWEEN_SCANS_MS = 1000;
+
+ private static final Log LOG = LogFactory.getLog(IntegrationTestLazyCfLoading.class);
+ private IntegrationTestingUtility util = new IntegrationTestingUtility();
+ private final DataGenerator dataGen = new DataGenerator();
+
+ /** Custom LoadTestDataGenerator. Uses key generation and verification from
+ * LoadTestKVGenerator. Creates 3 column families; one with an integer column to
+ * filter on, the 2nd one with an integer column that matches the first integer column (for
+ * test-specific verification), and byte[] value that is used for general verification; and
+ * the third one with just the value.
+ */
+ private static class DataGenerator extends LoadTestDataGenerator {
+ private static final int MIN_DATA_SIZE = 4096;
+ private static final int MAX_DATA_SIZE = 65536;
+ public static final byte[] ESSENTIAL_CF = Bytes.toBytes("essential");
+ public static final byte[] JOINED_CF1 = Bytes.toBytes("joined");
+ public static final byte[] JOINED_CF2 = Bytes.toBytes("joined2");
+ public static final byte[] FILTER_COLUMN = Bytes.toBytes("filter");
+ public static final byte[] VALUE_COLUMN = Bytes.toBytes("val");
+ public static final long ACCEPTED_VALUE = 1L;
+
+ // NOTE(review): this map is static but populated in the constructor; that is only safe
+ // because every instance inserts identical entries. Consider a static initializer.
+ private static final Map<byte[], byte[][]> columnMap = new TreeMap<byte[], byte[][]>(
+ Bytes.BYTES_COMPARATOR);
+
+ private final AtomicLong expectedNumberOfKeys = new AtomicLong(0);
+ private final AtomicLong totalNumberOfKeys = new AtomicLong(0);
+
+ public DataGenerator() {
+ super(MIN_DATA_SIZE, MAX_DATA_SIZE);
+ columnMap.put(ESSENTIAL_CF, new byte[][] { FILTER_COLUMN });
+ columnMap.put(JOINED_CF1, new byte[][] { FILTER_COLUMN, VALUE_COLUMN });
+ columnMap.put(JOINED_CF2, new byte[][] { VALUE_COLUMN });
+ }
+
+ public long getExpectedNumberOfKeys() {
+ return expectedNumberOfKeys.get();
+ }
+
+ public long getTotalNumberOfKeys() {
+ return totalNumberOfKeys.get();
+ }
+
+ @Override
+ public byte[] getDeterministicUniqueKey(long keyBase) {
+ return LoadTestKVGenerator.md5PrefixedKey(keyBase).getBytes();
+ }
+
+ @Override
+ public byte[][] getColumnFamilies() {
+ return columnMap.keySet().toArray(new byte[columnMap.size()][]);
+ }
+
+ @Override
+ public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
+ return columnMap.get(cf);
+ }
+
+ @Override
+ public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
+ if (Bytes.BYTES_COMPARATOR.compare(column, FILTER_COLUMN) == 0) {
+ // Random deterministic way to make some values "on" and others "off" for filters.
+ long value = Long.parseLong(Bytes.toString(rowKey, 0, 4), 16) & ACCEPTED_VALUE;
+ if (Bytes.BYTES_COMPARATOR.compare(cf, ESSENTIAL_CF) == 0) {
+ // Count keys only once (on the essential CF) even though all CFs share the column.
+ totalNumberOfKeys.incrementAndGet();
+ if (value == ACCEPTED_VALUE) {
+ expectedNumberOfKeys.incrementAndGet();
+ }
+ }
+ return Bytes.toBytes(value);
+ } else if (Bytes.BYTES_COMPARATOR.compare(column, VALUE_COLUMN) == 0) {
+ return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
+ }
+ String error = "Unknown column " + Bytes.toString(column);
+ assert false : error;
+ throw new InvalidParameterException(error);
+ }
+
+ @Override
+ public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
+ if (Bytes.BYTES_COMPARATOR.compare(column, FILTER_COLUMN) == 0) {
+ // Relies on the filter from getScanFilter being used.
+ return Bytes.toLong(value) == ACCEPTED_VALUE;
+ } else if (Bytes.BYTES_COMPARATOR.compare(column, VALUE_COLUMN) == 0) {
+ return LoadTestKVGenerator.verify(value, rowKey, cf, column);
+ }
+ return false; // some bogus value from read, we don't expect any such thing.
+ }
+
+ @Override
+ public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
+ return columnMap.get(cf).length == columnSet.size();
+ }
+
+ public Filter getScanFilter() {
+ SingleColumnValueFilter scf = new SingleColumnValueFilter(ESSENTIAL_CF, FILTER_COLUMN,
+ CompareFilter.CompareOp.EQUAL, Bytes.toBytes(ACCEPTED_VALUE));
+ scf.setFilterIfMissing(true);
+ return scf;
+ }
+ };
+
+ @Before
+ public void setUp() throws Exception {
+ LOG.info("Initializing cluster with " + NUM_SERVERS + " servers");
+ util.initializeCluster(NUM_SERVERS);
+ LOG.info("Done initializing cluster");
+ createTable();
+ }
+
+ private void createTable() throws Exception {
+ deleteTable();
+ LOG.info("Creating table");
+ HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes(TABLE_NAME));
+ for (byte[] cf : dataGen.getColumnFamilies()) {
+ htd.addFamily(new HColumnDescriptor(cf));
+ }
+ int serverCount = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
+ byte[][] splits = new RegionSplitter.HexStringSplit().split(serverCount * REGIONS_PER_SERVER);
+ util.getHBaseAdmin().createTable(htd, splits);
+ LOG.info("Created table");
+ }
+
+ private void deleteTable() throws Exception {
+ if (util.getHBaseAdmin().tableExists(TABLE_NAME)) {
+ LOG.info("Deleting table");
+ if (!util.getHBaseAdmin().isTableDisabled(TABLE_NAME)) {
+ util.getHBaseAdmin().disableTable(TABLE_NAME);
+ }
+ util.getHBaseAdmin().deleteTable(TABLE_NAME);
+ LOG.info("Deleted table");
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ deleteTable();
+ LOG.info("Restoring the cluster");
+ util.restoreCluster();
+ LOG.info("Done restoring the cluster");
+ }
+
+ @Test
+ public void testReadersAndWriters() throws Exception {
+ Configuration conf = util.getConfiguration();
+ String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName());
+ // Timeout is read in minutes and converted to millis below.
+ long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES);
+ long serverCount = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
+ long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER;
+ HTable table = new HTable(conf, Bytes.toBytes(TABLE_NAME));
+
+ // Create multi-threaded writer and start it. We write multiple columns/CFs and verify
+ // their integrity, therefore multi-put is necessary.
+ MultiThreadedWriter writer =
+ new MultiThreadedWriter(dataGen, conf, Bytes.toBytes(TABLE_NAME));
+ writer.setMultiPut(true);
+
+ LOG.info("Starting writer; the number of keys to write is " + keysToWrite);
+ writer.start(1, keysToWrite, WRITER_THREADS);
+
+ // Now, do scans.
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ long timeLimit = now + (maxRuntime * 60000);
+ boolean isWriterDone = false;
+ while (now < timeLimit && !isWriterDone) {
+ LOG.info("Starting the scan; wrote approximately "
+ + dataGen.getTotalNumberOfKeys() + " keys");
+ isWriterDone = writer.isDone();
+ if (isWriterDone) {
+ LOG.info("Scanning full result, writer is done");
+ }
+ Scan scan = new Scan();
+ for (byte[] cf : dataGen.getColumnFamilies()) {
+ scan.addFamily(cf);
+ }
+ scan.setFilter(dataGen.getScanFilter());
+ scan.setLoadColumnFamiliesOnDemand(true);
+ // The number of keys we can expect from scan - lower bound (before scan).
+ // Not a strict lower bound - writer knows nothing about filters, so we report
+ // this from generator. Writer might have generated the value but not put it yet.
+ long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys();
+ long startTs = EnvironmentEdgeManager.currentTimeMillis();
+ ResultScanner results = table.getScanner(scan);
+ long resultCount = 0;
+ Result result = null;
+ // Verify and count the results.
+ while ((result = results.next()) != null) {
+ boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true);
+ Assert.assertTrue("Failed to verify [" + Bytes.toString(result.getRow())+ "]", isOk);
+ ++resultCount;
+ }
+ long timeTaken = EnvironmentEdgeManager.currentTimeMillis() - startTs;
+ // Verify the result count.
+ long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys();
+ Assert.assertTrue("Read " + resultCount + " keys when at most " + onesGennedAfterScan
+ + " were generated ", onesGennedAfterScan >= resultCount);
+ if (isWriterDone) {
+ Assert.assertTrue("Read " + resultCount + " keys; the writer is done and "
+ + onesGennedAfterScan + " keys were generated", onesGennedAfterScan == resultCount);
+ } else if (onesGennedBeforeScan * 0.9 > resultCount) {
+ LOG.warn("Read way too few keys (" + resultCount + "/" + onesGennedBeforeScan
+ + ") - there might be a problem, or the writer might just be slow");
+ }
+ LOG.info("Scan took " + timeTaken + "ms");
+ if (!isWriterDone) {
+ Thread.sleep(WAIT_BETWEEN_SCANS_MS);
+ now = EnvironmentEdgeManager.currentTimeMillis();
+ }
+ }
+ Assert.assertEquals("There are write failures", 0, writer.getNumWriteFailures());
+ Assert.assertTrue("Writer is not done", isWriterDone);
+ }
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Jan 14 23:57:48 2013
@@ -3641,7 +3641,7 @@ public class HRegion implements HeapSize
// First, check if we are at a stop row. If so, there are no more results.
if (stopRow) {
if (filter != null && filter.hasFilterRow()) {
- filter.filterRow(results);
+ filter.filterRow(results);
}
return false;
}
@@ -3670,7 +3670,7 @@ public class HRegion implements HeapSize
final boolean isEmptyRow = results.isEmpty();
// We have the part of the row necessary for filtering (all of it, usually).
- // First filter with the filterRow(List).
+ // First filter with the filterRow(List).
if (filter != null && filter.hasFilterRow()) {
filter.filterRow(results);
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java Mon Jan 14 23:57:48 2013
@@ -129,13 +129,12 @@ public class TestEncodedSeekers {
private void doPuts(HRegion region) throws IOException{
LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(MIN_VALUE_SIZE, MAX_VALUE_SIZE);
for (int i = 0; i < NUM_ROWS; ++i) {
- byte[] key = MultiThreadedWriter.longToByteArrayKey(i);
+ byte[] key = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
Put put = new Put(key);
- String colAsStr = String.valueOf(j);
- byte[] col = Bytes.toBytes(colAsStr);
- byte[] value = dataGenerator.generateRandomSizeValue(i, colAsStr);
- put.add(CF_BYTES, Bytes.toBytes(colAsStr), value);
+ byte[] col = Bytes.toBytes(String.valueOf(j));
+ byte[] value = dataGenerator.generateRandomSizeValue(key, col);
+ put.add(CF_BYTES, col, value);
if(VERBOSE){
KeyValue kvPut = new KeyValue(key, CF_BYTES, col, value);
System.err.println(Strings.padFront(i+"", ' ', 4)+" "+kvPut);
@@ -151,7 +150,7 @@ public class TestEncodedSeekers {
private void doGets(HRegion region) throws IOException{
for (int i = 0; i < NUM_ROWS; ++i) {
- final byte[] rowKey = MultiThreadedWriter.longToByteArrayKey(i);
+ final byte[] rowKey = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
final String qualStr = String.valueOf(j);
if (VERBOSE) {
@@ -163,8 +162,8 @@ public class TestEncodedSeekers {
get.addColumn(CF_BYTES, qualBytes);
Result result = region.get(get, null);
assertEquals(1, result.size());
- assertTrue(LoadTestKVGenerator.verify(Bytes.toString(rowKey), qualStr,
- result.getValue(CF_BYTES, qualBytes)));
+ byte[] value = result.getValue(CF_BYTES, qualBytes);
+ assertTrue(LoadTestKVGenerator.verify(value, rowKey, qualBytes));
}
}
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java Mon Jan 14 23:57:48 2013
@@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.client.HB
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
/**
* A command-line utility that reads, writes, and verifies data. Unlike
@@ -119,7 +121,7 @@ public class LoadTestTool extends Abstra
// Writer options
private int numWriterThreads = DEFAULT_NUM_THREADS;
- private long minColsPerKey, maxColsPerKey;
+ private int minColsPerKey, maxColsPerKey;
private int minColDataSize, maxColDataSize;
private boolean isMultiPut;
@@ -260,7 +262,7 @@ public class LoadTestTool extends Abstra
int colIndex = 0;
minColsPerKey = 1;
- maxColsPerKey = 2 * Long.parseLong(writeOpts[colIndex++]);
+ maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
int avgColDataSize =
parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
minColDataSize = avgColDataSize / 2;
@@ -342,16 +344,16 @@ public class LoadTestTool extends Abstra
initTestTable();
}
+ LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(
+ minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, COLUMN_FAMILY);
+
if (isWrite) {
- writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY);
+ writerThreads = new MultiThreadedWriter(dataGen, conf, tableName);
writerThreads.setMultiPut(isMultiPut);
- writerThreads.setColumnsPerKey(minColsPerKey, maxColsPerKey);
- writerThreads.setDataSize(minColDataSize, maxColDataSize);
}
if (isRead) {
- readerThreads = new MultiThreadedReader(conf, tableName, COLUMN_FAMILY,
- verifyPercent);
+ readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
readerThreads.setMaxErrors(maxReadErrors);
readerThreads.setKeyWindow(keyWindow);
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java Mon Jan 14 23:57:48 2013
@@ -18,12 +18,19 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.Collection;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
+import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
import org.apache.hadoop.util.StringUtils;
/**
@@ -34,7 +41,6 @@ public abstract class MultiThreadedActio
private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class);
protected final byte[] tableName;
- protected final byte[] columnFamily;
protected final Configuration conf;
protected int numThreads = 1;
@@ -51,8 +57,69 @@ public abstract class MultiThreadedActio
protected AtomicLong totalOpTimeMs = new AtomicLong();
protected boolean verbose = false;
- protected int minDataSize = 256;
- protected int maxDataSize = 1024;
+ protected LoadTestDataGenerator dataGenerator = null;
+
+ /**
+ * Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, fixed
+ * set of column families, and random number of columns in range. The table for it can
+ * be created manually or, for example, via
+ * {@link HBaseTestingUtility#createPreSplitLoadTestTable(
+ * org.apache.hadoop.conf.Configuration, byte[], byte[], Algorithm, DataBlockEncoding)}
+ */
+ public static class DefaultDataGenerator extends LoadTestDataGenerator {
+ private byte[][] columnFamilies = null;
+ private int minColumnsPerKey;
+ private int maxColumnsPerKey;
+ private final Random random = new Random();
+
+ public DefaultDataGenerator(int minValueSize, int maxValueSize,
+ int minColumnsPerKey, int maxColumnsPerKey, byte[]... columnFamilies) {
+ super(minValueSize, maxValueSize);
+ this.columnFamilies = columnFamilies;
+ this.minColumnsPerKey = minColumnsPerKey;
+ this.maxColumnsPerKey = maxColumnsPerKey;
+ }
+
+ public DefaultDataGenerator(byte[]... columnFamilies) {
+ // Default values for tests that didn't care to provide theirs.
+ this(256, 1024, 1, 10, columnFamilies);
+ }
+
+ @Override
+ public byte[] getDeterministicUniqueKey(long keyBase) {
+ return LoadTestKVGenerator.md5PrefixedKey(keyBase).getBytes();
+ }
+
+ @Override
+ public byte[][] getColumnFamilies() {
+ return columnFamilies;
+ }
+
+ @Override
+ public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
+ int numColumns = minColumnsPerKey + random.nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
+ byte[][] columns = new byte[numColumns][];
+ for (int i = 0; i < numColumns; ++i) {
+ columns[i] = Integer.toString(i).getBytes();
+ }
+ return columns;
+ }
+
+ @Override
+ public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
+ return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
+ }
+
+ @Override
+ public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
+ return LoadTestKVGenerator.verify(value, rowKey, cf, column);
+ }
+
+ @Override
+ public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
+ return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
+ }
+ }
/** "R" or "W" */
private String actionLetter;
@@ -62,11 +129,11 @@ public abstract class MultiThreadedActio
public static final int REPORTING_INTERVAL_MS = 5000;
- public MultiThreadedAction(Configuration conf, byte[] tableName,
- byte[] columnFamily, String actionLetter) {
+ public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf, byte[] tableName,
+ String actionLetter) {
this.conf = conf;
+ this.dataGenerator = dataGen;
this.tableName = tableName;
- this.columnFamily = columnFamily;
this.actionLetter = actionLetter;
}
@@ -165,17 +232,16 @@ public abstract class MultiThreadedActio
}
}
- public void setDataSize(int minDataSize, int maxDataSize) {
- this.minDataSize = minDataSize;
- this.maxDataSize = maxDataSize;
- }
-
public void waitForFinish() {
while (numThreadsWorking.get() != 0) {
Threads.sleepWithoutInterrupt(1000);
}
}
+ public boolean isDone() {
+ return (numThreadsWorking.get() == 0);
+ }
+
protected void startThreads(Collection<? extends Thread> threads) {
numThreadsWorking.addAndGet(threads.size());
for (Thread thread : threads) {
@@ -202,4 +268,77 @@ public abstract class MultiThreadedActio
sb.append(v);
}
+ /**
+ * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}.
+ * Does not verify cf/column integrity.
+ */
+ public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
+ return verifyResultAgainstDataGenerator(result, verifyValues, false);
+ }
+
+ /**
+ * Verifies the result from get or scan using the dataGenerator (that was presumably
+ * also used to generate said result).
+ * @param verifyValues verify that values in the result make sense for row/cf/column combination
+ * @param verifyCfAndColumnIntegrity verify that the cf/column set in the result is complete. Note
+ * that to use this, multi-puts should be used, or verification
+ * has to happen after the writes complete; otherwise there can be races.
+ * @return true if the result passes all requested verifications, false otherwise
+ */
+ public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
+ boolean verifyCfAndColumnIntegrity) {
+ String rowKeyStr = Bytes.toString(result.getRow());
+
+ // See if we have any data at all.
+ if (result.isEmpty()) {
+ LOG.error("No data returned for key = [" + rowKeyStr + "]");
+ return false;
+ }
+
+ if (!verifyValues && !verifyCfAndColumnIntegrity) {
+ return true; // as long as we have something, we are good.
+ }
+
+ // See if we have all the CFs.
+ byte[][] expectedCfs = dataGenerator.getColumnFamilies();
+ if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
+ LOG.error("Bad family count for [" + rowKeyStr + "]: " + result.getMap().size());
+ return false;
+ }
+
+ // Verify each column family from get in the result.
+ for (byte[] cf : result.getMap().keySet()) {
+ String cfStr = Bytes.toString(cf);
+ Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
+ if (columnValues == null) {
+ LOG.error("No data for family [" + cfStr + "] for [" + rowKeyStr + "]");
+ return false;
+ }
+ // See if we have correct columns.
+ if (verifyCfAndColumnIntegrity
+ && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
+ String colsStr = "";
+ for (byte[] col : columnValues.keySet()) {
+ if (colsStr.length() > 0) {
+ colsStr += ", ";
+ }
+ colsStr += "[" + Bytes.toString(col) + "]";
+ }
+ LOG.error("Bad columns for family [" + cfStr + "] for [" + rowKeyStr + "]: " + colsStr);
+ return false;
+ }
+ // See if values check out.
+ if (verifyValues) {
+ for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
+ if (!dataGenerator.verify(result.getRow(), cf, kv.getKey(), kv.getValue())) {
+ LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ + cfStr + "], column [" + Bytes.toString(kv.getKey()) + "]; value of length " +
+ + kv.getValue().length);
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java Mon Jan 14 23:57:48 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
/** Creates multiple threads that read and verify previously written data */
@@ -72,9 +74,9 @@ public class MultiThreadedReader extends
private int maxErrors = DEFAULT_MAX_ERRORS;
private int keyWindow = DEFAULT_KEY_WINDOW;
- public MultiThreadedReader(Configuration conf, byte[] tableName,
- byte[] columnFamily, double verifyPercent) {
- super(conf, tableName, columnFamily, "R");
+ public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
+ byte[] tableName, double verifyPercent) {
+ super(dataGen, conf, tableName, "R");
this.verifyPercent = verifyPercent;
}
@@ -223,14 +225,22 @@ public class MultiThreadedReader extends
}
private Get readKey(long keyToRead) {
- Get get = new Get(
- LoadTestKVGenerator.md5PrefixedKey(keyToRead).getBytes());
- get.addFamily(columnFamily);
+ Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
+ String cfsString = "";
+ byte[][] columnFamilies = dataGenerator.getColumnFamilies();
+ for (byte[] cf : columnFamilies) {
+ get.addFamily(cf);
+ if (verbose) {
+ if (cfsString.length() > 0) {
+ cfsString += ", ";
+ }
+ cfsString += "[" + Bytes.toStringBinary(cf) + "]";
+ }
+ }
try {
if (verbose) {
- LOG.info("[" + readerId + "] " + "Querying key " + keyToRead
- + ", cf " + Bytes.toStringBinary(columnFamily));
+ LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
}
queryKey(get, random.nextInt(100) < verifyPercent);
} catch (IOException e) {
@@ -250,47 +260,38 @@ public class MultiThreadedReader extends
Result result = table.get(get);
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
numKeys.addAndGet(1);
-
- // if we got no data report error
- if (result.isEmpty()) {
+ if (!result.isEmpty()) {
+ if (verify) {
+ numKeysVerified.incrementAndGet();
+ }
+ } else {
HRegionLocation hloc = table.getRegionLocation(
Bytes.toBytes(rowKey));
LOG.info("Key = " + rowKey + ", RegionServer: "
+ hloc.getHostname());
- numReadErrors.addAndGet(1);
- LOG.error("No data returned, tried to get actions for key = "
- + rowKey + (writer == null ? "" : ", keys inserted by writer: " +
- writer.numKeys.get() + ")"));
-
- if (numReadErrors.get() > maxErrors) {
- LOG.error("Aborting readers -- found more than " + maxErrors
- + " errors\n");
- aborted = true;
- }
}
- if (result.getFamilyMap(columnFamily) != null) {
- // increment number of columns read
- numCols.addAndGet(result.getFamilyMap(columnFamily).size());
-
- if (verify) {
- // verify the result
- List<KeyValue> keyValues = result.list();
- for (KeyValue kv : keyValues) {
- String qual = new String(kv.getQualifier());
-
- // if something does not look right report it
- if (!LoadTestKVGenerator.verify(rowKey, qual, kv.getValue())) {
- numReadErrors.addAndGet(1);
- LOG.error("Error checking data for key = " + rowKey
- + ", actionId = " + qual);
- }
- }
- numKeysVerified.addAndGet(1);
+ boolean isOk = verifyResultAgainstDataGenerator(result, verify);
+ long numErrorsAfterThis = 0;
+ if (isOk) {
+ long cols = 0;
+ // Count the columns for reporting purposes.
+ for (byte[] cf : result.getMap().keySet()) {
+ cols += result.getFamilyMap(cf).size();
+ }
+ numCols.addAndGet(cols);
+ } else {
+ if (writer != null) {
+ LOG.error("At the time of failure, writer inserted " + writer.numKeys.get() + " keys");
}
+ numErrorsAfterThis = numReadErrors.incrementAndGet();
}
- }
+ if (numErrorsAfterThis > maxErrors) {
+ LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
+ aborted = true;
+ }
+ }
}
public long getNumReadFailures() {
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java Mon Jan 14 23:57:48 2013
@@ -18,9 +18,9 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.HashSet;
+import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
-import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -33,14 +33,13 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
/** Creates multiple threads that write key/values into the table */
public class MultiThreadedWriter extends MultiThreadedAction {
private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class);
- private long minColumnsPerKey = 1;
- private long maxColumnsPerKey = 10;
private Set<HBaseWriterThread> writers = new HashSet<HBaseWriterThread>();
private boolean isMultiPut = false;
@@ -51,8 +50,7 @@ public class MultiThreadedWriter extends
* {@link #insertedUpToKey}, the maximum key in the contiguous range of keys
* being inserted. This queue is supposed to stay small.
*/
- private BlockingQueue<Long> insertedKeys =
- new ArrayBlockingQueue<Long>(10000);
+ private BlockingQueue<Long> insertedKeys = new ArrayBlockingQueue<Long>(10000);
/**
* This is the current key to be inserted by any thread. Each thread does an
@@ -78,9 +76,9 @@ public class MultiThreadedWriter extends
/** Enable this if used in conjunction with a concurrent reader. */
private boolean trackInsertedKeys;
- public MultiThreadedWriter(Configuration conf, byte[] tableName,
- byte[] columnFamily) {
- super(conf, tableName, columnFamily, "W");
+ public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
+ byte[] tableName) {
+ super(dataGen, conf, tableName, "W");
}
/** Use multi-puts vs. separate puts for every column in a row */
@@ -88,11 +86,6 @@ public class MultiThreadedWriter extends
this.isMultiPut = isMultiPut;
}
- public void setColumnsPerKey(long minColumnsPerKey, long maxColumnsPerKey) {
- this.minColumnsPerKey = minColumnsPerKey;
- this.maxColumnsPerKey = maxColumnsPerKey;
- }
-
@Override
public void start(long startKey, long endKey, int numThreads)
throws IOException {
@@ -118,17 +111,9 @@ public class MultiThreadedWriter extends
startThreads(writers);
}
- public static byte[] longToByteArrayKey(long rowKey) {
- return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
- }
-
private class HBaseWriterThread extends Thread {
private final HTable table;
- private final Random random = new Random();
- private final LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(
- minDataSize, maxDataSize);
-
public HBaseWriterThread(int writerId) throws IOException {
setName(getClass().getSimpleName() + "_" + writerId);
table = new HTable(conf, tableName);
@@ -136,20 +121,36 @@ public class MultiThreadedWriter extends
public void run() {
try {
- long rowKey;
- while ((rowKey = nextKeyToInsert.getAndIncrement()) < endKey) {
- long numColumns = minColumnsPerKey + Math.abs(random.nextLong())
- % (maxColumnsPerKey - minColumnsPerKey);
+ long rowKeyBase;
+ byte[][] columnFamilies = dataGenerator.getColumnFamilies();
+ while ((rowKeyBase = nextKeyToInsert.getAndIncrement()) < endKey) {
+ byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
+ Put put = new Put(rowKey);
numKeys.addAndGet(1);
+ int columnCount = 0;
+ for (byte[] cf : columnFamilies) {
+ String s;
+ byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf);
+ for (byte[] column : columns) {
+ byte[] value = dataGenerator.generateValue(rowKey, cf, column);
+ put.add(cf, column, value);
+ ++columnCount;
+ if (!isMultiPut) {
+ insert(put, rowKeyBase);
+ numCols.addAndGet(1);
+ put = new Put(rowKey);
+ }
+ }
+ }
if (isMultiPut) {
- multiPutInsertKey(rowKey, 0, numColumns);
- } else {
- for (long col = 0; col < numColumns; ++col) {
- insert(rowKey, col);
+ if (verbose) {
+ LOG.debug("Preparing put for key = [" + rowKey + "], " + columnCount + " columns");
}
+ insert(put, rowKeyBase);
+ numCols.addAndGet(columnCount);
}
if (trackInsertedKeys) {
- insertedKeys.add(rowKey);
+ insertedKeys.add(rowKeyBase);
}
}
} finally {
@@ -162,52 +163,14 @@ public class MultiThreadedWriter extends
}
}
- public void insert(long rowKey, long col) {
- Put put = new Put(longToByteArrayKey(rowKey));
- String colAsStr = String.valueOf(col);
- put.add(columnFamily, Bytes.toBytes(colAsStr),
- dataGenerator.generateRandomSizeValue(rowKey, colAsStr));
+ public void insert(Put put, long keyBase) {
try {
long start = System.currentTimeMillis();
table.put(put);
- numCols.addAndGet(1);
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {
- failedKeySet.add(rowKey);
- LOG.error("Failed to insert: " + rowKey);
- e.printStackTrace();
- }
- }
-
- public void multiPutInsertKey(long rowKey, long startCol, long endCol) {
- if (verbose) {
- LOG.debug("Preparing put for key = " + rowKey + ", cols = ["
- + startCol + ", " + endCol + ")");
- }
-
- if (startCol >= endCol) {
- return;
- }
-
- Put put = new Put(LoadTestKVGenerator.md5PrefixedKey(
- rowKey).getBytes());
- byte[] columnQualifier;
- byte[] value;
- for (long i = startCol; i < endCol; ++i) {
- String qualStr = String.valueOf(i);
- columnQualifier = qualStr.getBytes();
- value = dataGenerator.generateRandomSizeValue(rowKey, qualStr);
- put.add(columnFamily, columnQualifier, value);
- }
-
- try {
- long start = System.currentTimeMillis();
- table.put(put);
- numCols.addAndGet(endCol - startCol);
- totalOpTimeMs.addAndGet(
- System.currentTimeMillis() - start);
- } catch (IOException e) {
- failedKeySet.add(rowKey);
+ failedKeySet.add(keyBase);
+ LOG.error("Failed to insert: " + keyBase);
e.printStackTrace();
}
}
@@ -302,8 +265,7 @@ public class MultiThreadedWriter extends
* key, which requires a blocking queue and a consumer thread.
* @param enable whether to enable tracking the last inserted key
*/
- void setTrackInsertedKeys(boolean enable) {
+ public void setTrackInsertedKeys(boolean enable) {
trackInsertedKeys = enable;
}
-
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java Mon Jan 14 23:57:48 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
/**
* A command-line tool that spins up a local process-based cluster, loads
@@ -59,8 +60,8 @@ public class RestartMetaTest extends Abs
private void loadData() throws IOException {
long startKey = 0;
long endKey = 100000;
- long minColsPerKey = 5;
- long maxColsPerKey = 15;
+ int minColsPerKey = 5;
+ int maxColsPerKey = 15;
int minColDataSize = 256;
int maxColDataSize = 256 * 3;
int numThreads = 10;
@@ -74,11 +75,10 @@ public class RestartMetaTest extends Abs
System.out.printf("Client Threads: %d\n", numThreads);
// start the writers
- MultiThreadedWriter writer = new MultiThreadedWriter(conf, TABLE_NAME,
- LoadTestTool.COLUMN_FAMILY);
+ LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(
+ minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, LoadTestTool.COLUMN_FAMILY);
+ MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
writer.setMultiPut(true);
- writer.setColumnsPerKey(minColsPerKey, maxColsPerKey);
- writer.setDataSize(minColDataSize, maxColDataSize);
writer.start(startKey, endKey, numThreads);
System.out.printf("Started loading data...");
writer.waitForFinish();
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java Mon Jan 14 23:57:48 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.TableNotF
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -139,9 +140,10 @@ public class TestMiniClusterLoadSequenti
TEST_UTIL.waitUntilAllRegionsAssigned(numRegions);
- writerThreads = new MultiThreadedWriter(conf, TABLE, CF);
+ LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF);
+ writerThreads = new MultiThreadedWriter(dataGen, conf, TABLE);
writerThreads.setMultiPut(isMultiPut);
- readerThreads = new MultiThreadedReader(conf, TABLE, CF, 100);
+ readerThreads = new MultiThreadedReader(dataGen, conf, TABLE, 100);
}
protected int numKeys() {