You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2013/03/29 16:53:50 UTC
svn commit: r1462511 - in
/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase: io/encoding/
util/
Author: apurtell
Date: Fri Mar 29 15:53:50 2013
New Revision: 1462511
URL: http://svn.apache.org/r1462511
Log:
HBASE-8210. Backport the LoadTest portions of HBASE-7383
Added:
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestDataGenerator.java
Modified:
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java Fri Mar 29 15:53:50 2013
@@ -32,14 +32,11 @@ import org.apache.hadoop.hbase.client.Ge
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
-import org.apache.hadoop.hbase.util.MultiThreadedWriter;
+
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -101,17 +98,18 @@ public class TestEncodedSeekers {
.setDataBlockEncoding(encoding)
.setEncodeOnDisk(encodeOnDisk)
);
+
LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(
MIN_VALUE_SIZE, MAX_VALUE_SIZE);
// Write
for (int i = 0; i < NUM_ROWS; ++i) {
- byte[] key = MultiThreadedWriter.longToByteArrayKey(i);
+ byte[] key = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
Put put = new Put(key);
- String colAsStr = String.valueOf(j);
- byte[] value = dataGenerator.generateRandomSizeValue(i, colAsStr);
- put.add(CF_BYTES, Bytes.toBytes(colAsStr), value);
+ byte[] col = Bytes.toBytes(String.valueOf(j));
+ byte[] value = dataGenerator.generateRandomSizeValue(key, col);
+ put.add(CF_BYTES, col, value);
region.put(put);
}
if (i % NUM_ROWS_PER_FLUSH == 0) {
@@ -122,7 +120,7 @@ public class TestEncodedSeekers {
for (int doneCompaction = 0; doneCompaction <= 1; ++doneCompaction) {
// Read
for (int i = 0; i < NUM_ROWS; ++i) {
- final byte[] rowKey = MultiThreadedWriter.longToByteArrayKey(i);
+ byte[] rowKey = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
if (VERBOSE) {
System.err.println("Reading row " + i + ", column " + j);
@@ -131,10 +129,10 @@ public class TestEncodedSeekers {
final byte[] qualBytes = Bytes.toBytes(qualStr);
Get get = new Get(rowKey);
get.addColumn(CF_BYTES, qualBytes);
- Result result = region.get(get, null);
+ Result result = region.get(get);
assertEquals(1, result.size());
- assertTrue(LoadTestKVGenerator.verify(Bytes.toString(rowKey), qualStr,
- result.getValue(CF_BYTES, qualBytes)));
+ byte[] value = result.getValue(CF_BYTES, qualBytes);
+ assertTrue(LoadTestKVGenerator.verify(value, rowKey, qualBytes));
}
}
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestDataGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestDataGenerator.java?rev=1462511&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestDataGenerator.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestDataGenerator.java Fri Mar 29 15:53:50 2013
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.util.Set;
+
+/**
+ * A generator of random data (keys/cfs/columns/values) for load testing.
+ * Contains LoadTestKVGenerator as a matter of convenience...
+ */
+public abstract class LoadTestDataGenerator {
+ protected final LoadTestKVGenerator kvGenerator;
+
+ /**
+ * Initializes the object.
+ * @param minValueSize minimum size of the value generated by
+ * {@link #generateValue(byte[], byte[], byte[])}.
+ * @param maxValueSize maximum size of the value generated by
+ * {@link #generateValue(byte[], byte[], byte[])}.
+ */
+ public LoadTestDataGenerator(int minValueSize, int maxValueSize) {
+ this.kvGenerator = new LoadTestKVGenerator(minValueSize, maxValueSize);
+ }
+ /**
+ * Generates a deterministic, unique hashed row key from a number. That way, the user can
+ * keep track of plain numbers instead of byte arrays, while keys remain well distributed.
+ * @param keyBase Base number for a key, such as a loop counter.
+ */
+ public abstract byte[] getDeterministicUniqueKey(long keyBase);
+
+ /**
+ * Gets column families for the load test table.
+ * @return The array of byte[]s representing column family names.
+ */
+ public abstract byte[][] getColumnFamilies();
+
+ /**
+ * Generates an applicable set of columns to be used for a particular key and family.
+ * @param rowKey The row key to generate for.
+ * @param cf The column family name to generate for.
+ * @return The array of byte[]s representing column names.
+ */
+ public abstract byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf);
+
+ /**
+ * Generates a value to be used for a particular row/cf/column.
+ * @param rowKey The row key to generate for.
+ * @param cf The column family name to generate for.
+ * @param column The column name to generate for.
+ * @return The value to use.
+ */
+ public abstract byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column);
+
+ /**
+ * Checks that columns for a rowKey and cf are valid if generated via
+ * {@link #generateColumnsForCf(byte[], byte[])}
+ * @param rowKey The row key to verify for.
+ * @param cf The column family name to verify for.
+ * @param columnSet The column set (for example, encountered by read).
+ * @return True iff valid.
+ */
+ public abstract boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet);
+
+ /**
+ * Checks that value for a rowKey/cf/column is valid if generated via
+ * {@link #generateValue(byte[], byte[], byte[])}
+ * @param rowKey The row key to verify for.
+ * @param cf The column family name to verify for.
+ * @param column The column name to verify for.
+ * @param value The value (for example, encountered by read).
+ * @return True iff valid.
+ */
+ public abstract boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value);
+}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java Fri Mar 29 15:53:50 2013
@@ -27,8 +27,6 @@ import org.apache.hadoop.hbase.util.MD5H
* hash. Values are generated by selecting value size in the configured range
* and generating a pseudo-random sequence of bytes seeded by key, column
* qualifier, and value size.
- * <p>
- * Not thread-safe, so a separate instance is needed for every writer thread/
*/
public class LoadTestKVGenerator {
@@ -49,13 +47,13 @@ public class LoadTestKVGenerator {
/**
* Verifies that the given byte array is the same as what would be generated
- * for the given row key and qualifier. We are assuming that the value size
- * is correct, and only verify the actual bytes. However, if the min/max
- * value sizes are set sufficiently high, an accidental match should be
+ * for the given seed strings (row/cf/column/...). We are assuming that the
+ * value size is correct, and only verify the actual bytes. However, if the
+ * min/max value sizes are set sufficiently high, an accidental match should be
* extremely improbable.
*/
- public static boolean verify(String rowKey, String qual, byte[] value) {
- byte[] expectedData = getValueForRowColumn(rowKey, qual, value.length);
+ public static boolean verify(byte[] value, byte[]... seedStrings) {
+ byte[] expectedData = getValueForRowColumn(value.length, seedStrings);
return Bytes.equals(expectedData, value);
}
@@ -74,25 +72,31 @@ public class LoadTestKVGenerator {
/**
* Generates a value for the given key index and column qualifier. Size is
* selected randomly in the configured range. The generated value depends
- * only on the combination of the key, qualifier, and the selected value
- * size. This allows to verify the actual value bytes when reading, as done
- * in {@link #verify(String, String, byte[])}.
+ * only on the combination of the strings passed (key/cf/column/...) and the selected
+ * value size. This allows verifying the actual value bytes when reading, as done
+ * in {@link #verify(byte[], byte[]...)}.
+ * This method is as thread-safe as Random class. It appears that the worst bug ever
+ * found with the latter is that multiple threads will get some duplicate values, which
+ * we don't care about.
*/
- public byte[] generateRandomSizeValue(long key, String qual) {
- String rowKey = md5PrefixedKey(key);
- int dataSize = minValueSize + randomForValueSize.nextInt(
- Math.abs(maxValueSize - minValueSize));
- return getValueForRowColumn(rowKey, qual, dataSize);
- }
+ public byte[] generateRandomSizeValue(byte[]... seedStrings) {
+ int dataSize = minValueSize;
+ if (minValueSize != maxValueSize) {
+ dataSize = minValueSize + randomForValueSize.nextInt(Math.abs(maxValueSize - minValueSize));
+ }
+ return getValueForRowColumn(dataSize, seedStrings);
+ }
/**
* Generates random bytes of the given size for the given row and column
* qualifier. The random seed is fully determined by these parameters.
*/
- private static byte[] getValueForRowColumn(String rowKey, String qual,
- int dataSize) {
- Random seededRandom = new Random(rowKey.hashCode() + qual.hashCode() +
- dataSize);
+ private static byte[] getValueForRowColumn(int dataSize, byte[]... seedStrings) {
+ long seed = dataSize;
+ for (byte[] str : seedStrings) {
+ seed += Bytes.toString(str).hashCode();
+ }
+ Random seededRandom = new Random(seed);
byte[] randomBytes = new byte[dataSize];
seededRandom.nextBytes(randomBytes);
return randomBytes;
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java Fri Mar 29 15:53:50 2013
@@ -118,7 +118,7 @@ public class LoadTestTool extends Abstra
// Writer options
private int numWriterThreads = DEFAULT_NUM_THREADS;
- private long minColsPerKey, maxColsPerKey;
+ private int minColsPerKey, maxColsPerKey;
private int minColDataSize, maxColDataSize;
private boolean isMultiPut;
@@ -259,7 +259,7 @@ public class LoadTestTool extends Abstra
int colIndex = 0;
minColsPerKey = 1;
- maxColsPerKey = 2 * Long.parseLong(writeOpts[colIndex++]);
+ maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
int avgColDataSize =
parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
minColDataSize = avgColDataSize / 2;
@@ -341,16 +341,16 @@ public class LoadTestTool extends Abstra
initTestTable();
}
+ LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(
+ minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, COLUMN_FAMILY);
+
if (isWrite) {
- writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY);
+ writerThreads = new MultiThreadedWriter(dataGen, conf, tableName);
writerThreads.setMultiPut(isMultiPut);
- writerThreads.setColumnsPerKey(minColsPerKey, maxColsPerKey);
- writerThreads.setDataSize(minColDataSize, maxColDataSize);
}
if (isRead) {
- readerThreads = new MultiThreadedReader(conf, tableName, COLUMN_FAMILY,
- verifyPercent);
+ readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
readerThreads.setMaxErrors(maxReadErrors);
readerThreads.setKeyWindow(keyWindow);
}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java Fri Mar 29 15:53:50 2013
@@ -18,12 +18,16 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.Collection;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.util.StringUtils;
/**
@@ -34,7 +38,6 @@ public abstract class MultiThreadedActio
private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class);
protected final byte[] tableName;
- protected final byte[] columnFamily;
protected final Configuration conf;
protected int numThreads = 1;
@@ -51,8 +54,69 @@ public abstract class MultiThreadedActio
protected AtomicLong totalOpTimeMs = new AtomicLong();
protected boolean verbose = false;
- protected int minDataSize = 256;
- protected int maxDataSize = 1024;
+ protected LoadTestDataGenerator dataGenerator = null;
+
+ /**
+ * Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, fixed
+ * set of column families, and random number of columns in range. The table for it can
+ * be created manually or, for example, via
+ * {@link HBaseTestingUtility#createPreSplitLoadTestTable(
+ * org.apache.hadoop.conf.Configuration, byte[], byte[], Algorithm, DataBlockEncoding)}
+ */
+ public static class DefaultDataGenerator extends LoadTestDataGenerator {
+ private byte[][] columnFamilies = null;
+ private int minColumnsPerKey;
+ private int maxColumnsPerKey;
+ private final Random random = new Random();
+
+ public DefaultDataGenerator(int minValueSize, int maxValueSize,
+ int minColumnsPerKey, int maxColumnsPerKey, byte[]... columnFamilies) {
+ super(minValueSize, maxValueSize);
+ this.columnFamilies = columnFamilies;
+ this.minColumnsPerKey = minColumnsPerKey;
+ this.maxColumnsPerKey = maxColumnsPerKey;
+ }
+
+ public DefaultDataGenerator(byte[]... columnFamilies) {
+ // Default values for tests that didn't care to provide theirs.
+ this(256, 1024, 1, 10, columnFamilies);
+ }
+
+ @Override
+ public byte[] getDeterministicUniqueKey(long keyBase) {
+ return LoadTestKVGenerator.md5PrefixedKey(keyBase).getBytes();
+ }
+
+ @Override
+ public byte[][] getColumnFamilies() {
+ return columnFamilies;
+ }
+
+ @Override
+ public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
+ int numColumns = minColumnsPerKey + random.nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
+ byte[][] columns = new byte[numColumns][];
+ for (int i = 0; i < numColumns; ++i) {
+ columns[i] = Integer.toString(i).getBytes();
+ }
+ return columns;
+ }
+
+ @Override
+ public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
+ return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
+ }
+
+ @Override
+ public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
+ return LoadTestKVGenerator.verify(value, rowKey, cf, column);
+ }
+
+ @Override
+ public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
+ return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
+ }
+ }
/** "R" or "W" */
private String actionLetter;
@@ -62,11 +126,11 @@ public abstract class MultiThreadedActio
public static final int REPORTING_INTERVAL_MS = 5000;
- public MultiThreadedAction(Configuration conf, byte[] tableName,
- byte[] columnFamily, String actionLetter) {
+ public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf, byte[] tableName,
+ String actionLetter) {
this.conf = conf;
this.tableName = tableName;
- this.columnFamily = columnFamily;
+ this.dataGenerator = dataGen;
this.actionLetter = actionLetter;
}
@@ -165,17 +229,16 @@ public abstract class MultiThreadedActio
}
}
- public void setDataSize(int minDataSize, int maxDataSize) {
- this.minDataSize = minDataSize;
- this.maxDataSize = maxDataSize;
- }
-
public void waitForFinish() {
while (numThreadsWorking.get() != 0) {
Threads.sleepWithoutInterrupt(1000);
}
}
+ public boolean isDone() {
+ return (numThreadsWorking.get() == 0);
+ }
+
protected void startThreads(Collection<? extends Thread> threads) {
numThreadsWorking.addAndGet(threads.size());
for (Thread thread : threads) {
@@ -202,4 +265,77 @@ public abstract class MultiThreadedActio
sb.append(v);
}
+ /**
+ * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}.
+ * Does not verify cf/column integrity.
+ */
+ public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
+ return verifyResultAgainstDataGenerator(result, verifyValues, false);
+ }
+
+ /**
+ * Verifies the result from get or scan using the dataGenerator (that was presumably
+ * also used to generate said result).
+ * @param verifyValues verify that values in the result make sense for row/cf/column combination
+ * @param verifyCfAndColumnIntegrity verify that cf/column set in the result is complete. Note
+ * that to use this multiPut should be used, or verification
+ * has to happen after writes, otherwise there can be races.
+ * @return true if the result is non-empty and passes every requested check, false otherwise
+ */
+ public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
+ boolean verifyCfAndColumnIntegrity) {
+ String rowKeyStr = Bytes.toString(result.getRow());
+
+ // See if we have any data at all.
+ if (result.isEmpty()) {
+ LOG.error("No data returned for key = [" + rowKeyStr + "]");
+ return false;
+ }
+
+ if (!verifyValues && !verifyCfAndColumnIntegrity) {
+ return true; // as long as we have something, we are good.
+ }
+
+ // See if we have all the CFs.
+ byte[][] expectedCfs = dataGenerator.getColumnFamilies();
+ if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
+ LOG.error("Bad family count for [" + rowKeyStr + "]: " + result.getMap().size());
+ return false;
+ }
+
+ // Verify each column family from get in the result.
+ for (byte[] cf : result.getMap().keySet()) {
+ String cfStr = Bytes.toString(cf);
+ Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
+ if (columnValues == null) {
+ LOG.error("No data for family [" + cfStr + "] for [" + rowKeyStr + "]");
+ return false;
+ }
+ // See if we have correct columns.
+ if (verifyCfAndColumnIntegrity
+ && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
+ String colsStr = "";
+ for (byte[] col : columnValues.keySet()) {
+ if (colsStr.length() > 0) {
+ colsStr += ", ";
+ }
+ colsStr += "[" + Bytes.toString(col) + "]";
+ }
+ LOG.error("Bad columns for family [" + cfStr + "] for [" + rowKeyStr + "]: " + colsStr);
+ return false;
+ }
+ // See if values check out.
+ if (verifyValues) {
+ for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
+ if (!dataGenerator.verify(result.getRow(), cf, kv.getKey(), kv.getValue())) {
+ LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ + cfStr + "], column [" + Bytes.toString(kv.getKey()) + "]; value of length " +
+ + kv.getValue().length);
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java Fri Mar 29 15:53:50 2013
@@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.HashSet;
-import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -27,14 +26,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
/** Creates multiple threads that read and verify previously written data */
-public class MultiThreadedReader extends MultiThreadedAction
-{
+public class MultiThreadedReader extends MultiThreadedAction {
+
private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class);
private Set<HBaseReaderThread> readers = new HashSet<HBaseReaderThread>();
@@ -71,9 +69,9 @@ public class MultiThreadedReader extends
private int maxErrors = DEFAULT_MAX_ERRORS;
private int keyWindow = DEFAULT_KEY_WINDOW;
- public MultiThreadedReader(Configuration conf, byte[] tableName,
- byte[] columnFamily, double verifyPercent) {
- super(conf, tableName, columnFamily, "R");
+ public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
+ byte[] tableName, double verifyPercent) {
+ super(dataGen, conf, tableName, "R");
this.verifyPercent = verifyPercent;
}
@@ -222,14 +220,22 @@ public class MultiThreadedReader extends
}
private Get readKey(long keyToRead) {
- Get get = new Get(
- LoadTestKVGenerator.md5PrefixedKey(keyToRead).getBytes());
- get.addFamily(columnFamily);
+ Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
+ String cfsString = "";
+ byte[][] columnFamilies = dataGenerator.getColumnFamilies();
+ for (byte[] cf : columnFamilies) {
+ get.addFamily(cf);
+ if (verbose) {
+ if (cfsString.length() > 0) {
+ cfsString += ", ";
+ }
+ cfsString += "[" + Bytes.toStringBinary(cf) + "]";
+ }
+ }
try {
if (verbose) {
- LOG.info("[" + readerId + "] " + "Querying key " + keyToRead
- + ", cf " + Bytes.toStringBinary(columnFamily));
+ LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
}
queryKey(get, random.nextInt(100) < verifyPercent);
} catch (IOException e) {
@@ -251,45 +257,37 @@ public class MultiThreadedReader extends
numKeys.addAndGet(1);
// if we got no data report error
- if (result.isEmpty()) {
- HRegionLocation hloc = table.getRegionLocation(
- Bytes.toBytes(rowKey));
+ if (!result.isEmpty()) {
+ if (verify) {
+ numKeysVerified.incrementAndGet();
+ }
+ } else {
+ HRegionLocation hloc = table.getRegionLocation(Bytes.toBytes(rowKey));
LOG.info("Key = " + rowKey + ", RegionServer: "
+ hloc.getHostname());
- numReadErrors.addAndGet(1);
- LOG.error("No data returned, tried to get actions for key = "
- + rowKey + (writer == null ? "" : ", keys inserted by writer: " +
- writer.numKeys.get() + ")"));
-
- if (numReadErrors.get() > maxErrors) {
- LOG.error("Aborting readers -- found more than " + maxErrors
- + " errors\n");
- aborted = true;
- }
}
- if (result.getFamilyMap(columnFamily) != null) {
- // increment number of columns read
- numCols.addAndGet(result.getFamilyMap(columnFamily).size());
-
- if (verify) {
- // verify the result
- List<KeyValue> keyValues = result.list();
- for (KeyValue kv : keyValues) {
- String qual = new String(kv.getQualifier());
-
- // if something does not look right report it
- if (!LoadTestKVGenerator.verify(rowKey, qual, kv.getValue())) {
- numReadErrors.addAndGet(1);
- LOG.error("Error checking data for key = " + rowKey
- + ", actionId = " + qual);
- }
- }
- numKeysVerified.addAndGet(1);
+ boolean isOk = verifyResultAgainstDataGenerator(result, verify);
+ long numErrorsAfterThis = 0;
+ if (isOk) {
+ long cols = 0;
+ // Count the columns for reporting purposes.
+ for (byte[] cf : result.getMap().keySet()) {
+ cols += result.getFamilyMap(cf).size();
+ }
+ numCols.addAndGet(cols);
+ } else {
+ if (writer != null) {
+ LOG.error("At the time of failure, writer inserted " + writer.numKeys.get() + " keys");
}
+ numErrorsAfterThis = numReadErrors.incrementAndGet();
}
- }
+ if (numErrorsAfterThis > maxErrors) {
+ LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
+ aborted = true;
+ }
+ }
}
public long getNumReadFailures() {
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java Fri Mar 29 15:53:50 2013
@@ -20,7 +20,6 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Queue;
-import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -38,8 +37,6 @@ import org.apache.hadoop.hbase.client.Pu
public class MultiThreadedWriter extends MultiThreadedAction {
private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class);
- private long minColumnsPerKey = 1;
- private long maxColumnsPerKey = 10;
private Set<HBaseWriterThread> writers = new HashSet<HBaseWriterThread>();
private boolean isMultiPut = false;
@@ -50,8 +47,7 @@ public class MultiThreadedWriter extends
* {@link #insertedUpToKey}, the maximum key in the contiguous range of keys
* being inserted. This queue is supposed to stay small.
*/
- private BlockingQueue<Long> insertedKeys =
- new ArrayBlockingQueue<Long>(10000);
+ private BlockingQueue<Long> insertedKeys = new ArrayBlockingQueue<Long>(10000);
/**
* This is the current key to be inserted by any thread. Each thread does an
@@ -77,9 +73,9 @@ public class MultiThreadedWriter extends
/** Enable this if used in conjunction with a concurrent reader. */
private boolean trackInsertedKeys;
- public MultiThreadedWriter(Configuration conf, byte[] tableName,
- byte[] columnFamily) {
- super(conf, tableName, columnFamily, "W");
+ public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
+ byte[] tableName) {
+ super(dataGen, conf, tableName, "W");
}
/** Use multi-puts vs. separate puts for every column in a row */
@@ -87,11 +83,6 @@ public class MultiThreadedWriter extends
this.isMultiPut = isMultiPut;
}
- public void setColumnsPerKey(long minColumnsPerKey, long maxColumnsPerKey) {
- this.minColumnsPerKey = minColumnsPerKey;
- this.maxColumnsPerKey = maxColumnsPerKey;
- }
-
@Override
public void start(long startKey, long endKey, int numThreads)
throws IOException {
@@ -117,17 +108,9 @@ public class MultiThreadedWriter extends
startThreads(writers);
}
- public static byte[] longToByteArrayKey(long rowKey) {
- return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
- }
-
private class HBaseWriterThread extends Thread {
private final HTable table;
- private final Random random = new Random();
- private final LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(
- minDataSize, maxDataSize);
-
public HBaseWriterThread(int writerId) throws IOException {
setName(getClass().getSimpleName() + "_" + writerId);
table = new HTable(conf, tableName);
@@ -135,20 +118,35 @@ public class MultiThreadedWriter extends
public void run() {
try {
- long rowKey;
- while ((rowKey = nextKeyToInsert.getAndIncrement()) < endKey) {
- long numColumns = minColumnsPerKey + Math.abs(random.nextLong())
- % (maxColumnsPerKey - minColumnsPerKey);
+ long rowKeyBase;
+ byte[][] columnFamilies = dataGenerator.getColumnFamilies();
+ while ((rowKeyBase = nextKeyToInsert.getAndIncrement()) < endKey) {
+ byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
+ Put put = new Put(rowKey);
numKeys.addAndGet(1);
+ int columnCount = 0;
+ for (byte[] cf : columnFamilies) {
+ byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf);
+ for (byte[] column : columns) {
+ byte[] value = dataGenerator.generateValue(rowKey, cf, column);
+ put.add(cf, column, value);
+ ++columnCount;
+ if (!isMultiPut) {
+ insert(put, rowKeyBase);
+ numCols.addAndGet(1);
+ put = new Put(rowKey);
+ }
+ }
+ }
if (isMultiPut) {
- multiPutInsertKey(rowKey, 0, numColumns);
- } else {
- for (long col = 0; col < numColumns; ++col) {
- insert(rowKey, col);
+ if (verbose) {
+ LOG.debug("Preparing put for key = [" + rowKey + "], " + columnCount + " columns");
}
+ insert(put, rowKeyBase);
+ numCols.addAndGet(columnCount);
}
if (trackInsertedKeys) {
- insertedKeys.add(rowKey);
+ insertedKeys.add(rowKeyBase);
}
}
} finally {
@@ -161,52 +159,14 @@ public class MultiThreadedWriter extends
}
}
- public void insert(long rowKey, long col) {
- Put put = new Put(longToByteArrayKey(rowKey));
- String colAsStr = String.valueOf(col);
- put.add(columnFamily, Bytes.toBytes(colAsStr),
- dataGenerator.generateRandomSizeValue(rowKey, colAsStr));
+ public void insert(Put put, long keyBase) {
try {
long start = System.currentTimeMillis();
table.put(put);
- numCols.addAndGet(1);
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {
- failedKeySet.add(rowKey);
- LOG.error("Failed to insert: " + rowKey);
- e.printStackTrace();
- }
- }
-
- public void multiPutInsertKey(long rowKey, long startCol, long endCol) {
- if (verbose) {
- LOG.debug("Preparing put for key = " + rowKey + ", cols = ["
- + startCol + ", " + endCol + ")");
- }
-
- if (startCol >= endCol) {
- return;
- }
-
- Put put = new Put(LoadTestKVGenerator.md5PrefixedKey(
- rowKey).getBytes());
- byte[] columnQualifier;
- byte[] value;
- for (long i = startCol; i < endCol; ++i) {
- String qualStr = String.valueOf(i);
- columnQualifier = qualStr.getBytes();
- value = dataGenerator.generateRandomSizeValue(rowKey, qualStr);
- put.add(columnFamily, columnQualifier, value);
- }
-
- try {
- long start = System.currentTimeMillis();
- table.put(put);
- numCols.addAndGet(endCol - startCol);
- totalOpTimeMs.addAndGet(
- System.currentTimeMillis() - start);
- } catch (IOException e) {
- failedKeySet.add(rowKey);
+ failedKeySet.add(keyBase);
+ LOG.error("Failed to insert: " + keyBase);
e.printStackTrace();
}
}
@@ -301,7 +261,7 @@ public class MultiThreadedWriter extends
* key, which requires a blocking queue and a consumer thread.
* @param enable whether to enable tracking the last inserted key
*/
- void setTrackInsertedKeys(boolean enable) {
+ public void setTrackInsertedKeys(boolean enable) {
trackInsertedKeys = enable;
}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java Fri Mar 29 15:53:50 2013
@@ -62,8 +62,8 @@ public class RestartMetaTest extends Abs
private void loadData() throws IOException {
long startKey = 0;
long endKey = 100000;
- long minColsPerKey = 5;
- long maxColsPerKey = 15;
+ int minColsPerKey = 5;
+ int maxColsPerKey = 15;
int minColDataSize = 256;
int maxColDataSize = 256 * 3;
int numThreads = 10;
@@ -77,11 +77,10 @@ public class RestartMetaTest extends Abs
System.out.printf("Client Threads: %d\n", numThreads);
// start the writers
- MultiThreadedWriter writer = new MultiThreadedWriter(conf, TABLE_NAME,
- LoadTestTool.COLUMN_FAMILY);
+ LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(
+ minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, LoadTestTool.COLUMN_FAMILY);
+ MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
writer.setMultiPut(true);
- writer.setColumnsPerKey(minColsPerKey, maxColsPerKey);
- writer.setDataSize(minColDataSize, maxColDataSize);
writer.start(startKey, endKey, numThreads);
System.out.printf("Started loading data...");
writer.waitForFinish();
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java Fri Mar 29 15:53:50 2013
@@ -39,8 +39,8 @@ public class TestLoadTestKVGenerator {
@Test
public void testValueLength() {
for (int i = 0; i < 1000; ++i) {
- byte[] v = gen.generateRandomSizeValue(i,
- String.valueOf(rand.nextInt()));
+ byte[] v = gen.generateRandomSizeValue(Integer.toString(i).getBytes(),
+ String.valueOf(rand.nextInt()).getBytes());
assertTrue(MIN_LEN <= v.length);
assertTrue(v.length <= MAX_LEN);
}
@@ -50,12 +50,12 @@ public class TestLoadTestKVGenerator {
public void testVerification() {
for (int i = 0; i < 1000; ++i) {
for (int qualIndex = 0; qualIndex < 20; ++qualIndex) {
- String qual = String.valueOf(qualIndex);
- byte[] v = gen.generateRandomSizeValue(i, qual);
- String rowKey = LoadTestKVGenerator.md5PrefixedKey(i);
- assertTrue(LoadTestKVGenerator.verify(rowKey, qual, v));
+ byte[] qual = String.valueOf(qualIndex).getBytes();
+ byte[] rowKey = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
+ byte[] v = gen.generateRandomSizeValue(rowKey, qual);
+ assertTrue(LoadTestKVGenerator.verify(v, rowKey, qual));
v[0]++;
- assertFalse(LoadTestKVGenerator.verify(rowKey, qual, v));
+ assertFalse(LoadTestKVGenerator.verify(v, rowKey, qual));
}
}
}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java Fri Mar 29 15:53:50 2013
@@ -136,9 +136,10 @@ public class TestMiniClusterLoadSequenti
TEST_UTIL.waitUntilAllRegionsAssigned(numRegions);
- writerThreads = new MultiThreadedWriter(conf, TABLE, CF);
+ LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF);
+ writerThreads = new MultiThreadedWriter(dataGen, conf, TABLE);
writerThreads.setMultiPut(isMultiPut);
- readerThreads = new MultiThreadedReader(conf, TABLE, CF, 100);
+ readerThreads = new MultiThreadedReader(dataGen, conf, TABLE, 100);
}
protected int numKeys() {