You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/04/06 20:18:13 UTC
svn commit: r1465284 - in
/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks:
Benchmark.java GetBenchmark.java RegionGetBenchmark.java
Author: liyin
Date: Sat Apr 6 18:18:13 2013
New Revision: 1465284
URL: http://svn.apache.org/r1465284
Log:
[master] Changes to verify the get of the benchmark returns a valid result.
Author: shaneh
Summary:
Coordinates load generating threads to get accurate benchmarks.
Confirms results returned are not null, are for the key requested,
and are ofthe correct length.
Test Plan:
Ran the benchmark and confirmed the values were created
correctly.
Reviewers: liyintang, kranganathan, adela
Reviewed By: adela
CC: adela
Differential Revision: https://phabricator.fb.com/D735455
Modified:
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/GetBenchmark.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/RegionGetBenchmark.java
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java?rev=1465284&r1=1465283&r2=1465284&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java Sat Apr 6 18:18:13 2013
@@ -1,6 +1,7 @@
package org.apache.hadoop.hbase.benchmarks;
import java.io.IOException;
+import java.util.LinkedList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Random;
@@ -17,24 +18,25 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.loadtest.RegionSplitter;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestTool;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -141,7 +143,7 @@ public abstract class Benchmark {
* @param numKVs number of kv's to write into the table and cf
* @param numRegionsPerRS numer of regions per RS, 0 for one region
* @param bulkLoad if true, create HFile and load. Else do puts.
- * @param keys if not null, fills in the keys populated for bulk load case.
+ * @param keysWritten if not null, fills in the keys populated for bulk load case.
* @return HTable instance to the table just created
* @throws IOException
*/
@@ -149,13 +151,14 @@ public abstract class Benchmark {
int kvSize, long numKVs, int numRegionsPerRS, boolean bulkLoad,
List<byte[]> keysWritten) throws IOException {
HTable htable = null;
+
try {
htable = new HTable(conf, tableName);
LOG.info("Table " + new String(tableName) + " exists, skipping create.");
} catch (IOException e) {
LOG.info("Table " + new String(tableName) + " does not exist.");
}
-
+
int numRegions = numRegionsPerRS;
if (htable == null) {
// create the table - 1 version, no compression or bloomfilters
@@ -226,9 +229,11 @@ public abstract class Benchmark {
continue;
}
long startKey = 0;
- try {
- startKey = getLongFromRowKey(hRegionInfo.getStartKey());
- } catch (NumberFormatException e) { }
+ byte[] startKeyBytes = hRegionInfo.getStartKey();
+ if (startKeyBytes.length != 0) {
+ startKey = Bytes.toLong(startKeyBytes);
+ }
+
long rowID = startKey;
for (; rowID < startKey + numKVsInRegion; rowID++) {
byte[] row = getRowKeyFromLong(rowID);
@@ -265,7 +270,7 @@ public abstract class Benchmark {
Path hbaseRootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
Path basedir = new Path(hbaseRootDir, new String(tableName));
bulkLoadDataForRegion(fs, basedir, hRegionInfo, regionServer,
- cfNameStr, kvSize, numKVsInRegion, keysWritten);
+ cfNameStr, kvSize, numKVsInRegion, tableName, keysWritten, conf);
}
}
else {
@@ -293,54 +298,85 @@ public abstract class Benchmark {
* Create a HFile and bulk load it for a given region
* @throws IOException
*/
- public void bulkLoadDataForRegion(FileSystem fs, Path basedir,
- HRegionInfo hRegionInfo, HRegionInterface regionServer, String cfNameStr,
- int kvSize, long numKVsInRegion, List<byte[]> keysWritten)
+ public void bulkLoadDataForRegion(FileSystem fs, Path basedir,
+ HRegionInfo hRegionInfo, HRegionInterface regionServer, String cfNameStr,
+ int kvSize, long numKVsInRegion, byte[] tableName, List<byte[]> keysWritten,
+ Configuration conf)
throws IOException {
- // create an hfile
- Path hFile = new Path(basedir, "hfile." + hRegionInfo.getEncodedName() +
- "." + System.currentTimeMillis());
- HFile.Writer writer =
- HFile.getWriterFactoryNoCache(conf).withPath(fs, hFile).create();
+
+ HTable hTable = new HTable(conf, tableName);
+
+ HFileOutputFormat hfof = new HFileOutputFormat();
+ RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
+
+ String outputDirStr = conf.get("mapred.output.dir");
+
+ if (outputDirStr == null || outputDirStr.equals("")) {
+ conf.setStrings("mapred.output.dir", basedir.toString() + "/mapredOutput");
+ outputDirStr = basedir.toString() + "/mapredOutput";
+ }
+
+ Path outputDir = new Path(outputDirStr);
+
+ TaskAttemptContext cntx = new TaskAttemptContext(conf, new TaskAttemptID());
+
+ try {
+ writer = hfof.getRecordWriter(cntx);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
byte [] family =
hRegionInfo.getTableDesc().getFamilies().iterator().next().getName();
byte [] value = new byte[kvSize];
(new Random()).nextBytes(value);
long startKey = 0;
- try {
- startKey = getLongFromRowKey(hRegionInfo.getStartKey());
- } catch (NumberFormatException e) { }
+
+ LOG.info(Bytes.toStringBinary(hRegionInfo.getRegionName()) +
+ " Start Key: " + Bytes.toStringBinary(hRegionInfo.getStartKey()) +
+ " End Key: " + Bytes.toStringBinary(hRegionInfo.getEndKey()));
+
+ byte[] startKeyBytes = hRegionInfo.getStartKey();
+ if (startKeyBytes.length != 0) {
+ startKey = getLongFromRowKey(startKeyBytes);
+ }
+
long rowID = startKey;
for (; rowID < startKey + numKVsInRegion; rowID++) {
byte[] row = getRowKeyFromLong(rowID);
- writer.append(new KeyValue(row, family, Bytes.toBytes(rowID),
- System.currentTimeMillis(), value));
- if (keysWritten != null) keysWritten.add(row);
- }
- writer.close();
- LOG.info("Done creating data file: " + hFile.getName() +
- ", hfile key-range: (" + startKey + ", " + rowID +
- ") for region: " + hRegionInfo.getEncodedName() +
- ", region key-range: (" +
- (new String(hRegionInfo.getStartKey())).trim() + ", " +
- (new String(hRegionInfo.getEndKey())).trim() + ")");
+ KeyValue keyValue = new KeyValue(row, family, row,
+ System.currentTimeMillis(), value);
- // bulk load the file
- if (regionServer != null) {
- regionServer.bulkLoadHFile(hFile.toString(), hRegionInfo.getRegionName(),
- Bytes.toBytes(cfNameStr), true);
- LOG.info("Done bulk-loading data file [" + hFile.getName() +
- "] for region [" + hRegionInfo.getEncodedName() + "]");
+ try {
+ writer.write(new ImmutableBytesWritable(), keyValue);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ if (keysWritten != null) keysWritten.add(keyValue.getRow());
+
+ }
+ try {
+ writer.close(cntx);
+ hfof.getOutputCommitter(cntx).commitTask(cntx);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
+
+ // bulk load the file
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+ loader.doBulkLoad(outputDir, hTable);
+
+ LOG.info("Done bulk loading file.");
+
}
public static byte[] getRowKeyFromLong(long l) {
- return Bytes.toBytes(String.format("%20d", l));
+ return Bytes.toBytes(l);
}
public static long getLongFromRowKey(byte[] rowKey) {
- return Long.parseLong((new String(rowKey)).trim());
+ return Bytes.toLong(rowKey);
}
/**
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/GetBenchmark.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/GetBenchmark.java?rev=1465284&r1=1465283&r2=1465284&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/GetBenchmark.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/GetBenchmark.java Sat Apr 6 18:18:13 2013
@@ -2,42 +2,71 @@ package org.apache.hadoop.hbase.benchmar
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ipc.HBaseClient;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.util.Bytes;
public class GetBenchmark extends Benchmark {
public static final Log LOG = LogFactory.getLog(GetBenchmark.class);
- static byte[] tableName = Bytes.toBytes("bench.GetFromMemory");
- static String cfName = "cf1";
- private static Integer[] CLIENT_THREADS =
- { 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200 };
- private static Integer[] NUM_CONNECTIONS = { 5, 6, 7, 8, 9, 10 };
- public static Configuration[] connCountConfs = new Configuration[100];
- public static final int numRegionsPerRS = 10;
- public static final int kvSize = 50;
- public static int numKVs = 1000000;
+ static String cfName;
+ private static Integer[] CLIENT_THREADS;
+ private static Integer[] NUM_CONNECTIONS;
+ public static Map<Integer, Configuration> connCountConfs = new HashMap<Integer, Configuration>();
+ public static int numRegionsPerRS;
+ public static int kvSize;
+ public static int numKVs;
static List<byte[]> keysWritten = new ArrayList<byte[]>();
+ static byte[] tableName;
/**
* Initialize the benchmark results tracking and output.
*/
public void initBenchmarkResults() {
+
+ Configuration conf = new Configuration();
+
+ kvSize = conf.getInt("benchmark.kvSize", 100);
+ tableName = Bytes.toBytes("bench.GetFromMemory." + String.format("%d", kvSize));
+ //This is produce 1,000,000 kvs for 100 byte keys. For larger size kvs we want fewer
+ //of them to keep the load time reasonable.
+ numKVs = 1000000 / (kvSize/100);
+ numRegionsPerRS = conf.getInt("benchmark.numRegionsPerRegionServer", 1);
+ cfName = conf.get("benchmark.cfName", "cf1");
+
+ String[] clientThreadsStr = conf.getStrings("benchmark.clientThreads", new String[]{"16", "32", "64", "128"});
+ CLIENT_THREADS = new Integer[clientThreadsStr.length];
+ int index = 0;
+ for (String str : clientThreadsStr) {
+ CLIENT_THREADS[index++] = new Integer(str);
+ }
+
+ String[] numConnectionsStr = conf.getStrings("benchmark.numConnections", new String[]{ "2", "4", "8", "16" });
+ NUM_CONNECTIONS = new Integer[numConnectionsStr.length];
+ index = 0;
+ for (String str : numConnectionsStr) {
+ NUM_CONNECTIONS[index++] = new Integer(str);
+ }
+
List<String> header = new ArrayList<String>();
header.add("Threads");
for (int i = 0; i < NUM_CONNECTIONS.length; i++) {
header.add(" conn=" + NUM_CONNECTIONS[i]);
}
benchmarkResults = new BenchmarkResults<Integer, Integer, Double>(
- CLIENT_THREADS, NUM_CONNECTIONS, " %5d", " %5.2f", header);
+ CLIENT_THREADS, NUM_CONNECTIONS, " %5d", " %,10.0f", header);
}
public void runBenchmark() throws Throwable {
@@ -47,7 +76,7 @@ public class GetBenchmark extends Benchm
System.out.println("Total kvs = " + keysWritten.size());
// warm block cache, force jit compilation
System.out.println("Warming blockcache and forcing JIT compilation...");
- runExperiment("warmup-", false, 1, 100*1000, 1);
+ runExperiment("warmup-", false, 1, 100000 * (kvSize/100), 1);
// iterate on the number of connections
for (int numConnections : NUM_CONNECTIONS) {
LOG.info("Num connection = " + numConnections);
@@ -58,9 +87,9 @@ public class GetBenchmark extends Benchm
if (numConnections > numThreads) continue;
try {
// read enough KVs to benchmark within a reasonable time
- long numKVsToRead = 40*1000;
- if (numThreads >= 5) numKVsToRead = 20*1000;
- if (numThreads >= 40) numKVsToRead = 10*1000;
+ long numKVsToRead = (1024*1024*1024)/(kvSize/4);
+ if (numThreads >= 5) numKVsToRead /= 2;
+ if (numThreads >= 40) numKVsToRead /= 2;
// run the experiment
runExperiment("t" + numThreads + "-", true,
numThreads, numKVsToRead, numConnections);
@@ -74,39 +103,100 @@ public class GetBenchmark extends Benchm
public void runExperiment(String prefix, boolean printStats,
int numThreads, long numReadsPerThread, int numConnections)
throws IOException {
- // Prepare the read threads
+
+ //Used in the run loop for each worker thread
+ AtomicBoolean running = new AtomicBoolean(true);
+ //Used to sync all worker threads on startup
+ CountDownLatch readySignal = new CountDownLatch(numThreads);
+ //Used to signal to the worker threads to start generating load
+ CountDownLatch startSignal = new CountDownLatch(1);
+ //Used by the worker threads to signal when each one has started
+ // generating load
+ CountDownLatch providingLoad = new CountDownLatch(numThreads);
+ //Used to tell the worker threads it is safe to start collecting
+ //statistics as all the other threads are currently generating load
+ AtomicBoolean startCollectingStats = new AtomicBoolean(false);
+ //Used to tell the worker threads to stop collecting load
+ AtomicBoolean stopCollectingStats = new AtomicBoolean(false);
+ //Used by the worker threads to signal they are done collecting
+ //statistics
+ CountDownLatch doneSignal = new CountDownLatch(numThreads);
+
+ // Prepare the worker threads
GetBenchMarkThread threads[] = new GetBenchMarkThread[numThreads];
for (int i = 0; i < numThreads; i++) {
- if (connCountConfs[numConnections] == null) {
- connCountConfs[numConnections] = getNewConfObject();
+ if (connCountConfs.get(numConnections) == null) {
+ connCountConfs.put(numConnections, getNewConfObject());
// set the number of connections per thread
- connCountConfs[numConnections].setInt(
+ connCountConfs.get(numConnections).setInt(
HBaseClient.NUM_CONNECTIONS_PER_SERVER, numConnections);
}
- Configuration conf = connCountConfs[numConnections];
+ Configuration conf = connCountConfs.get(numConnections);
threads[i] = new GetBenchMarkThread(prefix+i, tableName,
- Bytes.toBytes(cfName), 0, numKVs, numReadsPerThread, conf, true);
+ Bytes.toBytes(cfName), 0, numKVs, conf, true,
+ running, readySignal, startSignal, providingLoad,
+ startCollectingStats, stopCollectingStats, doneSignal);
}
// start the read threads, each one times itself
for (int i = 0; i < numThreads; i++) {
threads[i].start();
}
+
+ try {
+
+ //System.out.println("Waiting on threads to start up.");
+ //Signal Threads to start running test when all are ready
+ readySignal.await();
+
+ //System.out.println("Signaling threads to start generating load.");
+ //Start generating load
+ startSignal.countDown();
+
+ //System.out.println("Waiting for all threads to start generating load.");
+ //Wait for all thread to start Providing load
+ providingLoad.await();
+
+ //System.out.println("Signaling threads to start collecting stats.");
+ //Signal threads to collect stats
+ startCollectingStats.set(true);
+
+ //System.out.println("Waiting while threads collect stats.");
+ //Wait some time
+ Thread.sleep(60 * 1000);
+
+ //System.out.println("Signaling threads to stop collecting stats.");
+ //Signal threads to stop collecting stats
+ stopCollectingStats.set(true);
+
+ //System.out.println("Waiting for all threads to stop collecting stats.");
+ //Wait for all threads to be finished collecting stats
+ doneSignal.await();
+
+ //System.out.println("Signaling all threads to shutdown.");
+ //Stop all client threads
+ running.set(false);
+
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+
// wait for all the threads and compute the total ops/sec
double totalOpsPerSec = 0;
+ double totalBytesPerSec = 0;
+
int successThreads = 0;
for (int i = 0; i < numThreads; i++) {
- try {
- threads[i].join();
- totalOpsPerSec += threads[i].getOpsPerSecond();
- successThreads++;
- } catch (InterruptedException e) {
- LOG.error("Exception in thread " + i, e);
- }
+ //threads[i].join();
+ totalOpsPerSec += threads[i].getOpsPerSecond();
+ totalBytesPerSec += threads[i].getBytesPerSecond();
+ successThreads++;
}
System.out.println("Num threads = " + successThreads + ", " +
- "performance = " + String.format("%5.2f", totalOpsPerSec) + " ops/sec");
+ "performance = " + String.format("%6.2f", totalOpsPerSec) + " ops/sec " +
+ String.format("%,10.0f", totalBytesPerSec) + " bytes/sec");
// add to the benchmark results
- benchmarkResults.addResult(numThreads, numConnections, totalOpsPerSec);
+ benchmarkResults.addResult(numThreads, numConnections, totalBytesPerSec);
}
/**
@@ -120,63 +210,133 @@ public class GetBenchmark extends Benchm
byte[] cf;
int startKey;
int endKey;
- long numGetsToPerform;
+ // number of reads we have performed
+ long numSuccessfulGets = 0;
Configuration conf;
long timeTakenMillis = 0;
boolean debug = false;
Random random = new Random();
+
+ private boolean collectedStats = false;
+ private boolean communicatedLoad = false;
+
+ private AtomicBoolean running;
+ private AtomicBoolean startCollectingStats;
+ private AtomicBoolean stopCollectingStats;
+
+ private CountDownLatch readySignal;
+ private CountDownLatch startSignal;
+ private CountDownLatch providingLoadSignal;
+ private CountDownLatch doneSignal;
public GetBenchMarkThread(String name, byte[] table, byte[] cf,
- int startKey, int endKey, long numGetsToPerform, Configuration conf,
- boolean debug) {
+ int startKey, int endKey, Configuration conf, boolean debug,
+ AtomicBoolean running, CountDownLatch readySignal, CountDownLatch startSignal,
+ CountDownLatch providingLoadSignal, AtomicBoolean startCollectingStats,
+ AtomicBoolean stopCollectingStats, CountDownLatch doneSignal) {
this.name = name;
this.table = table;
this.cf = cf;
this.startKey = startKey;
this.endKey = endKey;
- this.numGetsToPerform = numGetsToPerform;
this.conf = conf;
this.debug = debug;
+ this.running = running;
+ this.readySignal = readySignal;
+ this.startSignal = startSignal;
+ this.providingLoadSignal = providingLoadSignal;
+ this.startCollectingStats = startCollectingStats;
+ this.stopCollectingStats = stopCollectingStats;
+ this.doneSignal = doneSignal;
}
-
+
+ /**
+ * Returns the number of bytes/second at the current moment.
+ */
+ public double getBytesPerSecond() {
+ return kvSize*getOpsPerSecond();
+ }
+
/**
* Returns the number of ops/second at the current moment.
*/
public double getOpsPerSecond() {
- return (numGetsToPerform * 1.0 * 1000 / timeTakenMillis);
+ return (numSuccessfulGets / (timeTakenMillis/1000.0));
}
public void run() {
try {
+ //Signal this thread is ready
+ readySignal.countDown();
+ //Wait for start signal
+ startSignal.await();
// create a new HTable instance
HTable htable = new HTable(conf, tableName);
byte[] rowKey = null;
- // number of reads we have performed
- long numSuccessfulGets = 0;
- while (numSuccessfulGets < numGetsToPerform) {
+ Result result;
+ //Run until stopped
+ while (running.get()) {
+
+ //If this worker has collected stats in the past
+ //and is now being told not to do so signal
+ //that this thread is done.
+ if (collectedStats &&
+ stopCollectingStats.get()) {
+ //Signal This thread is done collecting stats
+ doneSignal.countDown();
+ }
+
// create a random key in the range passed in
rowKey = keysWritten.get(random.nextInt(keysWritten.size()));
- // keys are assumed to be 20 chars
+
Get get = new Get(rowKey);
get.addFamily(cf);
+
// time the actual get
long t1 = System.currentTimeMillis();
- htable.get(get);
- timeTakenMillis += System.currentTimeMillis() - t1;
- numSuccessfulGets++;
- // print progress if needed
- if (debug && numSuccessfulGets % PRINT_INTERVAL == 0) {
- double opsPerSec =
- (numSuccessfulGets * 1.0 * 1000 / timeTakenMillis);
- LOG.debug("[Thread-" + name + "] " + "" +
- "Num gets = " + numSuccessfulGets + "/" + numGetsToPerform +
- ", current rate = " + String.format("%.2f", opsPerSec) +
- " ops/sec");
+ result = htable.get(get);
+ long delta = System.currentTimeMillis() - t1;
+
+ //Let the coordinating thread know we are providing load.
+ //If we have not been counted as generating load before
+ //count us now.
+ if (!communicatedLoad) {
+ providingLoadSignal.countDown();
+ communicatedLoad = true;
+ }
+
+ //Test to make sure the result we got back is what we asked for
+ if (result == null) {
+ LOG.warn("Null result returned from HBase!!!");
+ } else {
+ byte [] resultRowKey = result.getRow();
+
+ if(result.isEmpty()) {
+ LOG.info("NO RESULTS RETURNED!!!");
+ }
+
+ if (Bytes.equals(rowKey, resultRowKey)) {
+ if (!stopCollectingStats.get() &&
+ startCollectingStats.get()) {
+ collectedStats = true;
+ timeTakenMillis += delta;
+ numSuccessfulGets++;
+ }
+
+ } else {
+ LOG.warn("Row Keys didn't match!!! Get RowKey: " + Bytes.toStringBinary(get.getRow()) +
+ " result: " + Bytes.toStringBinary(result.getRow()) +
+ " rowKey: " + Bytes.toStringBinary(rowKey));
+ }
}
}
+
} catch (IOException e) {
LOG.error("IOException while running read thread, will exit", e);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
+
}
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/RegionGetBenchmark.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/RegionGetBenchmark.java?rev=1465284&r1=1465283&r2=1465284&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/RegionGetBenchmark.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/RegionGetBenchmark.java Sat Apr 6 18:18:13 2013
@@ -85,7 +85,7 @@ public class RegionGetBenchmark extends
HRegionInfo hRegionInfo =
new HRegionInfo(htd, getRowKeyFromLong(0), null, false);
bulkLoadDataForRegion(fs, basedir, hRegionInfo, null, cfName, kvSize,
- numKVs, keysWritten);
+ numKVs, Bytes.toBytes(tableName), keysWritten, conf);
// setup all the regions
int maxRegions = NUM_REGIONS[0];