You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2013/08/10 20:53:47 UTC
svn commit: r1512817 - in /hbase/trunk:
hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/
hbase-it/src/test/java/org/apache/hadoop/hbase/
hbase-server/src/test/java/org/apache/hadoop/hbase/util/
Author: jxiang
Date: Sat Aug 10 18:53:46 2013
New Revision: 1512817
URL: http://svn.apache.org/r1512817
Log:
HBASE-9155 Enhance LoadTestTool to support updates
Added:
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java (with props)
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java (with props)
Modified:
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java
hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java
hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java?rev=1512817&r1=1512816&r2=1512817&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java Sat Aug 10 18:53:46 2013
@@ -25,6 +25,14 @@ import java.util.Set;
public abstract class LoadTestDataGenerator {
protected final LoadTestKVGenerator kvGenerator;
+ // The mutate info column stores information
+ // about update done to this column family this row.
+ public final static byte[] MUTATE_INFO = "mutate_info".getBytes();
+
+ // The increment column always has a long value,
+ // which can be incremented later on during updates.
+ public final static byte[] INCREMENT = "increment".getBytes();
+
/**
* Initializes the object.
* @param minValueSize minimum size of the value generated by
Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java?rev=1512817&r1=1512816&r2=1512817&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java Sat Aug 10 18:53:46 2013
@@ -115,6 +115,19 @@ public abstract class IngestIntegrationT
ret = loadTool.run(new String[] {
"-tn", tableName,
+ "-update", String.format("60:%d", writeThreads),
+ "-start_key", String.valueOf(startKey),
+ "-num_keys", String.valueOf(numKeys),
+ "-skip_init"
+ });
+ if (0 != ret) {
+ String errorMsg = "Update failed with error code " + ret;
+ LOG.error(errorMsg);
+ Assert.fail(errorMsg);
+ }
+
+ ret = loadTool.run(new String[] {
+ "-tn", tableName,
"-read", "100:20",
"-start_key", String.valueOf(startKey),
"-num_keys", String.valueOf(numKeys),
Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java?rev=1512817&r1=1512816&r2=1512817&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java Sat Aug 10 18:53:46 2013
@@ -24,8 +24,6 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
-import junit.framework.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -44,6 +42,7 @@ import org.apache.hadoop.hbase.util.Regi
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java?rev=1512817&r1=1512816&r2=1512817&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java Sat Aug 10 18:53:46 2013
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
@@ -70,10 +69,14 @@ public class LoadTestTool extends Abstra
"<avg_cols_per_key>:<avg_data_size>" +
"[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
- /** Usa\ge string for the read option */
+ /** Usage string for the read option */
protected static final String OPT_USAGE_READ =
"<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
+ /** Usage string for the update option */
+ protected static final String OPT_USAGE_UPDATE =
+ "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
+
protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
Arrays.toString(BloomType.values());
@@ -111,6 +114,8 @@ public class LoadTestTool extends Abstra
protected static final String OPT_SKIP_INIT = "skip_init";
protected static final String OPT_INIT_ONLY = "init_only";
private static final String NUM_TABLES = "num_tables";
+ protected static final String OPT_BATCHUPDATE = "batchupdate";
+ protected static final String OPT_UPDATE = "update";
protected static final long DEFAULT_START_KEY = 0;
@@ -119,10 +124,11 @@ public class LoadTestTool extends Abstra
protected MultiThreadedWriter writerThreads = null;
protected MultiThreadedReader readerThreads = null;
+ protected MultiThreadedUpdater updaterThreads = null;
protected long startKey, endKey;
- protected boolean isWrite, isRead;
+ protected boolean isWrite, isRead, isUpdate;
// Column family options
protected DataBlockEncoding dataBlockEncodingAlgo;
@@ -136,6 +142,11 @@ public class LoadTestTool extends Abstra
protected int minColDataSize, maxColDataSize;
protected boolean isMultiPut;
+ // Updater options
+ protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
+ protected int updatePercent;
+ protected boolean isBatchUpdate;
+
// Reader options
private int numReaderThreads = DEFAULT_NUM_THREADS;
private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
@@ -212,6 +223,7 @@ public class LoadTestTool extends Abstra
addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
addOptWithArg(OPT_READ, OPT_USAGE_READ);
+ addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
@@ -225,6 +237,8 @@ public class LoadTestTool extends Abstra
addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
"separate puts for every column in a row");
+ addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " +
+ "separate updates for every column in a row");
addOptNoArg(OPT_ENCODE_IN_CACHE_ONLY, OPT_ENCODE_IN_CACHE_ONLY_USAGE);
addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
@@ -250,16 +264,17 @@ public class LoadTestTool extends Abstra
isWrite = cmd.hasOption(OPT_WRITE);
isRead = cmd.hasOption(OPT_READ);
+ isUpdate = cmd.hasOption(OPT_UPDATE);
isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
- if (!isWrite && !isRead && !isInitOnly) {
+ if (!isWrite && !isRead && !isUpdate && !isInitOnly) {
throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " +
- "-" + OPT_READ + " has to be specified");
+ "-" + OPT_UPDATE + "-" + OPT_READ + " has to be specified");
}
- if (isInitOnly && (isRead || isWrite)) {
+ if (isInitOnly && (isRead || isWrite || isUpdate)) {
throw new IllegalArgumentException(OPT_INIT_ONLY + " cannot be specified with"
- + " either -" + OPT_WRITE + " or -" + OPT_READ);
+ + " either -" + OPT_WRITE + " or -" + OPT_UPDATE + " or -" + OPT_READ);
}
if (!isInitOnly) {
@@ -303,6 +318,21 @@ public class LoadTestTool extends Abstra
+ maxColDataSize);
}
+ if (isUpdate) {
+ String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 2);
+ int colIndex = 0;
+ updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
+ if (colIndex < mutateOpts.length) {
+ numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
+ }
+
+ isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);
+
+ System.out.println("Batch updates: " + isBatchUpdate);
+ System.out.println("Percent of keys to update: " + updatePercent);
+ System.out.println("Updater threads: " + numUpdaterThreads);
+ }
+
if (isRead) {
String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
int colIndex = 0;
@@ -390,16 +420,27 @@ public class LoadTestTool extends Abstra
writerThreads.setMultiPut(isMultiPut);
}
+ if (isUpdate) {
+ updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent);
+ updaterThreads.setBatchUpdate(isBatchUpdate);
+ }
+
if (isRead) {
readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
readerThreads.setMaxErrors(maxReadErrors);
readerThreads.setKeyWindow(keyWindow);
}
- if (isRead && isWrite) {
- LOG.info("Concurrent read/write workload: making readers aware of the " +
- "write point");
- readerThreads.linkToWriter(writerThreads);
+ if (isUpdate && isWrite) {
+ LOG.info("Concurrent write/update workload: making updaters aware of the " +
+ "write point");
+ updaterThreads.linkToWriter(writerThreads);
+ }
+
+ if (isRead && (isUpdate || isWrite)) {
+ LOG.info("Concurrent write/read workload: making readers aware of the " +
+ "write point");
+ readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
}
if (isWrite) {
@@ -407,6 +448,11 @@ public class LoadTestTool extends Abstra
writerThreads.start(startKey, endKey, numWriterThreads);
}
+ if (isUpdate) {
+ System.out.println("Starting to mutate data...");
+ updaterThreads.start(startKey, endKey, numUpdaterThreads);
+ }
+
if (isRead) {
System.out.println("Starting to read data...");
readerThreads.start(startKey, endKey, numReaderThreads);
@@ -416,6 +462,10 @@ public class LoadTestTool extends Abstra
writerThreads.waitForFinish();
}
+ if (isUpdate) {
+ updaterThreads.waitForFinish();
+ }
+
if (isRead) {
readerThreads.waitForFinish();
}
@@ -424,13 +474,16 @@ public class LoadTestTool extends Abstra
if (isWrite) {
success = success && writerThreads.getNumWriteFailures() == 0;
}
+ if (isUpdate) {
+ success = success && updaterThreads.getNumWriteFailures() == 0;
+ }
if (isRead) {
success = success && readerThreads.getNumReadErrors() == 0
&& readerThreads.getNumReadFailures() == 0;
}
- return success ? EXIT_SUCCESS : this.EXIT_FAILURE;
+ return success ? EXIT_SUCCESS : EXIT_FAILURE;
}
-
+
public static void main(String[] args) {
new LoadTestTool().doStaticMain(args);
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java?rev=1512817&r1=1512816&r2=1512817&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java Sat Aug 10 18:53:46 2013
@@ -16,8 +16,13 @@
*/
package org.apache.hadoop.hbase.util;
+import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
+import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
+
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -27,12 +32,18 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.base.Preconditions;
+
/**
* Common base class for reader and writer parts of multi-thread HBase load
* test ({@link LoadTestTool}).
@@ -300,7 +311,7 @@ public abstract class MultiThreadedActio
// See if we have any data at all.
if (result.isEmpty()) {
- LOG.error("No data returned for key = [" + rowKeyStr + "]");
+ LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned");
return false;
}
@@ -311,7 +322,8 @@ public abstract class MultiThreadedActio
// See if we have all the CFs.
byte[][] expectedCfs = dataGenerator.getColumnFamilies();
if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
- LOG.error("Bad family count for [" + rowKeyStr + "]: " + result.getMap().size());
+ LOG.error("Error checking data for key [" + rowKeyStr
+ + "], bad family count: " + result.getMap().size());
return false;
}
@@ -320,34 +332,155 @@ public abstract class MultiThreadedActio
String cfStr = Bytes.toString(cf);
Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
if (columnValues == null) {
- LOG.error("No data for family [" + cfStr + "] for [" + rowKeyStr + "]");
+ LOG.error("Error checking data for key [" + rowKeyStr
+ + "], no data for family [" + cfStr + "]]");
return false;
}
- // See if we have correct columns.
- if (verifyCfAndColumnIntegrity
- && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
- String colsStr = "";
- for (byte[] col : columnValues.keySet()) {
- if (colsStr.length() > 0) {
- colsStr += ", ";
+
+ Map<String, MutationType> mutateInfo = null;
+ if (verifyCfAndColumnIntegrity || verifyValues) {
+ if (!columnValues.containsKey(MUTATE_INFO)) {
+ LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ + cfStr + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found");
+ return false;
+ }
+
+ long cfHash = Arrays.hashCode(cf);
+ // Verify deleted columns, and make up column counts if deleted
+ byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO);
+ mutateInfo = parseMutateInfo(mutateInfoValue);
+ for (Map.Entry<String, MutationType> mutate: mutateInfo.entrySet()) {
+ if (mutate.getValue() == MutationType.DELETE) {
+ byte[] column = Bytes.toBytes(mutate.getKey());
+ long columnHash = Arrays.hashCode(column);
+ long hashCode = cfHash + columnHash;
+ if (hashCode % 2 == 0) {
+ if (columnValues.containsKey(column)) {
+ LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ + cfStr + "], column [" + mutate.getKey() + "]; should be deleted");
+ return false;
+ }
+ byte[] hashCodeBytes = Bytes.toBytes(hashCode);
+ columnValues.put(column, hashCodeBytes);
+ }
}
- colsStr += "[" + Bytes.toString(col) + "]";
}
- LOG.error("Bad columns for family [" + cfStr + "] for [" + rowKeyStr + "]: " + colsStr);
- return false;
- }
- // See if values check out.
- if (verifyValues) {
- for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
- if (!dataGenerator.verify(result.getRow(), cf, kv.getKey(), kv.getValue())) {
+
+ // Verify increment
+ if (!columnValues.containsKey(INCREMENT)) {
+ LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ + cfStr + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found");
+ return false;
+ }
+ long currentValue = Bytes.toLong(columnValues.remove(INCREMENT));
+ if (verifyValues) {
+ long amount = mutateInfo.isEmpty() ? 0 : cfHash;
+ long originalValue = Arrays.hashCode(result.getRow());
+ long extra = currentValue - originalValue;
+ if (extra != 0 && (amount == 0 || extra % amount != 0)) {
LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
- + cfStr + "], column [" + Bytes.toString(kv.getKey()) + "]; value of length " +
- + kv.getValue().length);
+ + cfStr + "], column [increment], extra [" + extra + "], amount [" + amount + "]");
return false;
}
+ if (amount != 0 && extra != amount) {
+ LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
+ + cfStr + "], column [increment], incremented [" + (extra / amount) + "] times");
+ }
+ }
+
+ // See if we have correct columns.
+ if (verifyCfAndColumnIntegrity
+ && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
+ String colsStr = "";
+ for (byte[] col : columnValues.keySet()) {
+ if (colsStr.length() > 0) {
+ colsStr += ", ";
+ }
+ colsStr += "[" + Bytes.toString(col) + "]";
+ }
+ LOG.error("Error checking data for key [" + rowKeyStr
+ + "], bad columns for family [" + cfStr + "]: " + colsStr);
+ return false;
+ }
+ // See if values check out.
+ if (verifyValues) {
+ for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
+ String column = Bytes.toString(kv.getKey());
+ MutationType mutation = mutateInfo.get(column);
+ boolean verificationNeeded = true;
+ byte[] bytes = kv.getValue();
+ if (mutation != null) {
+ boolean mutationVerified = true;
+ long columnHash = Arrays.hashCode(kv.getKey());
+ long hashCode = cfHash + columnHash;
+ byte[] hashCodeBytes = Bytes.toBytes(hashCode);
+ if (mutation == MutationType.APPEND) {
+ int offset = bytes.length - hashCodeBytes.length;
+ mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes,
+ 0, hashCodeBytes.length, bytes, offset, hashCodeBytes.length);
+ if (mutationVerified) {
+ int n = 1;
+ while (true) {
+ int newOffset = offset - hashCodeBytes.length;
+ if (newOffset < 0 || !Bytes.equals(hashCodeBytes, 0,
+ hashCodeBytes.length, bytes, newOffset, hashCodeBytes.length)) {
+ break;
+ }
+ offset = newOffset;
+ n++;
+ }
+ if (n > 1) {
+ LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
+ + cfStr + "], column [" + column + "], appended [" + n + "] times");
+ }
+ byte[] dest = new byte[offset];
+ System.arraycopy(bytes, 0, dest, 0, offset);
+ bytes = dest;
+ }
+ } else if (hashCode % 2 == 0) { // checkAndPut
+ mutationVerified = Bytes.equals(bytes, hashCodeBytes);
+ verificationNeeded = false;
+ }
+ if (!mutationVerified) {
+ LOG.error("Error checking data for key [" + rowKeyStr
+ + "], mutation checking failed for column family [" + cfStr + "], column ["
+ + column + "]; mutation [" + mutation + "], hashCode ["
+ + hashCode + "], verificationNeeded ["
+ + verificationNeeded + "]");
+ return false;
+ }
+ } // end of mutation checking
+ if (verificationNeeded &&
+ !dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)) {
+ LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ + cfStr + "], column [" + column + "], mutation [" + mutation
+ + "]; value of length " + bytes.length);
+ return false;
+ }
+ }
}
}
}
return true;
}
+
+ // Parse mutate info into a map of <column name> => <update action>
+ private Map<String, MutationType> parseMutateInfo(byte[] mutateInfo) {
+ Map<String, MutationType> mi = new HashMap<String, MutationType>();
+ if (mutateInfo != null) {
+ String mutateInfoStr = Bytes.toString(mutateInfo);
+ String[] mutations = mutateInfoStr.split("#");
+ for (String mutation: mutations) {
+ if (mutation.isEmpty()) continue;
+ Preconditions.checkArgument(mutation.contains(":"),
+ "Invalid mutation info " + mutation);
+ int p = mutation.indexOf(":");
+ String column = mutation.substring(0, p);
+ MutationType type = MutationType.valueOf(
+ Integer.parseInt(mutation.substring(p+1)));
+ mi.put(column, type);
+ }
+ }
+ return mi;
+ }
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java?rev=1512817&r1=1512816&r2=1512817&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java Sat Aug 10 18:53:46 2013
@@ -18,15 +18,15 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.HashSet;
-import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
@@ -41,7 +41,7 @@ public class MultiThreadedReader extends
private final double verifyPercent;
private volatile boolean aborted;
- private MultiThreadedWriter writer = null;
+ private MultiThreadedWriterBase writer = null;
/**
* The number of keys verified in a sequence. This will never be larger than
@@ -77,9 +77,9 @@ public class MultiThreadedReader extends
this.verifyPercent = verifyPercent;
}
- public void linkToWriter(MultiThreadedWriter writer) {
+ public void linkToWriter(MultiThreadedWriterBase writer) {
this.writer = writer;
- writer.setTrackInsertedKeys(true);
+ writer.setTrackWroteKeys(true);
}
public void setMaxErrors(int maxErrors) {
@@ -108,7 +108,6 @@ public class MultiThreadedReader extends
public class HBaseReaderThread extends Thread {
private final int readerId;
private final HTable table;
- private final Random random = new Random();
/** The "current" key being read. Increases from startKey to endKey. */
private long curKey;
@@ -182,13 +181,13 @@ public class MultiThreadedReader extends
* constraint.
*/
private long maxKeyWeCanRead() {
- long insertedUpToKey = writer.insertedUpToKey();
+ long insertedUpToKey = writer.wroteUpToKey();
if (insertedUpToKey >= endKey - 1) {
// The writer has finished writing our range, so we can read any
// key in the range.
return endKey - 1;
}
- return Math.min(endKey - 1, writer.insertedUpToKey() - keyWindow);
+ return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
}
private long getNextKeyToRead() {
@@ -217,7 +216,7 @@ public class MultiThreadedReader extends
// later. Set a flag to make sure that we don't count this key towards
// the set of unique keys we have verified.
readingRandomKey = true;
- return startKey + Math.abs(random.nextLong())
+ return startKey + Math.abs(RandomUtils.nextLong())
% (maxKeyToRead - startKey + 1);
}
@@ -239,7 +238,7 @@ public class MultiThreadedReader extends
if (verbose) {
LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
}
- queryKey(get, random.nextInt(100) < verifyPercent);
+ queryKey(get, RandomUtils.nextInt(100) < verifyPercent);
} catch (IOException e) {
numReadFailures.addAndGet(1);
LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
@@ -279,7 +278,7 @@ public class MultiThreadedReader extends
numCols.addAndGet(cols);
} else {
if (writer != null) {
- LOG.error("At the time of failure, writer inserted " + writer.numKeys.get() + " keys");
+ LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
}
numErrorsAfterThis = numReadErrors.incrementAndGet();
}
@@ -315,5 +314,4 @@ public class MultiThreadedReader extends
appendToStatus(sb, "READ ERRORS", numReadErrors.get());
return sb.toString();
}
-
}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java?rev=1512817&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java Sat Aug 10 18:53:46 2013
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
+import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Preconditions;
+
+/** Creates multiple threads that write key/values into the */
+public class MultiThreadedUpdater extends MultiThreadedWriterBase {
+ private static final Log LOG = LogFactory.getLog(MultiThreadedUpdater.class);
+
+ private Set<HBaseUpdaterThread> updaters = new HashSet<HBaseUpdaterThread>();
+
+ private MultiThreadedWriterBase writer = null;
+ private boolean isBatchUpdate = false;
+ private final double updatePercent;
+
+ public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
+ TableName tableName, double updatePercent) {
+ super(dataGen, conf, tableName, "U");
+ this.updatePercent = updatePercent;
+ }
+
+ /** Use batch vs. separate updates for every column in a row */
+ public void setBatchUpdate(boolean isBatchUpdate) {
+ this.isBatchUpdate = isBatchUpdate;
+ }
+
+ public void linkToWriter(MultiThreadedWriterBase writer) {
+ this.writer = writer;
+ writer.setTrackWroteKeys(true);
+ }
+
+ @Override
+ public void start(long startKey, long endKey, int numThreads)
+ throws IOException {
+ super.start(startKey, endKey, numThreads);
+
+ if (verbose) {
+ LOG.debug("Updating keys [" + startKey + ", " + endKey + ")");
+ }
+
+ for (int i = 0; i < numThreads; ++i) {
+ HBaseUpdaterThread updater = new HBaseUpdaterThread(i);
+ updaters.add(updater);
+ }
+
+ startThreads(updaters);
+ }
+
+ private long getNextKeyToUpdate() {
+ if (writer == null) {
+ return nextKeyToWrite.getAndIncrement();
+ }
+ synchronized (this) {
+ if (nextKeyToWrite.get() >= endKey) {
+ // Finished the whole key range
+ return endKey;
+ }
+ while (nextKeyToWrite.get() > writer.wroteUpToKey()) {
+ Threads.sleepWithoutInterrupt(100);
+ }
+ long k = nextKeyToWrite.getAndIncrement();
+ if (writer.failedToWriteKey(k)) {
+ failedKeySet.add(k);
+ return getNextKeyToUpdate();
+ }
+ return k;
+ }
+ }
+
+ private class HBaseUpdaterThread extends Thread {
+ private final HTable table;
+
+ public HBaseUpdaterThread(int updaterId) throws IOException {
+ setName(getClass().getSimpleName() + "_" + updaterId);
+ table = new HTable(conf, tableName);
+ }
+
+ public void run() {
+ try {
+ long rowKeyBase;
+ StringBuilder buf = new StringBuilder();
+ byte[][] columnFamilies = dataGenerator.getColumnFamilies();
+ while ((rowKeyBase = getNextKeyToUpdate()) < endKey) {
+ if (RandomUtils.nextInt(100) < updatePercent) {
+ byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
+ Increment inc = new Increment(rowKey);
+ Append app = new Append(rowKey);
+ numKeys.addAndGet(1);
+ int columnCount = 0;
+ for (byte[] cf : columnFamilies) {
+ long cfHash = Arrays.hashCode(cf);
+ inc.addColumn(cf, INCREMENT, cfHash);
+ buf.setLength(0); // Clear the buffer
+ buf.append("#").append(Bytes.toString(INCREMENT));
+ buf.append(":").append(MutationType.INCREMENT.getNumber());
+ app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
+ ++columnCount;
+ if (!isBatchUpdate) {
+ mutate(table, inc, rowKeyBase);
+ numCols.addAndGet(1);
+ inc = new Increment(rowKey);
+ mutate(table, app, rowKeyBase);
+ numCols.addAndGet(1);
+ app = new Append(rowKey);
+ }
+ Result result = null;
+ try {
+ Get get = new Get(rowKey);
+ get.addFamily(cf);
+ result = table.get(get);
+ } catch (IOException ie) {
+ LOG.warn("Failed to get the row for key = ["
+ + rowKey + "], column family = [" + Bytes.toString(cf) + "]", ie);
+ }
+ Map<byte[], byte[]> columnValues =
+ result != null ? result.getFamilyMap(cf) : null;
+ if (columnValues == null) {
+ failedKeySet.add(rowKeyBase);
+ LOG.error("Failed to update the row with key = ["
+ + rowKey + "], since we could not get the original row");
+ }
+ for (byte[] column : columnValues.keySet()) {
+ if (Bytes.equals(column, INCREMENT)
+ || Bytes.equals(column, MUTATE_INFO)) {
+ continue;
+ }
+ MutationType mt = MutationType.valueOf(
+ RandomUtils.nextInt(MutationType.values().length));
+ long columnHash = Arrays.hashCode(column);
+ long hashCode = cfHash + columnHash;
+ byte[] hashCodeBytes = Bytes.toBytes(hashCode);
+ byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY;
+ if (hashCode % 2 == 0) {
+ KeyValue kv = result.getColumnLatest(cf, column);
+ checkedValue = kv != null ? kv.getValue() : null;
+ Preconditions.checkNotNull(checkedValue,
+ "Column value to be checked should not be null");
+ }
+ buf.setLength(0); // Clear the buffer
+ buf.append("#").append(Bytes.toString(column)).append(":");
+ ++columnCount;
+ switch (mt) {
+ case PUT:
+ Put put = new Put(rowKey);
+ put.add(cf, column, hashCodeBytes);
+ mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue);
+ buf.append(MutationType.PUT.getNumber());
+ break;
+ case DELETE:
+ Delete delete = new Delete(rowKey);
+ // Delete all versions since a put
+ // could be called multiple times if CM is used
+ delete.deleteColumns(cf, column);
+ mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue);
+ buf.append(MutationType.DELETE.getNumber());
+ break;
+ default:
+ buf.append(MutationType.APPEND.getNumber());
+ app.add(cf, column, hashCodeBytes);
+ }
+ app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
+ if (!isBatchUpdate) {
+ mutate(table, app, rowKeyBase);
+ numCols.addAndGet(1);
+ app = new Append(rowKey);
+ }
+ }
+ }
+ if (isBatchUpdate) {
+ if (verbose) {
+ LOG.debug("Preparing increment and append for key = ["
+ + rowKey + "], " + columnCount + " columns");
+ }
+ mutate(table, inc, rowKeyBase);
+ mutate(table, app, rowKeyBase);
+ numCols.addAndGet(columnCount);
+ }
+ }
+ if (trackWroteKeys) {
+ wroteKeys.add(rowKeyBase);
+ }
+ }
+ } finally {
+ try {
+ table.close();
+ } catch (IOException e) {
+ LOG.error("Error closing table", e);
+ }
+ numThreadsWorking.decrementAndGet();
+ }
+ }
+ }
+
+ @Override
+ public void waitForFinish() {
+ super.waitForFinish();
+ System.out.println("Failed to update keys: " + failedKeySet.size());
+ for (Long key : failedKeySet) {
+ System.out.println("Failed to update key: " + key);
+ }
+ }
+
+ public void mutate(HTable table, Mutation m, long keyBase) {
+ mutate(table, m, keyBase, null, null, null, null);
+ }
+
+ public void mutate(HTable table, Mutation m,
+ long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
+ long start = System.currentTimeMillis();
+ try {
+ if (m instanceof Increment) {
+ table.increment((Increment)m);
+ } else if (m instanceof Append) {
+ table.append((Append)m);
+ } else if (m instanceof Put) {
+ table.checkAndPut(row, cf, q, v, (Put)m);
+ } else if (m instanceof Delete) {
+ table.checkAndDelete(row, cf, q, v, (Delete)m);
+ } else {
+ throw new IllegalArgumentException(
+ "unsupported mutation " + m.getClass().getSimpleName());
+ }
+ totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
+ } catch (IOException e) {
+ failedKeySet.add(keyBase);
+ String exceptionInfo;
+ if (e instanceof RetriesExhaustedWithDetailsException) {
+ RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
+ exceptionInfo = aggEx.getExhaustiveDescription();
+ } else {
+ StringWriter stackWriter = new StringWriter();
+ PrintWriter pw = new PrintWriter(stackWriter);
+ e.printStackTrace(pw);
+ pw.flush();
+ exceptionInfo = StringUtils.stringifyException(e);
+ }
+ LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) +
+ "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
+ + exceptionInfo);
+ }
+ }
+}
Propchange: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java?rev=1512817&r1=1512816&r2=1512817&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java Sat Aug 10 18:53:46 2013
@@ -18,72 +18,33 @@
package org.apache.hadoop.hbase.util;
+import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
+import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
+
import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
+import java.util.Arrays;
import java.util.HashSet;
-import java.util.PriorityQueue;
-import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
/** Creates multiple threads that write key/values into the */
-public class MultiThreadedWriter extends MultiThreadedAction {
+public class MultiThreadedWriter extends MultiThreadedWriterBase {
private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class);
private Set<HBaseWriterThread> writers = new HashSet<HBaseWriterThread>();
private boolean isMultiPut = false;
- /**
- * A temporary place to keep track of inserted keys. This is written to by
- * all writers and is drained on a separate thread that populates
- * {@link #insertedUpToKey}, the maximum key in the contiguous range of keys
- * being inserted. This queue is supposed to stay small.
- */
- private BlockingQueue<Long> insertedKeys = new ArrayBlockingQueue<Long>(10000);
-
- /**
- * This is the current key to be inserted by any thread. Each thread does an
- * atomic get and increment operation and inserts the current value.
- */
- private AtomicLong nextKeyToInsert = new AtomicLong();
-
- /**
- * The highest key in the contiguous range of keys .
- */
- private AtomicLong insertedUpToKey = new AtomicLong();
-
- /** The sorted set of keys NOT inserted by the writers */
- private Set<Long> failedKeySet = new ConcurrentSkipListSet<Long>();
-
- /**
- * The total size of the temporary inserted key set that have not yet lined
- * up in a our contiguous sequence starting from startKey. Supposed to stay
- * small.
- */
- private AtomicLong insertedKeyQueueSize = new AtomicLong();
-
- /** Enable this if used in conjunction with a concurrent reader. */
- private boolean trackInsertedKeys;
-
public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
- TableName tableName) {
+ TableName tableName) {
super(dataGen, conf, tableName, "W");
}
@@ -101,19 +62,11 @@ public class MultiThreadedWriter extends
LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")");
}
- nextKeyToInsert.set(startKey);
- insertedUpToKey.set(startKey - 1);
-
for (int i = 0; i < numThreads; ++i) {
HBaseWriterThread writer = new HBaseWriterThread(i);
writers.add(writer);
}
- if (trackInsertedKeys) {
- new Thread(new InsertedKeysTracker()).start();
- numThreadsWorking.incrementAndGet();
- }
-
startThreads(writers);
}
@@ -129,13 +82,12 @@ public class MultiThreadedWriter extends
try {
long rowKeyBase;
byte[][] columnFamilies = dataGenerator.getColumnFamilies();
- while ((rowKeyBase = nextKeyToInsert.getAndIncrement()) < endKey) {
+ while ((rowKeyBase = nextKeyToWrite.getAndIncrement()) < endKey) {
byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
Put put = new Put(rowKey);
numKeys.addAndGet(1);
int columnCount = 0;
for (byte[] cf : columnFamilies) {
- String s;
byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf);
for (byte[] column : columns) {
byte[] value = dataGenerator.generateValue(rowKey, cf, column);
@@ -147,6 +99,14 @@ public class MultiThreadedWriter extends
put = new Put(rowKey);
}
}
+ long rowKeyHash = Arrays.hashCode(rowKey);
+ put.add(cf, MUTATE_INFO, HConstants.EMPTY_BYTE_ARRAY);
+ put.add(cf, INCREMENT, Bytes.toBytes(rowKeyHash));
+ if (!isMultiPut) {
+ insert(table, put, rowKeyBase);
+ numCols.addAndGet(1);
+ put = new Put(rowKey);
+ }
}
if (isMultiPut) {
if (verbose) {
@@ -155,8 +115,8 @@ public class MultiThreadedWriter extends
insert(table, put, rowKeyBase);
numCols.addAndGet(columnCount);
}
- if (trackInsertedKeys) {
- insertedKeys.add(rowKeyBase);
+ if (trackWroteKeys) {
+ wroteKeys.add(rowKeyBase);
}
}
} finally {
@@ -170,104 +130,6 @@ public class MultiThreadedWriter extends
}
}
- public void insert(HTable table, Put put, long keyBase) {
- long start = System.currentTimeMillis();
- try {
- table.put(put);
- totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
- } catch (IOException e) {
- failedKeySet.add(keyBase);
- String exceptionInfo;
- if (e instanceof RetriesExhaustedWithDetailsException) {
- RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
- exceptionInfo = aggEx.getExhaustiveDescription();
- } else {
- StringWriter stackWriter = new StringWriter();
- PrintWriter pw = new PrintWriter(stackWriter);
- e.printStackTrace(pw);
- pw.flush();
- exceptionInfo = StringUtils.stringifyException(e);
- }
- LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) +
- "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: "
- + exceptionInfo);
- }
- }
-
- private String getRegionDebugInfoSafe(HTable table, byte[] rowKey) {
- HRegionLocation cached = null, real = null;
- try {
- cached = table.getRegionLocation(rowKey, false);
- real = table.getRegionLocation(rowKey, true);
- } catch (Throwable t) {
- // Cannot obtain region information for another catch block - too bad!
- }
- String result = "no information can be obtained";
- if (cached != null) {
- result = "cached: " + cached.toString();
- }
- if (real != null) {
- if (real.equals(cached)) {
- result += "; cache is up to date";
- } else {
- result = (cached != null) ? (result + "; ") : "";
- result += "real: " + real.toString();
- }
- }
- return result;
- }
-
- /**
- * A thread that keeps track of the highest key in the contiguous range of
- * inserted keys.
- */
- private class InsertedKeysTracker implements Runnable {
-
- @Override
- public void run() {
- Thread.currentThread().setName(getClass().getSimpleName());
- try {
- long expectedKey = startKey;
- Queue<Long> sortedKeys = new PriorityQueue<Long>();
- while (expectedKey < endKey) {
- // Block until a new element is available.
- Long k;
- try {
- k = insertedKeys.poll(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOG.info("Inserted key tracker thread interrupted", e);
- break;
- }
- if (k == null) {
- continue;
- }
- if (k == expectedKey) {
- // Skip the "sorted key" queue and consume this key.
- insertedUpToKey.set(k);
- ++expectedKey;
- } else {
- sortedKeys.add(k);
- }
-
- // See if we have a sequence of contiguous keys lined up.
- while (!sortedKeys.isEmpty()
- && ((k = sortedKeys.peek()) == expectedKey)) {
- sortedKeys.poll();
- insertedUpToKey.set(k);
- ++expectedKey;
- }
-
- insertedKeyQueueSize.set(insertedKeys.size() + sortedKeys.size());
- }
- } catch (Exception ex) {
- LOG.error("Error in inserted key tracker", ex);
- } finally {
- numThreadsWorking.decrementAndGet();
- }
- }
-
- }
-
@Override
public void waitForFinish() {
super.waitForFinish();
@@ -276,37 +138,4 @@ public class MultiThreadedWriter extends
System.out.println("Failed to write key: " + key);
}
}
-
- public int getNumWriteFailures() {
- return failedKeySet.size();
- }
-
- /**
- * The max key until which all keys have been inserted (successfully or not).
- * @return the last key that we have inserted all keys up to (inclusive)
- */
- public long insertedUpToKey() {
- return insertedUpToKey.get();
- }
-
- public boolean failedToWriteKey(long k) {
- return failedKeySet.contains(k);
- }
-
- @Override
- protected String progressInfo() {
- StringBuilder sb = new StringBuilder();
- appendToStatus(sb, "insertedUpTo", insertedUpToKey.get());
- appendToStatus(sb, "insertedQSize", insertedKeyQueueSize.get());
- return sb.toString();
- }
-
- /**
- * Used for a joint write/read workload. Enables tracking the last inserted
- * key, which requires a blocking queue and a consumer thread.
- * @param enable whether to enable tracking the last inserted key
- */
- public void setTrackInsertedKeys(boolean enable) {
- trackInsertedKeys = enable;
- }
}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java?rev=1512817&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java Sat Aug 10 18:53:46 2013
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
+import org.apache.hadoop.util.StringUtils;
+
+/** Creates multiple threads that write key/values into the */
+public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
+ private static final Log LOG = LogFactory.getLog(MultiThreadedWriterBase.class);
+
+ /**
+ * A temporary place to keep track of inserted/updated keys. This is written to by
+ * all writers and is drained on a separate thread that populates
+ * {@link #wroteUpToKey}, the maximum key in the contiguous range of keys
+ * being inserted/updated. This queue is supposed to stay small.
+ */
+ protected BlockingQueue<Long> wroteKeys = new ArrayBlockingQueue<Long>(10000);
+
+ /**
+ * This is the current key to be inserted/updated by any thread. Each thread does an
+ * atomic get and increment operation and inserts the current value.
+ */
+ protected AtomicLong nextKeyToWrite = new AtomicLong();
+
+ /**
+ * The highest key in the contiguous range of keys .
+ */
+ protected AtomicLong wroteUpToKey = new AtomicLong();
+
+ /** The sorted set of keys NOT inserted/updated by the writers */
+ protected Set<Long> failedKeySet = new ConcurrentSkipListSet<Long>();
+
+ /**
+ * The total size of the temporary inserted/updated key set that have not yet lined
+ * up in a our contiguous sequence starting from startKey. Supposed to stay
+ * small.
+ */
+ protected AtomicLong wroteKeyQueueSize = new AtomicLong();
+
+ /** Enable this if used in conjunction with a concurrent reader. */
+ protected boolean trackWroteKeys;
+
+ public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
+ TableName tableName, String actionLetter) {
+ super(dataGen, conf, tableName, actionLetter);
+ }
+
+ @Override
+ public void start(long startKey, long endKey, int numThreads)
+ throws IOException {
+ super.start(startKey, endKey, numThreads);
+
+ nextKeyToWrite.set(startKey);
+ wroteUpToKey.set(startKey - 1);
+
+ if (trackWroteKeys) {
+ new Thread(new WroteKeysTracker()).start();
+ numThreadsWorking.incrementAndGet();
+ }
+ }
+
+ protected String getRegionDebugInfoSafe(HTable table, byte[] rowKey) {
+ HRegionLocation cached = null, real = null;
+ try {
+ cached = table.getRegionLocation(rowKey, false);
+ real = table.getRegionLocation(rowKey, true);
+ } catch (Throwable t) {
+ // Cannot obtain region information for another catch block - too bad!
+ }
+ String result = "no information can be obtained";
+ if (cached != null) {
+ result = "cached: " + cached.toString();
+ }
+ if (real != null) {
+ if (real.equals(cached)) {
+ result += "; cache is up to date";
+ } else {
+ result = (cached != null) ? (result + "; ") : "";
+ result += "real: " + real.toString();
+ }
+ }
+ return result;
+ }
+
+ /**
+ * A thread that keeps track of the highest key in the contiguous range of
+ * inserted/updated keys.
+ */
+ private class WroteKeysTracker implements Runnable {
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName(getClass().getSimpleName());
+ try {
+ long expectedKey = startKey;
+ Queue<Long> sortedKeys = new PriorityQueue<Long>();
+ while (expectedKey < endKey) {
+ // Block until a new element is available.
+ Long k;
+ try {
+ k = wroteKeys.poll(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.info("Inserted key tracker thread interrupted", e);
+ break;
+ }
+ if (k == null) {
+ continue;
+ }
+ if (k == expectedKey) {
+ // Skip the "sorted key" queue and consume this key.
+ wroteUpToKey.set(k);
+ ++expectedKey;
+ } else {
+ sortedKeys.add(k);
+ }
+
+ // See if we have a sequence of contiguous keys lined up.
+ while (!sortedKeys.isEmpty()
+ && ((k = sortedKeys.peek()) == expectedKey)) {
+ sortedKeys.poll();
+ wroteUpToKey.set(k);
+ ++expectedKey;
+ }
+
+ wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size());
+ }
+ } catch (Exception ex) {
+ LOG.error("Error in inserted/updaed key tracker", ex);
+ } finally {
+ numThreadsWorking.decrementAndGet();
+ }
+ }
+ }
+
+ public int getNumWriteFailures() {
+ return failedKeySet.size();
+ }
+
+ public void insert(HTable table, Put put, long keyBase) {
+ long start = System.currentTimeMillis();
+ try {
+ table.put(put);
+ totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
+ } catch (IOException e) {
+ failedKeySet.add(keyBase);
+ String exceptionInfo;
+ if (e instanceof RetriesExhaustedWithDetailsException) {
+ RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
+ exceptionInfo = aggEx.getExhaustiveDescription();
+ } else {
+ StringWriter stackWriter = new StringWriter();
+ PrintWriter pw = new PrintWriter(stackWriter);
+ e.printStackTrace(pw);
+ pw.flush();
+ exceptionInfo = StringUtils.stringifyException(e);
+ }
+ LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) +
+ "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: "
+ + exceptionInfo);
+ }
+ }
+
+ /**
+ * The max key until which all keys have been inserted/updated (successfully or not).
+ * @return the last key that we have inserted/updated all keys up to (inclusive)
+ */
+ public long wroteUpToKey() {
+ return wroteUpToKey.get();
+ }
+
+ public boolean failedToWriteKey(long k) {
+ return failedKeySet.contains(k);
+ }
+
+ @Override
+ protected String progressInfo() {
+ StringBuilder sb = new StringBuilder();
+ appendToStatus(sb, "wroteUpTo", wroteUpToKey.get());
+ appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get());
+ return sb.toString();
+ }
+
+ /**
+ * Used for a joint write/read workload. Enables tracking the last inserted/updated
+ * key, which requires a blocking queue and a consumer thread.
+ * @param enable whether to enable tracking the last inserted/updated key
+ */
+ public void setTrackWroteKeys(boolean enable) {
+ trackWroteKeys = enable;
+ }
+}
Propchange: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
------------------------------------------------------------------------------
svn:eol-style = native