You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2013/08/10 20:53:47 UTC

svn commit: r1512817 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/ hbase-it/src/test/java/org/apache/hadoop/hbase/ hbase-server/src/test/java/org/apache/hadoop/hbase/util/

Author: jxiang
Date: Sat Aug 10 18:53:46 2013
New Revision: 1512817

URL: http://svn.apache.org/r1512817
Log:
HBASE-9155 Enhance LoadTestTool to support updates

Added:
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java   (with props)
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java   (with props)
Modified:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java?rev=1512817&r1=1512816&r2=1512817&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java Sat Aug 10 18:53:46 2013
@@ -25,6 +25,14 @@ import java.util.Set;
 public abstract class LoadTestDataGenerator {
   protected final LoadTestKVGenerator kvGenerator;
 
+  // The mutate info column stores information
+  // about update done to this column family this row.
+  public final static byte[] MUTATE_INFO = "mutate_info".getBytes();
+
+  // The increment column always has a long value,
+  // which can be incremented later on during updates.
+  public final static byte[] INCREMENT = "increment".getBytes();
+
   /**
    * Initializes the object.
    * @param minValueSize minimum size of the value generated by

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java?rev=1512817&r1=1512816&r2=1512817&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java Sat Aug 10 18:53:46 2013
@@ -115,6 +115,19 @@ public abstract class IngestIntegrationT
 
       ret = loadTool.run(new String[] {
           "-tn", tableName,
+          "-update", String.format("60:%d", writeThreads),
+          "-start_key", String.valueOf(startKey),
+          "-num_keys", String.valueOf(numKeys),
+          "-skip_init"
+      });
+      if (0 != ret) {
+        String errorMsg = "Update failed with error code " + ret;
+        LOG.error(errorMsg);
+        Assert.fail(errorMsg);
+      }
+
+      ret = loadTool.run(new String[] {
+          "-tn", tableName,
           "-read", "100:20",
           "-start_key", String.valueOf(startKey),
           "-num_keys", String.valueOf(numKeys),

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java?rev=1512817&r1=1512816&r2=1512817&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java Sat Aug 10 18:53:46 2013
@@ -24,8 +24,6 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -44,6 +42,7 @@ import org.apache.hadoop.hbase.util.Regi
 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
 import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java?rev=1512817&r1=1512816&r2=1512817&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java Sat Aug 10 18:53:46 2013
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
@@ -70,10 +69,14 @@ public class LoadTestTool extends Abstra
       "<avg_cols_per_key>:<avg_data_size>" +
       "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
 
-  /** Usa\ge string for the read option */
+  /** Usage string for the read option */
   protected static final String OPT_USAGE_READ =
       "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
 
+  /** Usage string for the update option */
+  protected static final String OPT_USAGE_UPDATE =
+      "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
+
   protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
       Arrays.toString(BloomType.values());
 
@@ -111,6 +114,8 @@ public class LoadTestTool extends Abstra
   protected static final String OPT_SKIP_INIT = "skip_init";
   protected static final String OPT_INIT_ONLY = "init_only";
   private static final String NUM_TABLES = "num_tables";
+  protected static final String OPT_BATCHUPDATE = "batchupdate";
+  protected static final String OPT_UPDATE = "update";
 
   protected static final long DEFAULT_START_KEY = 0;
 
@@ -119,10 +124,11 @@ public class LoadTestTool extends Abstra
 
   protected MultiThreadedWriter writerThreads = null;
   protected MultiThreadedReader readerThreads = null;
+  protected MultiThreadedUpdater updaterThreads = null;
 
   protected long startKey, endKey;
 
-  protected boolean isWrite, isRead;
+  protected boolean isWrite, isRead, isUpdate;
 
   // Column family options
   protected DataBlockEncoding dataBlockEncodingAlgo;
@@ -136,6 +142,11 @@ public class LoadTestTool extends Abstra
   protected int minColDataSize, maxColDataSize;
   protected boolean isMultiPut;
 
+  // Updater options
+  protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
+  protected int updatePercent;
+  protected boolean isBatchUpdate;
+
   // Reader options
   private int numReaderThreads = DEFAULT_NUM_THREADS;
   private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
@@ -212,6 +223,7 @@ public class LoadTestTool extends Abstra
     addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
     addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
     addOptWithArg(OPT_READ, OPT_USAGE_READ);
+    addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
     addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
     addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
     addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
@@ -225,6 +237,8 @@ public class LoadTestTool extends Abstra
 
     addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
         "separate puts for every column in a row");
+    addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " +
+        "separate updates for every column in a row");
     addOptNoArg(OPT_ENCODE_IN_CACHE_ONLY, OPT_ENCODE_IN_CACHE_ONLY_USAGE);
     addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
 
@@ -250,16 +264,17 @@ public class LoadTestTool extends Abstra
 
     isWrite = cmd.hasOption(OPT_WRITE);
     isRead = cmd.hasOption(OPT_READ);
+    isUpdate = cmd.hasOption(OPT_UPDATE);
     isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
 
-    if (!isWrite && !isRead && !isInitOnly) {
+    if (!isWrite && !isRead && !isUpdate && !isInitOnly) {
       throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " +
-          "-" + OPT_READ + " has to be specified");
+        "-" + OPT_UPDATE + "-" + OPT_READ + " has to be specified");
     }
 
-    if (isInitOnly && (isRead || isWrite)) {
+    if (isInitOnly && (isRead || isWrite || isUpdate)) {
       throw new IllegalArgumentException(OPT_INIT_ONLY + " cannot be specified with"
-          + " either -" + OPT_WRITE + " or -" + OPT_READ);
+          + " either -" + OPT_WRITE + " or -" + OPT_UPDATE + " or -" + OPT_READ);
     }
 
     if (!isInitOnly) {
@@ -303,6 +318,21 @@ public class LoadTestTool extends Abstra
           + maxColDataSize);
     }
 
+    if (isUpdate) {
+      String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 2);
+      int colIndex = 0;
+      updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
+      if (colIndex < mutateOpts.length) {
+        numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
+      }
+
+      isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);
+
+      System.out.println("Batch updates: " + isBatchUpdate);
+      System.out.println("Percent of keys to update: " + updatePercent);
+      System.out.println("Updater threads: " + numUpdaterThreads);
+    }
+
     if (isRead) {
       String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
       int colIndex = 0;
@@ -390,16 +420,27 @@ public class LoadTestTool extends Abstra
       writerThreads.setMultiPut(isMultiPut);
     }
 
+    if (isUpdate) {
+      updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent);
+      updaterThreads.setBatchUpdate(isBatchUpdate);
+    }
+
     if (isRead) {
       readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
       readerThreads.setMaxErrors(maxReadErrors);
       readerThreads.setKeyWindow(keyWindow);
     }
 
-    if (isRead && isWrite) {
-      LOG.info("Concurrent read/write workload: making readers aware of the " +
-          "write point");
-      readerThreads.linkToWriter(writerThreads);
+    if (isUpdate && isWrite) {
+      LOG.info("Concurrent write/update workload: making updaters aware of the " +
+        "write point");
+      updaterThreads.linkToWriter(writerThreads);
+    }
+
+    if (isRead && (isUpdate || isWrite)) {
+      LOG.info("Concurrent write/read workload: making readers aware of the " +
+        "write point");
+      readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
     }
 
     if (isWrite) {
@@ -407,6 +448,11 @@ public class LoadTestTool extends Abstra
       writerThreads.start(startKey, endKey, numWriterThreads);
     }
 
+    if (isUpdate) {
+      System.out.println("Starting to mutate data...");
+      updaterThreads.start(startKey, endKey, numUpdaterThreads);
+    }
+
     if (isRead) {
       System.out.println("Starting to read data...");
       readerThreads.start(startKey, endKey, numReaderThreads);
@@ -416,6 +462,10 @@ public class LoadTestTool extends Abstra
       writerThreads.waitForFinish();
     }
 
+    if (isUpdate) {
+      updaterThreads.waitForFinish();
+    }
+
     if (isRead) {
       readerThreads.waitForFinish();
     }
@@ -424,13 +474,16 @@ public class LoadTestTool extends Abstra
     if (isWrite) {
       success = success && writerThreads.getNumWriteFailures() == 0;
     }
+    if (isUpdate) {
+      success = success && updaterThreads.getNumWriteFailures() == 0;
+    }
     if (isRead) {
       success = success && readerThreads.getNumReadErrors() == 0
           && readerThreads.getNumReadFailures() == 0;
     }
-    return success ? EXIT_SUCCESS : this.EXIT_FAILURE;
+    return success ? EXIT_SUCCESS : EXIT_FAILURE;
   }
-  
+
   public static void main(String[] args) {
     new LoadTestTool().doStaticMain(args);
   }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java?rev=1512817&r1=1512816&r2=1512817&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java Sat Aug 10 18:53:46 2013
@@ -16,8 +16,13 @@
  */
 package org.apache.hadoop.hbase.util;
 
+import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
+import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
+
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -27,12 +32,18 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
 import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Common base class for reader and writer parts of multi-thread HBase load
  * test ({@link LoadTestTool}).
@@ -300,7 +311,7 @@ public abstract class MultiThreadedActio
 
     // See if we have any data at all.
     if (result.isEmpty()) {
-      LOG.error("No data returned for key = [" + rowKeyStr + "]");
+      LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned");
       return false;
     }
 
@@ -311,7 +322,8 @@ public abstract class MultiThreadedActio
     // See if we have all the CFs.
     byte[][] expectedCfs = dataGenerator.getColumnFamilies();
     if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
-      LOG.error("Bad family count for [" + rowKeyStr + "]: " + result.getMap().size());
+      LOG.error("Error checking data for key [" + rowKeyStr
+        + "], bad family count: " + result.getMap().size());
       return false;
     }
 
@@ -320,34 +332,155 @@ public abstract class MultiThreadedActio
       String cfStr = Bytes.toString(cf);
       Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
       if (columnValues == null) {
-        LOG.error("No data for family [" + cfStr + "] for [" + rowKeyStr + "]");
+        LOG.error("Error checking data for key [" + rowKeyStr
+          + "], no data for family [" + cfStr + "]]");
         return false;
       }
-      // See if we have correct columns.
-      if (verifyCfAndColumnIntegrity
-          && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
-        String colsStr = "";
-        for (byte[] col : columnValues.keySet()) {
-          if (colsStr.length() > 0) {
-            colsStr += ", ";
+
+      Map<String, MutationType> mutateInfo = null;
+      if (verifyCfAndColumnIntegrity || verifyValues) {
+        if (!columnValues.containsKey(MUTATE_INFO)) {
+          LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+            + cfStr + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found");
+          return false;
+        }
+
+        long cfHash = Arrays.hashCode(cf);
+        // Verify deleted columns, and make up column counts if deleted
+        byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO);
+        mutateInfo = parseMutateInfo(mutateInfoValue);
+        for (Map.Entry<String, MutationType> mutate: mutateInfo.entrySet()) {
+          if (mutate.getValue() == MutationType.DELETE) {
+            byte[] column = Bytes.toBytes(mutate.getKey());
+            long columnHash = Arrays.hashCode(column);
+            long hashCode = cfHash + columnHash;
+            if (hashCode % 2 == 0) {
+              if (columnValues.containsKey(column)) {
+                LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+                  + cfStr + "], column [" + mutate.getKey() + "]; should be deleted");
+                return false;
+              }
+              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
+              columnValues.put(column, hashCodeBytes);
+            }
           }
-          colsStr += "[" + Bytes.toString(col) + "]";
         }
-        LOG.error("Bad columns for family [" + cfStr + "] for [" + rowKeyStr + "]: " + colsStr);
-        return false;
-      }
-      // See if values check out.
-      if (verifyValues) {
-        for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
-          if (!dataGenerator.verify(result.getRow(), cf, kv.getKey(), kv.getValue())) {
+
+        // Verify increment
+        if (!columnValues.containsKey(INCREMENT)) {
+          LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+            + cfStr + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found");
+          return false;
+        }
+        long currentValue = Bytes.toLong(columnValues.remove(INCREMENT));
+        if (verifyValues) {
+          long amount = mutateInfo.isEmpty() ? 0 : cfHash;
+          long originalValue = Arrays.hashCode(result.getRow());
+          long extra = currentValue - originalValue;
+          if (extra != 0 && (amount == 0 || extra % amount != 0)) {
             LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
-              + cfStr + "], column [" + Bytes.toString(kv.getKey()) + "]; value of length " +
-              + kv.getValue().length);
+              + cfStr + "], column [increment], extra [" + extra + "], amount [" + amount + "]");
             return false;
           }
+          if (amount != 0 && extra != amount) {
+            LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
+              + cfStr + "], column [increment], incremented [" + (extra / amount) + "] times");
+          }
+        }
+
+        // See if we have correct columns.
+        if (verifyCfAndColumnIntegrity
+            && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
+          String colsStr = "";
+          for (byte[] col : columnValues.keySet()) {
+            if (colsStr.length() > 0) {
+              colsStr += ", ";
+            }
+            colsStr += "[" + Bytes.toString(col) + "]";
+          }
+          LOG.error("Error checking data for key [" + rowKeyStr
+            + "], bad columns for family [" + cfStr + "]: " + colsStr);
+          return false;
+        }
+        // See if values check out.
+        if (verifyValues) {
+          for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
+            String column = Bytes.toString(kv.getKey());
+            MutationType mutation = mutateInfo.get(column);
+            boolean verificationNeeded = true;
+            byte[] bytes = kv.getValue();
+            if (mutation != null) {
+              boolean mutationVerified = true;
+              long columnHash = Arrays.hashCode(kv.getKey());
+              long hashCode = cfHash + columnHash;
+              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
+              if (mutation == MutationType.APPEND) {
+                int offset = bytes.length - hashCodeBytes.length;
+                mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes,
+                  0, hashCodeBytes.length, bytes, offset, hashCodeBytes.length);
+                if (mutationVerified) {
+                  int n = 1;
+                  while (true) {
+                    int newOffset = offset - hashCodeBytes.length;
+                    if (newOffset < 0 || !Bytes.equals(hashCodeBytes, 0,
+                        hashCodeBytes.length, bytes, newOffset, hashCodeBytes.length)) {
+                      break;
+                    }
+                    offset = newOffset;
+                    n++;
+                  }
+                  if (n > 1) {
+                    LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
+                      + cfStr + "], column [" + column + "], appended [" + n + "] times");
+                  }
+                  byte[] dest = new byte[offset];
+                  System.arraycopy(bytes, 0, dest, 0, offset);
+                  bytes = dest;
+                }
+              } else if (hashCode % 2 == 0) { // checkAndPut
+                mutationVerified = Bytes.equals(bytes, hashCodeBytes);
+                verificationNeeded = false;
+              }
+              if (!mutationVerified) {
+                LOG.error("Error checking data for key [" + rowKeyStr
+                  + "], mutation checking failed for column family [" + cfStr + "], column ["
+                  + column + "]; mutation [" + mutation + "], hashCode ["
+                  + hashCode + "], verificationNeeded ["
+                  + verificationNeeded + "]");
+                return false;
+              }
+            } // end of mutation checking
+            if (verificationNeeded &&
+                !dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)) {
+              LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+                + cfStr + "], column [" + column + "], mutation [" + mutation
+                + "]; value of length " + bytes.length);
+              return false;
+            }
+          }
         }
       }
     }
     return true;
   }
+
+  // Parse mutate info into a map of <column name> => <update action>
+  private Map<String, MutationType> parseMutateInfo(byte[] mutateInfo) {
+    Map<String, MutationType> mi = new HashMap<String, MutationType>();
+    if (mutateInfo != null) {
+      String mutateInfoStr = Bytes.toString(mutateInfo);
+      String[] mutations = mutateInfoStr.split("#");
+      for (String mutation: mutations) {
+        if (mutation.isEmpty()) continue;
+        Preconditions.checkArgument(mutation.contains(":"),
+          "Invalid mutation info " + mutation);
+        int p = mutation.indexOf(":");
+        String column = mutation.substring(0, p);
+        MutationType type = MutationType.valueOf(
+          Integer.parseInt(mutation.substring(p+1)));
+        mi.put(column, type);
+      }
+    }
+    return mi;
+  }
 }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java?rev=1512817&r1=1512816&r2=1512817&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java Sat Aug 10 18:53:46 2013
@@ -18,15 +18,15 @@ package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
 import java.util.HashSet;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.lang.math.RandomUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
@@ -41,7 +41,7 @@ public class MultiThreadedReader extends
   private final double verifyPercent;
   private volatile boolean aborted;
 
-  private MultiThreadedWriter writer = null;
+  private MultiThreadedWriterBase writer = null;
 
   /**
    * The number of keys verified in a sequence. This will never be larger than
@@ -77,9 +77,9 @@ public class MultiThreadedReader extends
     this.verifyPercent = verifyPercent;
   }
 
-  public void linkToWriter(MultiThreadedWriter writer) {
+  public void linkToWriter(MultiThreadedWriterBase writer) {
     this.writer = writer;
-    writer.setTrackInsertedKeys(true);
+    writer.setTrackWroteKeys(true);
   }
 
   public void setMaxErrors(int maxErrors) {
@@ -108,7 +108,6 @@ public class MultiThreadedReader extends
   public class HBaseReaderThread extends Thread {
     private final int readerId;
     private final HTable table;
-    private final Random random = new Random();
 
     /** The "current" key being read. Increases from startKey to endKey. */
     private long curKey;
@@ -182,13 +181,13 @@ public class MultiThreadedReader extends
      * constraint.
      */
     private long maxKeyWeCanRead() {
-      long insertedUpToKey = writer.insertedUpToKey();
+      long insertedUpToKey = writer.wroteUpToKey();
       if (insertedUpToKey >= endKey - 1) {
         // The writer has finished writing our range, so we can read any
         // key in the range.
         return endKey - 1;
       }
-      return Math.min(endKey - 1, writer.insertedUpToKey() - keyWindow);
+      return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
     }
 
     private long getNextKeyToRead() {
@@ -217,7 +216,7 @@ public class MultiThreadedReader extends
       // later. Set a flag to make sure that we don't count this key towards
       // the set of unique keys we have verified.
       readingRandomKey = true;
-      return startKey + Math.abs(random.nextLong())
+      return startKey + Math.abs(RandomUtils.nextLong())
           % (maxKeyToRead - startKey + 1);
     }
 
@@ -239,7 +238,7 @@ public class MultiThreadedReader extends
         if (verbose) {
           LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
         }
-        queryKey(get, random.nextInt(100) < verifyPercent);
+        queryKey(get, RandomUtils.nextInt(100) < verifyPercent);
       } catch (IOException e) {
         numReadFailures.addAndGet(1);
         LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
@@ -279,7 +278,7 @@ public class MultiThreadedReader extends
         numCols.addAndGet(cols);
       } else {
         if (writer != null) {
-          LOG.error("At the time of failure, writer inserted " + writer.numKeys.get() + " keys");
+          LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
         }
         numErrorsAfterThis = numReadErrors.incrementAndGet();
       }
@@ -315,5 +314,4 @@ public class MultiThreadedReader extends
     appendToStatus(sb, "READ ERRORS", numReadErrors.get());
     return sb.toString();
   }
-
 }

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java?rev=1512817&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java Sat Aug 10 18:53:46 2013
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
+import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Preconditions;
+
+/** Creates multiple threads that write key/values into the */
+public class MultiThreadedUpdater extends MultiThreadedWriterBase {
+  private static final Log LOG = LogFactory.getLog(MultiThreadedUpdater.class);
+
+  private Set<HBaseUpdaterThread> updaters = new HashSet<HBaseUpdaterThread>();
+
+  private MultiThreadedWriterBase writer = null;
+  private boolean isBatchUpdate = false;
+  private final double updatePercent;
+
+  public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
+      TableName tableName, double updatePercent) {
+    super(dataGen, conf, tableName, "U");
+    this.updatePercent = updatePercent;
+  }
+
+  /** Use batch vs. separate updates for every column in a row */
+  public void setBatchUpdate(boolean isBatchUpdate) {
+    this.isBatchUpdate = isBatchUpdate;
+  }
+
+  public void linkToWriter(MultiThreadedWriterBase writer) {
+    this.writer = writer;
+    writer.setTrackWroteKeys(true);
+  }
+
+  @Override
+  public void start(long startKey, long endKey, int numThreads)
+      throws IOException {
+    super.start(startKey, endKey, numThreads);
+
+    if (verbose) {
+      LOG.debug("Updating keys [" + startKey + ", " + endKey + ")");
+    }
+
+    for (int i = 0; i < numThreads; ++i) {
+      HBaseUpdaterThread updater = new HBaseUpdaterThread(i);
+      updaters.add(updater);
+    }
+
+    startThreads(updaters);
+  }
+
+  private long getNextKeyToUpdate() {
+    if (writer == null) {
+      return nextKeyToWrite.getAndIncrement();
+    }
+    synchronized (this) {
+      if (nextKeyToWrite.get() >= endKey) {
+        // Finished the whole key range
+        return endKey;
+      }
+      while (nextKeyToWrite.get() > writer.wroteUpToKey()) {
+        Threads.sleepWithoutInterrupt(100);
+      }
+      long k = nextKeyToWrite.getAndIncrement();
+      if (writer.failedToWriteKey(k)) {
+        failedKeySet.add(k);
+        return getNextKeyToUpdate();
+      }
+      return k;
+    }
+  }
+
+  private class HBaseUpdaterThread extends Thread {
+    private final HTable table;
+
+    public HBaseUpdaterThread(int updaterId) throws IOException {
+      setName(getClass().getSimpleName() + "_" + updaterId);
+      table = new HTable(conf, tableName);
+    }
+
+    public void run() {
+      try {
+        long rowKeyBase;
+        StringBuilder buf = new StringBuilder();
+        byte[][] columnFamilies = dataGenerator.getColumnFamilies();
+        while ((rowKeyBase = getNextKeyToUpdate()) < endKey) {
+          if (RandomUtils.nextInt(100) < updatePercent) {
+            byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
+            Increment inc = new Increment(rowKey);
+            Append app = new Append(rowKey);
+            numKeys.addAndGet(1);
+            int columnCount = 0;
+            for (byte[] cf : columnFamilies) {
+              long cfHash = Arrays.hashCode(cf);
+              inc.addColumn(cf, INCREMENT, cfHash);
+              buf.setLength(0); // Clear the buffer
+              buf.append("#").append(Bytes.toString(INCREMENT));
+              buf.append(":").append(MutationType.INCREMENT.getNumber());
+              app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
+              ++columnCount;
+              if (!isBatchUpdate) {
+                mutate(table, inc, rowKeyBase);
+                numCols.addAndGet(1);
+                inc = new Increment(rowKey);
+                mutate(table, app, rowKeyBase);
+                numCols.addAndGet(1);
+                app = new Append(rowKey);
+              }
+              Result result = null;
+              try {
+                Get get = new Get(rowKey);
+                get.addFamily(cf);
+                result = table.get(get);
+              } catch (IOException ie) {
+                LOG.warn("Failed to get the row for key = ["
+                  + rowKey + "], column family = [" + Bytes.toString(cf) + "]", ie);
+              }
+              Map<byte[], byte[]> columnValues =
+                result != null ? result.getFamilyMap(cf) : null;
+              if (columnValues == null) {
+                failedKeySet.add(rowKeyBase);
+                LOG.error("Failed to update the row with key = ["
+                  + rowKey + "], since we could not get the original row");
+              }
+              for (byte[] column : columnValues.keySet()) {
+                if (Bytes.equals(column, INCREMENT)
+                    || Bytes.equals(column, MUTATE_INFO)) {
+                  continue;
+                }
+                MutationType mt = MutationType.valueOf(
+                  RandomUtils.nextInt(MutationType.values().length));
+                long columnHash = Arrays.hashCode(column);
+                long hashCode = cfHash + columnHash;
+                byte[] hashCodeBytes = Bytes.toBytes(hashCode);
+                byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY;
+                if (hashCode % 2 == 0) {
+                  KeyValue kv = result.getColumnLatest(cf, column);
+                  checkedValue = kv != null ? kv.getValue() : null;
+                  Preconditions.checkNotNull(checkedValue,
+                    "Column value to be checked should not be null");
+                }
+                buf.setLength(0); // Clear the buffer
+                buf.append("#").append(Bytes.toString(column)).append(":");
+                ++columnCount;
+                switch (mt) {
+                case PUT:
+                  Put put = new Put(rowKey);
+                  put.add(cf, column, hashCodeBytes);
+                  mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue);
+                  buf.append(MutationType.PUT.getNumber());
+                  break;
+                case DELETE:
+                  Delete delete = new Delete(rowKey);
+                  // Delete all versions since a put
+                  // could be called multiple times if CM is used
+                  delete.deleteColumns(cf, column);
+                  mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue);
+                  buf.append(MutationType.DELETE.getNumber());
+                  break;
+                default:
+                  buf.append(MutationType.APPEND.getNumber());
+                  app.add(cf, column, hashCodeBytes);
+                }
+                app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
+                if (!isBatchUpdate) {
+                  mutate(table, app, rowKeyBase);
+                  numCols.addAndGet(1);
+                  app = new Append(rowKey);
+                }
+              }
+            }
+            if (isBatchUpdate) {
+              if (verbose) {
+                LOG.debug("Preparing increment and append for key = ["
+                  + rowKey + "], " + columnCount + " columns");
+              }
+              mutate(table, inc, rowKeyBase);
+              mutate(table, app, rowKeyBase);
+              numCols.addAndGet(columnCount);
+            }
+          }
+          if (trackWroteKeys) {
+            wroteKeys.add(rowKeyBase);
+          }
+        }
+      } finally {
+        try {
+          table.close();
+        } catch (IOException e) {
+          LOG.error("Error closing table", e);
+        }
+        numThreadsWorking.decrementAndGet();
+      }
+    }
+  }
+
+  @Override
+  public void waitForFinish() {
+    super.waitForFinish();
+    System.out.println("Failed to update keys: " + failedKeySet.size());
+    for (Long key : failedKeySet) {
+       System.out.println("Failed to update key: " + key);
+    }
+  }
+
+  public void mutate(HTable table, Mutation m, long keyBase) {
+    mutate(table, m, keyBase, null, null, null, null);
+  }
+
+  public void mutate(HTable table, Mutation m,
+      long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
+    long start = System.currentTimeMillis();
+    try {
+      if (m instanceof Increment) {
+        table.increment((Increment)m);
+      } else if (m instanceof Append) {
+        table.append((Append)m);
+      } else if (m instanceof Put) {
+        table.checkAndPut(row, cf, q, v, (Put)m);
+      } else if (m instanceof Delete) {
+        table.checkAndDelete(row, cf, q, v, (Delete)m);
+      } else {
+        throw new IllegalArgumentException(
+          "unsupported mutation " + m.getClass().getSimpleName());
+      }
+      totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
+    } catch (IOException e) {
+      failedKeySet.add(keyBase);
+      String exceptionInfo;
+      if (e instanceof RetriesExhaustedWithDetailsException) {
+        RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
+        exceptionInfo = aggEx.getExhaustiveDescription();
+      } else {
+        StringWriter stackWriter = new StringWriter();
+        PrintWriter pw = new PrintWriter(stackWriter);
+        e.printStackTrace(pw);
+        pw.flush();
+        exceptionInfo = StringUtils.stringifyException(e);
+      }
+      LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) +
+        "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
+          + exceptionInfo);
+    }
+  }
+}

Propchange: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java?rev=1512817&r1=1512816&r2=1512817&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java Sat Aug 10 18:53:46 2013
@@ -18,72 +18,33 @@
 
 package org.apache.hadoop.hbase.util;
 
+import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
+import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
+
 import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
+import java.util.Arrays;
 import java.util.HashSet;
-import java.util.PriorityQueue;
-import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
 
 /** Creates multiple threads that write key/values into the */
-public class MultiThreadedWriter extends MultiThreadedAction {
+public class MultiThreadedWriter extends MultiThreadedWriterBase {
   private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class);
 
   private Set<HBaseWriterThread> writers = new HashSet<HBaseWriterThread>();
 
   private boolean isMultiPut = false;
 
-  /**
-   * A temporary place to keep track of inserted keys. This is written to by
-   * all writers and is drained on a separate thread that populates
-   * {@link #insertedUpToKey}, the maximum key in the contiguous range of keys
-   * being inserted. This queue is supposed to stay small.
-   */
-  private BlockingQueue<Long> insertedKeys = new ArrayBlockingQueue<Long>(10000);
-
-  /**
-   * This is the current key to be inserted by any thread. Each thread does an
-   * atomic get and increment operation and inserts the current value.
-   */
-  private AtomicLong nextKeyToInsert = new AtomicLong();
-
-  /**
-   * The highest key in the contiguous range of keys .
-   */
-  private AtomicLong insertedUpToKey = new AtomicLong();
-
-  /** The sorted set of keys NOT inserted by the writers */
-  private Set<Long> failedKeySet = new ConcurrentSkipListSet<Long>();
-
-  /**
-   * The total size of the temporary inserted key set that have not yet lined
-   * up in a our contiguous sequence starting from startKey. Supposed to stay
-   * small.
-   */
-  private AtomicLong insertedKeyQueueSize = new AtomicLong();
-
-  /** Enable this if used in conjunction with a concurrent reader. */
-  private boolean trackInsertedKeys;
-
   public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
-    TableName tableName) {
+      TableName tableName) {
     super(dataGen, conf, tableName, "W");
   }
 
@@ -101,19 +62,11 @@ public class MultiThreadedWriter extends
       LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")");
     }
 
-    nextKeyToInsert.set(startKey);
-    insertedUpToKey.set(startKey - 1);
-
     for (int i = 0; i < numThreads; ++i) {
       HBaseWriterThread writer = new HBaseWriterThread(i);
       writers.add(writer);
     }
 
-    if (trackInsertedKeys) {
-      new Thread(new InsertedKeysTracker()).start();
-      numThreadsWorking.incrementAndGet();
-    }
-
     startThreads(writers);
   }
 
@@ -129,13 +82,12 @@ public class MultiThreadedWriter extends
       try {
         long rowKeyBase;
         byte[][] columnFamilies = dataGenerator.getColumnFamilies();
-        while ((rowKeyBase = nextKeyToInsert.getAndIncrement()) < endKey) {
+        while ((rowKeyBase = nextKeyToWrite.getAndIncrement()) < endKey) {
           byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
           Put put = new Put(rowKey);
           numKeys.addAndGet(1);
           int columnCount = 0;
           for (byte[] cf : columnFamilies) {
-            String s;
             byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf);
             for (byte[] column : columns) {
               byte[] value = dataGenerator.generateValue(rowKey, cf, column);
@@ -147,6 +99,14 @@ public class MultiThreadedWriter extends
                 put = new Put(rowKey);
               }
             }
+            long rowKeyHash = Arrays.hashCode(rowKey);
+            put.add(cf, MUTATE_INFO, HConstants.EMPTY_BYTE_ARRAY);
+            put.add(cf, INCREMENT, Bytes.toBytes(rowKeyHash));
+            if (!isMultiPut) {
+              insert(table, put, rowKeyBase);
+              numCols.addAndGet(1);
+              put = new Put(rowKey);
+            }
           }
           if (isMultiPut) {
             if (verbose) {
@@ -155,8 +115,8 @@ public class MultiThreadedWriter extends
             insert(table, put, rowKeyBase);
             numCols.addAndGet(columnCount);
           }
-          if (trackInsertedKeys) {
-            insertedKeys.add(rowKeyBase);
+          if (trackWroteKeys) {
+            wroteKeys.add(rowKeyBase);
           }
         }
       } finally {
@@ -170,104 +130,6 @@ public class MultiThreadedWriter extends
     }
   }
 
-  public void insert(HTable table, Put put, long keyBase) {
-    long start = System.currentTimeMillis();
-    try {
-      table.put(put);
-      totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
-    } catch (IOException e) {
-      failedKeySet.add(keyBase);
-      String exceptionInfo;
-      if (e instanceof RetriesExhaustedWithDetailsException) {
-        RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
-        exceptionInfo = aggEx.getExhaustiveDescription();
-      } else {
-        StringWriter stackWriter = new StringWriter();
-        PrintWriter pw = new PrintWriter(stackWriter);
-        e.printStackTrace(pw);
-        pw.flush();
-        exceptionInfo = StringUtils.stringifyException(e);
-      }
-      LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) +
-        "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: "
-          + exceptionInfo);
-    }
-  }
-
-  private String getRegionDebugInfoSafe(HTable table, byte[] rowKey) {
-    HRegionLocation cached = null, real = null;
-    try {
-      cached = table.getRegionLocation(rowKey, false);
-      real = table.getRegionLocation(rowKey, true);
-    } catch (Throwable t) {
-      // Cannot obtain region information for another catch block - too bad!
-    }
-    String result = "no information can be obtained";
-    if (cached != null) {
-      result = "cached: " + cached.toString();
-    }
-    if (real != null) {
-      if (real.equals(cached)) {
-        result += "; cache is up to date";
-      } else {
-        result = (cached != null) ? (result + "; ") : "";
-        result += "real: " + real.toString();
-      }
-    }
-    return result;
-  }
-
-  /**
-   * A thread that keeps track of the highest key in the contiguous range of
-   * inserted keys.
-   */
-  private class InsertedKeysTracker implements Runnable {
-
-    @Override
-    public void run() {
-      Thread.currentThread().setName(getClass().getSimpleName());
-      try {
-        long expectedKey = startKey;
-        Queue<Long> sortedKeys = new PriorityQueue<Long>();
-        while (expectedKey < endKey) {
-          // Block until a new element is available.
-          Long k;
-          try {
-            k = insertedKeys.poll(1, TimeUnit.SECONDS);
-          } catch (InterruptedException e) {
-            LOG.info("Inserted key tracker thread interrupted", e);
-            break;
-          }
-          if (k == null) {
-            continue;
-          }
-          if (k == expectedKey) {
-            // Skip the "sorted key" queue and consume this key.
-            insertedUpToKey.set(k);
-            ++expectedKey;
-          } else {
-            sortedKeys.add(k);
-          }
-
-          // See if we have a sequence of contiguous keys lined up.
-          while (!sortedKeys.isEmpty()
-              && ((k = sortedKeys.peek()) == expectedKey)) {
-            sortedKeys.poll();
-            insertedUpToKey.set(k);
-            ++expectedKey;
-          }
-
-          insertedKeyQueueSize.set(insertedKeys.size() + sortedKeys.size());
-        }
-      } catch (Exception ex) {
-        LOG.error("Error in inserted key tracker", ex);
-      } finally {
-        numThreadsWorking.decrementAndGet();
-      }
-    }
-
-  }
-
   @Override
   public void waitForFinish() {
     super.waitForFinish();
@@ -276,37 +138,4 @@ public class MultiThreadedWriter extends
        System.out.println("Failed to write key: " + key);
     }
   }
-
-  public int getNumWriteFailures() {
-    return failedKeySet.size();
-  }
-
-  /**
-   * The max key until which all keys have been inserted (successfully or not).
-   * @return the last key that we have inserted all keys up to (inclusive)
-   */
-  public long insertedUpToKey() {
-    return insertedUpToKey.get();
-  }
-
-  public boolean failedToWriteKey(long k) {
-    return failedKeySet.contains(k);
-  }
-
-  @Override
-  protected String progressInfo() {
-    StringBuilder sb = new StringBuilder();
-    appendToStatus(sb, "insertedUpTo", insertedUpToKey.get());
-    appendToStatus(sb, "insertedQSize", insertedKeyQueueSize.get());
-    return sb.toString();
-  }
-
-  /**
-   * Used for a joint write/read workload. Enables tracking the last inserted
-   * key, which requires a blocking queue and a consumer thread.
-   * @param enable whether to enable tracking the last inserted key
-   */
-  public void setTrackInsertedKeys(boolean enable) {
-    trackInsertedKeys = enable;
-  }
 }

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java?rev=1512817&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java Sat Aug 10 18:53:46 2013
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
+import org.apache.hadoop.util.StringUtils;
+
+/** Creates multiple threads that write key/values into the */
+public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
+  private static final Log LOG = LogFactory.getLog(MultiThreadedWriterBase.class);
+
+  /**
+   * A temporary place to keep track of inserted/updated keys. This is written to by
+   * all writers and is drained on a separate thread that populates
+   * {@link #wroteUpToKey}, the maximum key in the contiguous range of keys
+   * being inserted/updated. This queue is supposed to stay small.
+   */
+  protected BlockingQueue<Long> wroteKeys = new ArrayBlockingQueue<Long>(10000);
+
+  /**
+   * This is the current key to be inserted/updated by any thread. Each thread does an
+   * atomic get and increment operation and inserts the current value.
+   */
+  protected AtomicLong nextKeyToWrite = new AtomicLong();
+
+  /**
+   * The highest key in the contiguous range of keys .
+   */
+  protected AtomicLong wroteUpToKey = new AtomicLong();
+
+  /** The sorted set of keys NOT inserted/updated by the writers */
+  protected Set<Long> failedKeySet = new ConcurrentSkipListSet<Long>();
+
+  /**
+   * The total size of the temporary inserted/updated key set that have not yet lined
+   * up in a our contiguous sequence starting from startKey. Supposed to stay
+   * small.
+   */
+  protected AtomicLong wroteKeyQueueSize = new AtomicLong();
+
+  /** Enable this if used in conjunction with a concurrent reader. */
+  protected boolean trackWroteKeys;
+
+  public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
+      TableName tableName, String actionLetter) {
+    super(dataGen, conf, tableName, actionLetter);
+  }
+
+  @Override
+  public void start(long startKey, long endKey, int numThreads)
+      throws IOException {
+    super.start(startKey, endKey, numThreads);
+
+    nextKeyToWrite.set(startKey);
+    wroteUpToKey.set(startKey - 1);
+
+    if (trackWroteKeys) {
+      new Thread(new WroteKeysTracker()).start();
+      numThreadsWorking.incrementAndGet();
+    }
+  }
+
+  protected String getRegionDebugInfoSafe(HTable table, byte[] rowKey) {
+    HRegionLocation cached = null, real = null;
+    try {
+      cached = table.getRegionLocation(rowKey, false);
+      real = table.getRegionLocation(rowKey, true);
+    } catch (Throwable t) {
+      // Cannot obtain region information for another catch block - too bad!
+    }
+    String result = "no information can be obtained";
+    if (cached != null) {
+      result = "cached: " + cached.toString();
+    }
+    if (real != null) {
+      if (real.equals(cached)) {
+        result += "; cache is up to date";
+      } else {
+        result = (cached != null) ? (result + "; ") : "";
+        result += "real: " + real.toString();
+      }
+    }
+    return result;
+  }
+
+  /**
+   * A thread that keeps track of the highest key in the contiguous range of
+   * inserted/updated keys.
+   */
+  private class WroteKeysTracker implements Runnable {
+
+    @Override
+    public void run() {
+      Thread.currentThread().setName(getClass().getSimpleName());
+      try {
+        long expectedKey = startKey;
+        Queue<Long> sortedKeys = new PriorityQueue<Long>();
+        while (expectedKey < endKey) {
+          // Block until a new element is available.
+          Long k;
+          try {
+            k = wroteKeys.poll(1, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            LOG.info("Inserted key tracker thread interrupted", e);
+            break;
+          }
+          if (k == null) {
+            continue;
+          }
+          if (k == expectedKey) {
+            // Skip the "sorted key" queue and consume this key.
+            wroteUpToKey.set(k);
+            ++expectedKey;
+          } else {
+            sortedKeys.add(k);
+          }
+
+          // See if we have a sequence of contiguous keys lined up.
+          while (!sortedKeys.isEmpty()
+              && ((k = sortedKeys.peek()) == expectedKey)) {
+            sortedKeys.poll();
+            wroteUpToKey.set(k);
+            ++expectedKey;
+          }
+
+          wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size());
+        }
+      } catch (Exception ex) {
+        LOG.error("Error in inserted/updaed key tracker", ex);
+      } finally {
+        numThreadsWorking.decrementAndGet();
+      }
+    }
+  }
+
+  public int getNumWriteFailures() {
+    return failedKeySet.size();
+  }
+
+  public void insert(HTable table, Put put, long keyBase) {
+    long start = System.currentTimeMillis();
+    try {
+      table.put(put);
+      totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
+    } catch (IOException e) {
+      failedKeySet.add(keyBase);
+      String exceptionInfo;
+      if (e instanceof RetriesExhaustedWithDetailsException) {
+        RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
+        exceptionInfo = aggEx.getExhaustiveDescription();
+      } else {
+        StringWriter stackWriter = new StringWriter();
+        PrintWriter pw = new PrintWriter(stackWriter);
+        e.printStackTrace(pw);
+        pw.flush();
+        exceptionInfo = StringUtils.stringifyException(e);
+      }
+      LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) +
+        "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: "
+          + exceptionInfo);
+    }
+  }
+
+  /**
+   * The max key until which all keys have been inserted/updated (successfully or not).
+   * @return the last key that we have inserted/updated all keys up to (inclusive)
+   */
+  public long wroteUpToKey() {
+    return wroteUpToKey.get();
+  }
+
+  public boolean failedToWriteKey(long k) {
+    return failedKeySet.contains(k);
+  }
+
+  @Override
+  protected String progressInfo() {
+    StringBuilder sb = new StringBuilder();
+    appendToStatus(sb, "wroteUpTo", wroteUpToKey.get());
+    appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get());
+    return sb.toString();
+  }
+
+  /**
+   * Used for a joint write/read workload. Enables tracking the last inserted/updated
+   * key, which requires a blocking queue and a consumer thread.
+   * @param enable whether to enable tracking the last inserted/updated key
+   */
+  public void setTrackWroteKeys(boolean enable) {
+    trackWroteKeys = enable;
+  }
+}

Propchange: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
------------------------------------------------------------------------------
    svn:eol-style = native