You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/04/06 20:18:13 UTC

svn commit: r1465284 - in /hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks: Benchmark.java GetBenchmark.java RegionGetBenchmark.java

Author: liyin
Date: Sat Apr  6 18:18:13 2013
New Revision: 1465284

URL: http://svn.apache.org/r1465284
Log:
[master] Changes to verify the get of the benchmark returns a valid result.

Author: shaneh

Summary:
Coordinates load generating threads to get accurate benchmarks.
Confirms results returned are not null, are for the key requested,
and are of the correct length.

Test Plan:
Ran the benchmark and confirmed the values were created
correctly.

Reviewers: liyintang, kranganathan, adela

Reviewed By: adela

CC: adela

Differential Revision: https://phabricator.fb.com/D735455

Modified:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/GetBenchmark.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/RegionGetBenchmark.java

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java?rev=1465284&r1=1465283&r2=1465284&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java Sat Apr  6 18:18:13 2013
@@ -1,6 +1,7 @@
 package org.apache.hadoop.hbase.benchmarks;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.Random;
@@ -17,24 +18,25 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.loadtest.RegionSplitter;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.LoadTestTool;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
@@ -141,7 +143,7 @@ public abstract class Benchmark {
    * @param numKVs number of kv's to write into the table and cf
    * @param numRegionsPerRS numer of regions per RS, 0 for one region
    * @param bulkLoad if true, create HFile and load. Else do puts.
-   * @param keys if not null, fills in the keys populated for bulk load case.
+   * @param keysWritten if not null, fills in the keys populated for bulk load case.
    * @return HTable instance to the table just created
    * @throws IOException
    */
@@ -149,13 +151,14 @@ public abstract class Benchmark {
       int kvSize, long numKVs, int numRegionsPerRS, boolean bulkLoad, 
       List<byte[]> keysWritten) throws IOException {
     HTable htable = null;
+
     try {
       htable = new HTable(conf, tableName);
       LOG.info("Table " + new String(tableName) + " exists, skipping create.");
     } catch (IOException e) {
       LOG.info("Table " + new String(tableName) + " does not exist.");
     }
-    
+
     int numRegions = numRegionsPerRS;
     if (htable == null) {
       // create the table - 1 version, no compression or bloomfilters
@@ -226,9 +229,11 @@ public abstract class Benchmark {
             continue;
           }
           long startKey = 0;
-          try {
-            startKey = getLongFromRowKey(hRegionInfo.getStartKey());
-          } catch (NumberFormatException e) { }
+          byte[] startKeyBytes = hRegionInfo.getStartKey();
+          if (startKeyBytes.length != 0) {
+            startKey = Bytes.toLong(startKeyBytes);
+          }
+
           long rowID = startKey;
           for (; rowID < startKey + numKVsInRegion; rowID++) {
             byte[] row = getRowKeyFromLong(rowID);
@@ -265,7 +270,7 @@ public abstract class Benchmark {
         Path hbaseRootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
         Path basedir = new Path(hbaseRootDir, new String(tableName));
         bulkLoadDataForRegion(fs, basedir, hRegionInfo, regionServer, 
-            cfNameStr, kvSize, numKVsInRegion, keysWritten);
+            cfNameStr, kvSize, numKVsInRegion, tableName, keysWritten, conf);
       }
     } 
     else {
@@ -293,54 +298,85 @@ public abstract class Benchmark {
    * Create a HFile and bulk load it for a given region
    * @throws IOException 
    */
-  public void bulkLoadDataForRegion(FileSystem fs, Path basedir, 
-      HRegionInfo hRegionInfo, HRegionInterface regionServer, String cfNameStr, 
-      int kvSize, long numKVsInRegion, List<byte[]> keysWritten) 
+  public void bulkLoadDataForRegion(FileSystem fs, Path basedir,
+                                    HRegionInfo hRegionInfo, HRegionInterface regionServer, String cfNameStr,
+                                    int kvSize, long numKVsInRegion, byte[] tableName, List<byte[]> keysWritten,
+                                    Configuration conf)
   throws IOException {
-    // create an hfile
-    Path hFile =  new Path(basedir, "hfile." + hRegionInfo.getEncodedName() + 
-        "." + System.currentTimeMillis());
-    HFile.Writer writer =
-      HFile.getWriterFactoryNoCache(conf).withPath(fs, hFile).create();
+
+    HTable hTable = new HTable(conf, tableName);
+
+    HFileOutputFormat hfof = new HFileOutputFormat();
+    RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
+
+    String outputDirStr = conf.get("mapred.output.dir");
+
+    if (outputDirStr == null || outputDirStr.equals("")) {
+      conf.setStrings("mapred.output.dir", basedir.toString() + "/mapredOutput");
+      outputDirStr = basedir.toString() + "/mapredOutput";
+    }
+
+    Path outputDir = new Path(outputDirStr);
+
+    TaskAttemptContext cntx = new TaskAttemptContext(conf, new TaskAttemptID());
+
+    try {
+      writer = hfof.getRecordWriter(cntx);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
     byte [] family = 
       hRegionInfo.getTableDesc().getFamilies().iterator().next().getName();
     byte [] value = new byte[kvSize];
     (new Random()).nextBytes(value);
 
     long startKey = 0;
-    try {
-      startKey = getLongFromRowKey(hRegionInfo.getStartKey());
-    } catch (NumberFormatException e) { }
+
+    LOG.info(Bytes.toStringBinary(hRegionInfo.getRegionName()) +
+        " Start Key: " + Bytes.toStringBinary(hRegionInfo.getStartKey()) +
+        " End Key: " + Bytes.toStringBinary(hRegionInfo.getEndKey()));
+
+    byte[] startKeyBytes = hRegionInfo.getStartKey();
+    if (startKeyBytes.length != 0) {
+      startKey = getLongFromRowKey(startKeyBytes);
+    }
+
     long rowID = startKey;
     for (; rowID < startKey + numKVsInRegion; rowID++) {
       byte[] row = getRowKeyFromLong(rowID);
-      writer.append(new KeyValue(row, family, Bytes.toBytes(rowID), 
-          System.currentTimeMillis(), value));
-      if (keysWritten != null) keysWritten.add(row);
-    }
-    writer.close();
-    LOG.info("Done creating data file: " + hFile.getName() + 
-        ", hfile key-range: (" + startKey + ", " + rowID + 
-        ") for region: " + hRegionInfo.getEncodedName() + 
-        ", region key-range: (" + 
-        (new String(hRegionInfo.getStartKey())).trim() + ", " + 
-        (new String(hRegionInfo.getEndKey())).trim() + ")");
+      KeyValue keyValue = new KeyValue(row, family, row,
+          System.currentTimeMillis(), value);
 
-    // bulk load the file
-    if (regionServer != null) {
-      regionServer.bulkLoadHFile(hFile.toString(), hRegionInfo.getRegionName(), 
-          Bytes.toBytes(cfNameStr), true);
-      LOG.info("Done bulk-loading data file [" + hFile.getName() + 
-          "] for region [" + hRegionInfo.getEncodedName() + "]");
+      try {
+        writer.write(new ImmutableBytesWritable(), keyValue);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+
+      if (keysWritten != null) keysWritten.add(keyValue.getRow());
+
+    }
+    try {
+      writer.close(cntx);
+      hfof.getOutputCommitter(cntx).commitTask(cntx);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
     }
+
+    // bulk load the file
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+    loader.doBulkLoad(outputDir, hTable);
+
+    LOG.info("Done bulk loading file.");
+
   }
   
   public static byte[] getRowKeyFromLong(long l) {
-    return Bytes.toBytes(String.format("%20d", l));
+    return Bytes.toBytes(l);
   }
   
   public static long getLongFromRowKey(byte[] rowKey) {
-    return Long.parseLong((new String(rowKey)).trim());
+    return Bytes.toLong(rowKey);
   }
   
   /**

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/GetBenchmark.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/GetBenchmark.java?rev=1465284&r1=1465283&r2=1465284&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/GetBenchmark.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/GetBenchmark.java Sat Apr  6 18:18:13 2013
@@ -2,42 +2,71 @@ package org.apache.hadoop.hbase.benchmar
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.ipc.HBaseClient;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.util.Bytes;
 
 public class GetBenchmark extends Benchmark {
   public static final Log LOG = LogFactory.getLog(GetBenchmark.class);
-  static byte[] tableName = Bytes.toBytes("bench.GetFromMemory");
-  static String cfName = "cf1";
-  private static Integer[] CLIENT_THREADS = 
-    { 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200 };
-  private static Integer[] NUM_CONNECTIONS = { 5, 6, 7, 8, 9, 10 };
-  public static Configuration[] connCountConfs = new Configuration[100];
-  public static final int numRegionsPerRS = 10;
-  public static final int kvSize = 50;
-  public static int numKVs = 1000000;
+  static String cfName;
+  private static Integer[] CLIENT_THREADS;
+  private static Integer[] NUM_CONNECTIONS;
+  public static Map<Integer, Configuration> connCountConfs = new HashMap<Integer, Configuration>();
+  public static int numRegionsPerRS;
+  public static int kvSize;
+  public static int numKVs;
   static List<byte[]> keysWritten = new ArrayList<byte[]>();
+  static byte[] tableName;
   
   /**
    * Initialize the benchmark results tracking and output.
    */
   public void initBenchmarkResults() {
+
+    Configuration conf = new Configuration();
+
+    kvSize = conf.getInt("benchmark.kvSize", 100);
+    tableName = Bytes.toBytes("bench.GetFromMemory." + String.format("%d", kvSize));
+    //This is produce 1,000,000 kvs for 100 byte keys. For larger size kvs we want fewer
+    //of them to keep the load time reasonable.
+    numKVs = 1000000 / (kvSize/100);
+    numRegionsPerRS = conf.getInt("benchmark.numRegionsPerRegionServer", 1);
+    cfName = conf.get("benchmark.cfName", "cf1");
+
+    String[] clientThreadsStr = conf.getStrings("benchmark.clientThreads", new String[]{"16", "32", "64", "128"});
+    CLIENT_THREADS = new Integer[clientThreadsStr.length];
+    int index = 0;
+    for (String str : clientThreadsStr) {
+      CLIENT_THREADS[index++] = new Integer(str);
+    }
+
+    String[] numConnectionsStr = conf.getStrings("benchmark.numConnections", new String[]{ "2", "4", "8", "16" });
+    NUM_CONNECTIONS = new Integer[numConnectionsStr.length];
+    index = 0;
+    for (String str : numConnectionsStr) {
+      NUM_CONNECTIONS[index++] = new Integer(str);
+    }
+
     List<String> header = new ArrayList<String>();
     header.add("Threads");
     for (int i = 0; i < NUM_CONNECTIONS.length; i++) {
       header.add("   conn=" +  NUM_CONNECTIONS[i]);
     }
     benchmarkResults = new BenchmarkResults<Integer, Integer, Double>(
-        CLIENT_THREADS, NUM_CONNECTIONS, "  %5d", "   %5.2f", header);
+        CLIENT_THREADS, NUM_CONNECTIONS, "  %5d", "   %,10.0f", header);
   }
   
   public void runBenchmark() throws Throwable {
@@ -47,7 +76,7 @@ public class GetBenchmark extends Benchm
     System.out.println("Total kvs = " + keysWritten.size());
     // warm block cache, force jit compilation
     System.out.println("Warming blockcache and forcing JIT compilation...");
-    runExperiment("warmup-", false, 1, 100*1000, 1);
+    runExperiment("warmup-", false, 1, 100000 * (kvSize/100), 1);
     // iterate on the number of connections
     for (int numConnections : NUM_CONNECTIONS) {
       LOG.info("Num connection = " + numConnections);
@@ -58,9 +87,9 @@ public class GetBenchmark extends Benchm
         if (numConnections > numThreads) continue;
         try {
           // read enough KVs to benchmark within a reasonable time
-          long numKVsToRead = 40*1000;
-          if (numThreads >= 5) numKVsToRead = 20*1000;
-          if (numThreads >= 40) numKVsToRead = 10*1000;
+          long numKVsToRead = (1024*1024*1024)/(kvSize/4);
+          if (numThreads >= 5) numKVsToRead /= 2;
+          if (numThreads >= 40) numKVsToRead /= 2;
           // run the experiment
           runExperiment("t" + numThreads + "-", true, 
               numThreads, numKVsToRead, numConnections);
@@ -74,39 +103,100 @@ public class GetBenchmark extends Benchm
   public void runExperiment(String prefix, boolean printStats, 
       int numThreads, long numReadsPerThread, int numConnections) 
   throws IOException {
-    // Prepare the read threads
+
+    //Used in the run loop for each worker thread
+    AtomicBoolean running = new AtomicBoolean(true);
+    //Used to sync all worker threads on startup
+    CountDownLatch readySignal = new CountDownLatch(numThreads);
+    //Used to signal to the worker threads to start generating load
+    CountDownLatch startSignal = new CountDownLatch(1);
+    //Used by the worker threads to signal when each one has started
+    // generating load
+    CountDownLatch providingLoad = new CountDownLatch(numThreads);
+    //Used to tell the worker threads it is safe to start collecting
+    //statistics as all the other threads are currently generating load
+    AtomicBoolean startCollectingStats = new AtomicBoolean(false);
+    //Used to tell the worker threads to stop collecting load
+    AtomicBoolean stopCollectingStats = new AtomicBoolean(false);
+    //Used by the worker threads to signal they are done collecting
+    //statistics
+    CountDownLatch doneSignal = new CountDownLatch(numThreads);
+
+    // Prepare the worker threads
     GetBenchMarkThread threads[] = new GetBenchMarkThread[numThreads];
     for (int i = 0; i < numThreads; i++) {
-      if (connCountConfs[numConnections] == null) {
-        connCountConfs[numConnections] = getNewConfObject();
+      if (connCountConfs.get(numConnections) == null) {
+        connCountConfs.put(numConnections, getNewConfObject());
         // set the number of connections per thread
-        connCountConfs[numConnections].setInt(
+        connCountConfs.get(numConnections).setInt(
             HBaseClient.NUM_CONNECTIONS_PER_SERVER, numConnections);
       }
-      Configuration conf = connCountConfs[numConnections];
+      Configuration conf = connCountConfs.get(numConnections);
       threads[i] = new GetBenchMarkThread(prefix+i, tableName, 
-          Bytes.toBytes(cfName), 0, numKVs, numReadsPerThread, conf, true);
+          Bytes.toBytes(cfName), 0, numKVs, conf, true,
+          running, readySignal, startSignal, providingLoad,
+          startCollectingStats, stopCollectingStats, doneSignal);
     }
     // start the read threads, each one times itself
     for (int i = 0; i < numThreads; i++) {
       threads[i].start();
     }
+
+    try {
+
+      //System.out.println("Waiting on threads to start up.");
+      //Signal Threads to start running test when all are ready
+      readySignal.await();
+
+      //System.out.println("Signaling threads to start generating load.");
+      //Start generating load
+      startSignal.countDown();
+
+      //System.out.println("Waiting for all threads to start generating load.");
+      //Wait for all thread to start Providing load
+      providingLoad.await();
+
+      //System.out.println("Signaling threads to start collecting stats.");
+      //Signal threads to collect stats
+      startCollectingStats.set(true);
+
+      //System.out.println("Waiting while threads collect stats.");
+      //Wait some time
+      Thread.sleep(60 * 1000);
+
+      //System.out.println("Signaling threads to stop collecting stats.");
+      //Signal threads to stop collecting stats
+      stopCollectingStats.set(true);
+
+      //System.out.println("Waiting for all threads to stop collecting stats.");
+      //Wait for all threads to be finished collecting stats
+      doneSignal.await();
+
+      //System.out.println("Signaling all threads to shutdown.");
+      //Stop all client threads
+      running.set(false);
+
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+
     // wait for all the threads and compute the total ops/sec
     double totalOpsPerSec = 0;
+    double totalBytesPerSec = 0;
+
     int successThreads = 0;
     for (int i = 0; i < numThreads; i++) {
-      try {
-        threads[i].join();
-        totalOpsPerSec += threads[i].getOpsPerSecond();
-        successThreads++;
-      } catch (InterruptedException e) {
-        LOG.error("Exception in thread " + i, e);
-      }
+      //threads[i].join();
+      totalOpsPerSec += threads[i].getOpsPerSecond();
+      totalBytesPerSec += threads[i].getBytesPerSecond();
+      successThreads++;
     }
     System.out.println("Num threads =  " + successThreads + ", " + 
-        "performance = " + String.format("%5.2f", totalOpsPerSec) + " ops/sec");
+        "performance = " + String.format("%6.2f", totalOpsPerSec) + " ops/sec " +
+        String.format("%,10.0f", totalBytesPerSec) + " bytes/sec");
     // add to the benchmark results
-    benchmarkResults.addResult(numThreads, numConnections, totalOpsPerSec);
+    benchmarkResults.addResult(numThreads, numConnections, totalBytesPerSec);
   }
   
   /**
@@ -120,63 +210,133 @@ public class GetBenchmark extends Benchm
     byte[] cf;
     int startKey;
     int endKey;
-    long numGetsToPerform;
+    // number of reads we have performed
+    long numSuccessfulGets = 0;
     Configuration conf;
     long timeTakenMillis = 0;
     boolean debug = false;
     Random random = new Random();
+
+    private boolean collectedStats = false;
+    private boolean communicatedLoad = false;
+
+    private AtomicBoolean running;
+    private AtomicBoolean startCollectingStats;
+    private AtomicBoolean stopCollectingStats;
+
+    private CountDownLatch readySignal;
+    private CountDownLatch startSignal;
+    private CountDownLatch providingLoadSignal;
+    private CountDownLatch doneSignal;
     
     public GetBenchMarkThread(String name, byte[] table, byte[] cf, 
-        int startKey, int endKey, long numGetsToPerform, Configuration conf, 
-        boolean debug) {
+        int startKey, int endKey, Configuration conf, boolean debug,
+        AtomicBoolean running, CountDownLatch readySignal, CountDownLatch startSignal,
+        CountDownLatch providingLoadSignal, AtomicBoolean startCollectingStats,
+        AtomicBoolean stopCollectingStats, CountDownLatch doneSignal) {
       this.name = name;
       this.table = table;
       this.cf = cf;
       this.startKey = startKey;
       this.endKey = endKey;
-      this.numGetsToPerform = numGetsToPerform;
       this.conf = conf;
       this.debug = debug;
+      this.running = running;
+      this.readySignal = readySignal;
+      this.startSignal = startSignal;
+      this.providingLoadSignal = providingLoadSignal;
+      this.startCollectingStats = startCollectingStats;
+      this.stopCollectingStats = stopCollectingStats;
+      this.doneSignal = doneSignal;
     }
-    
+
+    /**
+     * Returns the number of bytes/second at the current moment.
+     */
+    public double getBytesPerSecond() {
+      return kvSize*getOpsPerSecond();
+    }
+
     /**
      * Returns the number of ops/second at the current moment.
      */
     public double getOpsPerSecond() {
-      return (numGetsToPerform * 1.0 * 1000 / timeTakenMillis);
+      return (numSuccessfulGets / (timeTakenMillis/1000.0));
     }
     
     public void run() {
       try {
+        //Signal this thread is ready
+        readySignal.countDown();
+        //Wait for start signal
+        startSignal.await();
         // create a new HTable instance
         HTable htable = new HTable(conf, tableName);
         byte[] rowKey = null;
-        // number of reads we have performed
-        long numSuccessfulGets = 0;
-        while (numSuccessfulGets < numGetsToPerform) {
+        Result result;
+        //Run until stopped
+        while (running.get()) {
+
+          //If this worker has collected stats in the past
+          //and is now being told not to do so signal
+          //that this thread is done.
+          if (collectedStats &&
+              stopCollectingStats.get()) {
+            //Signal This thread is done collecting stats
+            doneSignal.countDown();
+          }
+
           // create a random key in the range passed in
           rowKey = keysWritten.get(random.nextInt(keysWritten.size()));
-          // keys are assumed to be 20 chars
+
           Get get = new Get(rowKey);
           get.addFamily(cf);
+
           // time the actual get
           long t1 = System.currentTimeMillis();
-          htable.get(get);
-          timeTakenMillis += System.currentTimeMillis() - t1;
-          numSuccessfulGets++;
-          // print progress if needed
-          if (debug && numSuccessfulGets % PRINT_INTERVAL == 0) {
-            double opsPerSec = 
-              (numSuccessfulGets * 1.0 * 1000 / timeTakenMillis);
-            LOG.debug("[Thread-" + name + "] " + "" +
-            		"Num gets = " + numSuccessfulGets + "/" + numGetsToPerform + 
-            		", current rate = " + String.format("%.2f", opsPerSec) + 
-            		" ops/sec");
+          result = htable.get(get);
+          long delta = System.currentTimeMillis() - t1;
+
+          //Let the coordinating thread know we are providing load.
+          //If we have not been counted as generating load before
+          //count us now.
+          if (!communicatedLoad) {
+            providingLoadSignal.countDown();
+            communicatedLoad = true;
+          }
+
+          //Test to make sure the result we got back is what we asked for
+          if (result == null) {
+            LOG.warn("Null result returned from HBase!!!");
+          } else {
+            byte [] resultRowKey = result.getRow();
+
+            if(result.isEmpty()) {
+              LOG.info("NO RESULTS RETURNED!!!");
+            }
+
+            if (Bytes.equals(rowKey, resultRowKey)) {
+              if (!stopCollectingStats.get() &&
+                  startCollectingStats.get()) {
+                collectedStats = true;
+                timeTakenMillis += delta;
+                numSuccessfulGets++;
+              }
+
+            } else {
+              LOG.warn("Row Keys didn't match!!! Get RowKey: " + Bytes.toStringBinary(get.getRow()) +
+                  " result: " + Bytes.toStringBinary(result.getRow()) +
+                  " rowKey: " + Bytes.toStringBinary(rowKey));
+            }
           }
         }
+
       } catch (IOException e) {
         LOG.error("IOException while running read thread, will exit", e);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
       }
+
     }
   }
 

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/RegionGetBenchmark.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/RegionGetBenchmark.java?rev=1465284&r1=1465283&r2=1465284&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/RegionGetBenchmark.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/RegionGetBenchmark.java Sat Apr  6 18:18:13 2013
@@ -85,7 +85,7 @@ public class RegionGetBenchmark extends 
     HRegionInfo hRegionInfo = 
       new HRegionInfo(htd, getRowKeyFromLong(0), null, false);
     bulkLoadDataForRegion(fs, basedir, hRegionInfo, null, cfName, kvSize, 
-        numKVs, keysWritten);
+        numKVs, Bytes.toBytes(tableName), keysWritten, conf);
 
     // setup all the regions
     int maxRegions = NUM_REGIONS[0];