You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2013/03/29 16:53:50 UTC

svn commit: r1462511 - in /hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase: io/encoding/ util/

Author: apurtell
Date: Fri Mar 29 15:53:50 2013
New Revision: 1462511

URL: http://svn.apache.org/r1462511
Log:
HBASE-8210. Backport the LoadTest portions of HBASE-7383

Added:
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestDataGenerator.java
Modified:
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java Fri Mar 29 15:53:50 2013
@@ -32,14 +32,11 @@ import org.apache.hadoop.hbase.client.Ge
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
-import org.apache.hadoop.hbase.util.MultiThreadedWriter;
+
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -101,17 +98,18 @@ public class TestEncodedSeekers {
             .setDataBlockEncoding(encoding)
             .setEncodeOnDisk(encodeOnDisk)
     );
+
     LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(
         MIN_VALUE_SIZE, MAX_VALUE_SIZE);
 
     // Write
     for (int i = 0; i < NUM_ROWS; ++i) {
-      byte[] key = MultiThreadedWriter.longToByteArrayKey(i);
+      byte[] key = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
       for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
         Put put = new Put(key);
-        String colAsStr = String.valueOf(j);
-        byte[] value = dataGenerator.generateRandomSizeValue(i, colAsStr);
-        put.add(CF_BYTES, Bytes.toBytes(colAsStr), value);
+        byte[] col = Bytes.toBytes(String.valueOf(j));
+        byte[] value = dataGenerator.generateRandomSizeValue(key, col);
+        put.add(CF_BYTES, col, value);
         region.put(put);
       }
       if (i % NUM_ROWS_PER_FLUSH == 0) {
@@ -122,7 +120,7 @@ public class TestEncodedSeekers {
     for (int doneCompaction = 0; doneCompaction <= 1; ++doneCompaction) {
       // Read
       for (int i = 0; i < NUM_ROWS; ++i) {
-        final byte[] rowKey = MultiThreadedWriter.longToByteArrayKey(i);
+        byte[] rowKey = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
         for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
           if (VERBOSE) {
             System.err.println("Reading row " + i + ", column " +  j);
@@ -131,10 +129,10 @@ public class TestEncodedSeekers {
           final byte[] qualBytes = Bytes.toBytes(qualStr);
           Get get = new Get(rowKey);
           get.addColumn(CF_BYTES, qualBytes);
-          Result result = region.get(get, null);
+          Result result = region.get(get);
           assertEquals(1, result.size());
-          assertTrue(LoadTestKVGenerator.verify(Bytes.toString(rowKey), qualStr,
-              result.getValue(CF_BYTES, qualBytes)));
+          byte[] value = result.getValue(CF_BYTES, qualBytes);
+          assertTrue(LoadTestKVGenerator.verify(value, rowKey, qualBytes));
         }
       }
 

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestDataGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestDataGenerator.java?rev=1462511&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestDataGenerator.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestDataGenerator.java Fri Mar 29 15:53:50 2013
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.util.Set;
+
+/**
+ * A generator of random data (keys/cfs/columns/values) for load testing.
+ * Contains LoadTestKVGenerator as a matter of convenience...
+ */
+public abstract class LoadTestDataGenerator {
+  protected final LoadTestKVGenerator kvGenerator;
+
+  /**
+   * Initializes the object.
+   * @param minValueSize minimum size of the value generated by
+   * {@link #generateValue(byte[], byte[], byte[])}.
+   * @param maxValueSize maximum size of the value generated by
+   * {@link #generateValue(byte[], byte[], byte[])}.
+   */
+  public LoadTestDataGenerator(int minValueSize, int maxValueSize) {
+    this.kvGenerator = new LoadTestKVGenerator(minValueSize, maxValueSize);
+ }
+  /**
+   * Generates a deterministic, unique hashed row key from a number. That way, the user can
+   * keep track of numbers, without messing with byte array and ensuring key distribution.
+   * @param keyBase Base number for a key, such as a loop counter.
+   */
+  public abstract byte[] getDeterministicUniqueKey(long keyBase);
+
+  /**
+   * Gets column families for the load test table.
+   * @return The array of byte[]s representing column family names.
+   */
+  public abstract byte[][] getColumnFamilies();
+
+  /**
+   * Generates an applicable set of columns to be used for a particular key and family.
+   * @param rowKey The row key to generate for.
+   * @param cf The column family name to generate for.
+   * @return The array of byte[]s representing column names.
+   */
+  public abstract byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf);
+
+  /**
+   * Generates a value to be used for a particular row/cf/column.
+   * @param rowKey The row key to generate for.
+   * @param cf The column family name to generate for.
+   * @param column The column name to generate for.
+   * @return The value to use.
+   */
+  public abstract byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column);
+
+  /**
+   * Checks that columns for a rowKey and cf are valid if generated via
+   * {@link #generateColumnsForCf(byte[], byte[])}
+   * @param rowKey The row key to verify for.
+   * @param cf The column family name to verify for.
+   * @param columnSet The column set (for example, encountered by read).
+   * @return True iff valid.
+   */
+  public abstract boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet);
+
+  /**
+   * Checks that value for a rowKey/cf/column is valid if generated via
+   * {@link #generateValue(byte[], byte[], byte[])}
+   * @param rowKey The row key to verify for.
+   * @param cf The column family name to verify for.
+   * @param column The column name to verify for.
+   * @param value The value (for example, encountered by read).
+   * @return True iff valid.
+   */
+  public abstract boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value);
+}

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java Fri Mar 29 15:53:50 2013
@@ -27,8 +27,6 @@ import org.apache.hadoop.hbase.util.MD5H
  * hash. Values are generated by selecting value size in the configured range
  * and generating a pseudo-random sequence of bytes seeded by key, column
  * qualifier, and value size.
- * <p>
- * Not thread-safe, so a separate instance is needed for every writer thread/
  */
 public class LoadTestKVGenerator {
 
@@ -49,13 +47,13 @@ public class LoadTestKVGenerator {
 
   /**
    * Verifies that the given byte array is the same as what would be generated
-   * for the given row key and qualifier. We are assuming that the value size
-   * is correct, and only verify the actual bytes. However, if the min/max
-   * value sizes are set sufficiently high, an accidental match should be
+   * for the given seed strings (row/cf/column/...). We are assuming that the
+   * value size is correct, and only verify the actual bytes. However, if the
+   * min/max value sizes are set sufficiently high, an accidental match should be
    * extremely improbable.
    */
-  public static boolean verify(String rowKey, String qual, byte[] value) {
-    byte[] expectedData = getValueForRowColumn(rowKey, qual, value.length);
+  public static boolean verify(byte[] value, byte[]... seedStrings) {
+    byte[] expectedData = getValueForRowColumn(value.length, seedStrings);
     return Bytes.equals(expectedData, value);
   }
 
@@ -74,25 +72,31 @@ public class LoadTestKVGenerator {
   /**
    * Generates a value for the given key index and column qualifier. Size is
    * selected randomly in the configured range. The generated value depends
-   * only on the combination of the key, qualifier, and the selected value
-   * size. This allows to verify the actual value bytes when reading, as done
-   * in {@link #verify(String, String, byte[])}.
+   * only on the combination of the strings passed (key/cf/column/...) and the selected
+   * value size. This allows to verify the actual value bytes when reading, as done
+   * in {#verify(byte[], byte[]...)}
+   * This method is as thread-safe as Random class. It appears that the worst bug ever
+   * found with the latter is that multiple threads will get some duplicate values, which
+   * we don't care about.
    */
-  public byte[] generateRandomSizeValue(long key, String qual) {
-    String rowKey = md5PrefixedKey(key);
-    int dataSize = minValueSize + randomForValueSize.nextInt(
-        Math.abs(maxValueSize - minValueSize));
-    return getValueForRowColumn(rowKey, qual, dataSize);
-  }
+  public byte[] generateRandomSizeValue(byte[]... seedStrings) {
+    int dataSize = minValueSize;
+    if (minValueSize != maxValueSize) {
+      dataSize = minValueSize + randomForValueSize.nextInt(Math.abs(maxValueSize - minValueSize));
+    }
+    return getValueForRowColumn(dataSize, seedStrings);
+ }
 
   /**
    * Generates random bytes of the given size for the given row and column
    * qualifier. The random seed is fully determined by these parameters.
    */
-  private static byte[] getValueForRowColumn(String rowKey, String qual,
-      int dataSize) {
-    Random seededRandom = new Random(rowKey.hashCode() + qual.hashCode() +
-        dataSize);
+  private static byte[] getValueForRowColumn(int dataSize, byte[]... seedStrings) {
+    long seed = dataSize;
+    for (byte[] str : seedStrings) {
+      seed += Bytes.toString(str).hashCode();
+    }
+    Random seededRandom = new Random(seed);
     byte[] randomBytes = new byte[dataSize];
     seededRandom.nextBytes(randomBytes);
     return randomBytes;

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java Fri Mar 29 15:53:50 2013
@@ -118,7 +118,7 @@ public class LoadTestTool extends Abstra
 
   // Writer options
   private int numWriterThreads = DEFAULT_NUM_THREADS;
-  private long minColsPerKey, maxColsPerKey;
+  private int minColsPerKey, maxColsPerKey;
   private int minColDataSize, maxColDataSize;
   private boolean isMultiPut;
 
@@ -259,7 +259,7 @@ public class LoadTestTool extends Abstra
 
       int colIndex = 0;
       minColsPerKey = 1;
-      maxColsPerKey = 2 * Long.parseLong(writeOpts[colIndex++]);
+      maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
       int avgColDataSize =
           parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
       minColDataSize = avgColDataSize / 2;
@@ -341,16 +341,16 @@ public class LoadTestTool extends Abstra
       initTestTable();
     }
 
+    LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(
+      minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, COLUMN_FAMILY);
+
     if (isWrite) {
-      writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY);
+      writerThreads = new MultiThreadedWriter(dataGen, conf, tableName);
       writerThreads.setMultiPut(isMultiPut);
-      writerThreads.setColumnsPerKey(minColsPerKey, maxColsPerKey);
-      writerThreads.setDataSize(minColDataSize, maxColDataSize);
     }
 
     if (isRead) {
-      readerThreads = new MultiThreadedReader(conf, tableName, COLUMN_FAMILY,
-          verifyPercent);
+      readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
       readerThreads.setMaxErrors(maxReadErrors);
       readerThreads.setKeyWindow(keyWindow);
     }

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java Fri Mar 29 15:53:50 2013
@@ -18,12 +18,16 @@ package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -34,7 +38,6 @@ public abstract class MultiThreadedActio
   private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class);
 
   protected final byte[] tableName;
-  protected final byte[] columnFamily;
   protected final Configuration conf;
 
   protected int numThreads = 1;
@@ -51,8 +54,69 @@ public abstract class MultiThreadedActio
   protected AtomicLong totalOpTimeMs = new AtomicLong();
   protected boolean verbose = false;
 
-  protected int minDataSize = 256;
-  protected int maxDataSize = 1024;
+  protected LoadTestDataGenerator dataGenerator = null;
+
+  /**
+   * Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, fixed
+   * set of column families, and random number of columns in range. The table for it can
+   * be created manually or, for example, via
+   * {@link HBaseTestingUtility#createPreSplitLoadTestTable(
+   * org.apache.hadoop.hbase.Configuration, byte[], byte[], Algorithm, DataBlockEncoding)}
+   */
+  public static class DefaultDataGenerator extends LoadTestDataGenerator {
+    private byte[][] columnFamilies = null;
+    private int minColumnsPerKey;
+    private int maxColumnsPerKey;
+    private final Random random = new Random();
+
+    public DefaultDataGenerator(int minValueSize, int maxValueSize,
+        int minColumnsPerKey, int maxColumnsPerKey, byte[]... columnFamilies) {
+      super(minValueSize, maxValueSize);
+      this.columnFamilies = columnFamilies;
+      this.minColumnsPerKey = minColumnsPerKey;
+      this.maxColumnsPerKey = maxColumnsPerKey;
+    }
+
+    public DefaultDataGenerator(byte[]... columnFamilies) {
+      // Default values for tests that didn't care to provide theirs.
+      this(256, 1024, 1, 10, columnFamilies);
+    }
+
+    @Override
+    public byte[] getDeterministicUniqueKey(long keyBase) {
+      return LoadTestKVGenerator.md5PrefixedKey(keyBase).getBytes();
+    }
+
+    @Override
+    public byte[][] getColumnFamilies() {
+      return columnFamilies;
+    }
+
+    @Override
+    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
+      int numColumns = minColumnsPerKey + random.nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
+      byte[][] columns = new byte[numColumns][];
+      for (int i = 0; i < numColumns; ++i) {
+        columns[i] = Integer.toString(i).getBytes();
+      }
+      return columns;
+    }
+
+    @Override
+    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
+      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
+    }
+
+    @Override
+    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
+      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
+    }
+
+    @Override
+    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
+      return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
+    }
+  }
 
   /** "R" or "W" */
   private String actionLetter;
@@ -62,11 +126,11 @@ public abstract class MultiThreadedActio
 
   public static final int REPORTING_INTERVAL_MS = 5000;
 
-  public MultiThreadedAction(Configuration conf, byte[] tableName,
-      byte[] columnFamily, String actionLetter) {
+  public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf, byte[] tableName,
+      String actionLetter) {
     this.conf = conf;
     this.tableName = tableName;
-    this.columnFamily = columnFamily;
+    this.dataGenerator = dataGen;
     this.actionLetter = actionLetter;
   }
 
@@ -165,17 +229,16 @@ public abstract class MultiThreadedActio
     }
   }
 
-  public void setDataSize(int minDataSize, int maxDataSize) {
-    this.minDataSize = minDataSize;
-    this.maxDataSize = maxDataSize;
-  }
-
   public void waitForFinish() {
     while (numThreadsWorking.get() != 0) {
       Threads.sleepWithoutInterrupt(1000);
     }
   }
 
+  public boolean isDone() {
+    return (numThreadsWorking.get() == 0);
+  }
+
   protected void startThreads(Collection<? extends Thread> threads) {
     numThreadsWorking.addAndGet(threads.size());
     for (Thread thread : threads) {
@@ -202,4 +265,77 @@ public abstract class MultiThreadedActio
     sb.append(v);
   }
 
+  /**
+   * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}.
+   * Does not verify cf/column integrity.
+   */
+  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
+    return verifyResultAgainstDataGenerator(result, verifyValues, false);
+  }
+
+  /**
+   * Verifies the result from get or scan using the dataGenerator (that was presumably
+   * also used to generate said result).
+   * @param verifyValues verify that values in the result make sense for row/cf/column combination
+   * @param verifyCfAndColumnIntegrity verify that cf/column set in the result is complete. Note
+   *                                   that to use this multiPut should be used, or verification
+   *                                   has to happen after writes, otherwise there can be races.
+   * @return
+   */
+  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
+      boolean verifyCfAndColumnIntegrity) {
+    String rowKeyStr = Bytes.toString(result.getRow());
+
+    // See if we have any data at all.
+    if (result.isEmpty()) {
+      LOG.error("No data returned for key = [" + rowKeyStr + "]");
+      return false;
+    }
+
+    if (!verifyValues && !verifyCfAndColumnIntegrity) {
+      return true; // as long as we have something, we are good.
+    }
+
+    // See if we have all the CFs.
+    byte[][] expectedCfs = dataGenerator.getColumnFamilies();
+    if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
+      LOG.error("Bad family count for [" + rowKeyStr + "]: " + result.getMap().size());
+      return false;
+    }
+
+    // Verify each column family from get in the result.
+    for (byte[] cf : result.getMap().keySet()) {
+      String cfStr = Bytes.toString(cf);
+      Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
+      if (columnValues == null) {
+        LOG.error("No data for family [" + cfStr + "] for [" + rowKeyStr + "]");
+        return false;
+      }
+      // See if we have correct columns.
+      if (verifyCfAndColumnIntegrity
+          && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
+        String colsStr = "";
+        for (byte[] col : columnValues.keySet()) {
+          if (colsStr.length() > 0) {
+            colsStr += ", ";
+          }
+          colsStr += "[" + Bytes.toString(col) + "]";
+        }
+        LOG.error("Bad columns for family [" + cfStr + "] for [" + rowKeyStr + "]: " + colsStr);
+        return false;
+      }
+      // See if values check out.
+      if (verifyValues) {
+        for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
+          if (!dataGenerator.verify(result.getRow(), cf, kv.getKey(), kv.getValue())) {
+            LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+              + cfStr + "], column [" + Bytes.toString(kv.getKey()) + "]; value of length " +
+              + kv.getValue().length);
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
 }

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java Fri Mar 29 15:53:50 2013
@@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -27,14 +26,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 
 /** Creates multiple threads that read and verify previously written data */
-public class MultiThreadedReader extends MultiThreadedAction
-{
+public class MultiThreadedReader extends MultiThreadedAction {
+
   private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class);
 
   private Set<HBaseReaderThread> readers = new HashSet<HBaseReaderThread>();
@@ -71,9 +69,9 @@ public class MultiThreadedReader extends
   private int maxErrors = DEFAULT_MAX_ERRORS;
   private int keyWindow = DEFAULT_KEY_WINDOW;
 
-  public MultiThreadedReader(Configuration conf, byte[] tableName,
-      byte[] columnFamily, double verifyPercent) {
-    super(conf, tableName, columnFamily, "R");
+  public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
+      byte[] tableName, double verifyPercent) {
+    super(dataGen, conf, tableName, "R");
     this.verifyPercent = verifyPercent;
   }
 
@@ -222,14 +220,22 @@ public class MultiThreadedReader extends
     }
 
     private Get readKey(long keyToRead) {
-      Get get = new Get(
-          LoadTestKVGenerator.md5PrefixedKey(keyToRead).getBytes());
-      get.addFamily(columnFamily);
+      Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
+      String cfsString = "";
+      byte[][] columnFamilies = dataGenerator.getColumnFamilies();
+      for (byte[] cf : columnFamilies) {
+        get.addFamily(cf);
+        if (verbose) {
+          if (cfsString.length() > 0) {
+            cfsString += ", ";
+          }
+          cfsString += "[" + Bytes.toStringBinary(cf) + "]";
+        }
+      }
 
       try {
         if (verbose) {
-          LOG.info("[" + readerId + "] " + "Querying key " + keyToRead
-              + ", cf " + Bytes.toStringBinary(columnFamily));
+          LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
         }
         queryKey(get, random.nextInt(100) < verifyPercent);
       } catch (IOException e) {
@@ -251,45 +257,37 @@ public class MultiThreadedReader extends
       numKeys.addAndGet(1);
 
       // if we got no data report error
-      if (result.isEmpty()) {
-         HRegionLocation hloc = table.getRegionLocation(
-             Bytes.toBytes(rowKey));
+      if (!result.isEmpty()) {
+        if (verify) {
+          numKeysVerified.incrementAndGet();
+        }
+      } else {
+        HRegionLocation hloc = table.getRegionLocation(Bytes.toBytes(rowKey));
         LOG.info("Key = " + rowKey + ", RegionServer: "
             + hloc.getHostname());
-        numReadErrors.addAndGet(1);
-        LOG.error("No data returned, tried to get actions for key = "
-            + rowKey + (writer == null ? "" : ", keys inserted by writer: " +
-                writer.numKeys.get() + ")"));
-
-         if (numReadErrors.get() > maxErrors) {
-          LOG.error("Aborting readers -- found more than " + maxErrors
-              + " errors\n");
-           aborted = true;
-         }
       }
 
-      if (result.getFamilyMap(columnFamily) != null) {
-        // increment number of columns read
-        numCols.addAndGet(result.getFamilyMap(columnFamily).size());
-
-        if (verify) {
-          // verify the result
-          List<KeyValue> keyValues = result.list();
-          for (KeyValue kv : keyValues) {
-            String qual = new String(kv.getQualifier());
-
-            // if something does not look right report it
-            if (!LoadTestKVGenerator.verify(rowKey, qual, kv.getValue())) {
-              numReadErrors.addAndGet(1);
-              LOG.error("Error checking data for key = " + rowKey
-                  + ", actionId = " + qual);
-            }
-          }
-          numKeysVerified.addAndGet(1);
+      boolean isOk = verifyResultAgainstDataGenerator(result, verify);
+      long numErrorsAfterThis = 0;
+      if (isOk) {
+        long cols = 0;
+        // Count the columns for reporting purposes.
+        for (byte[] cf : result.getMap().keySet()) {
+          cols += result.getFamilyMap(cf).size();
+        }
+        numCols.addAndGet(cols);
+      } else {
+        if (writer != null) {
+          LOG.error("At the time of failure, writer inserted " + writer.numKeys.get() + " keys");
         }
+        numErrorsAfterThis = numReadErrors.incrementAndGet();
       }
-    }
 
+      if (numErrorsAfterThis > maxErrors) {
+        LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
+        aborted = true;
+      }
+    }
   }
 
   public long getNumReadFailures() {

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java Fri Mar 29 15:53:50 2013
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.PriorityQueue;
 import java.util.Queue;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -38,8 +37,6 @@ import org.apache.hadoop.hbase.client.Pu
 public class MultiThreadedWriter extends MultiThreadedAction {
   private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class);
 
-  private long minColumnsPerKey = 1;
-  private long maxColumnsPerKey = 10;
   private Set<HBaseWriterThread> writers = new HashSet<HBaseWriterThread>();
 
   private boolean isMultiPut = false;
@@ -50,8 +47,7 @@ public class MultiThreadedWriter extends
    * {@link #insertedUpToKey}, the maximum key in the contiguous range of keys
    * being inserted. This queue is supposed to stay small.
    */
-  private BlockingQueue<Long> insertedKeys =
-      new ArrayBlockingQueue<Long>(10000);
+  private BlockingQueue<Long> insertedKeys = new ArrayBlockingQueue<Long>(10000);
 
   /**
    * This is the current key to be inserted by any thread. Each thread does an
@@ -77,9 +73,9 @@ public class MultiThreadedWriter extends
   /** Enable this if used in conjunction with a concurrent reader. */
   private boolean trackInsertedKeys;
 
-  public MultiThreadedWriter(Configuration conf, byte[] tableName,
-      byte[] columnFamily) {
-    super(conf, tableName, columnFamily, "W");
+  public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
+      byte[] tableName) {
+    super(dataGen, conf, tableName, "W");
   }
 
   /** Use multi-puts vs. separate puts for every column in a row */
@@ -87,11 +83,6 @@ public class MultiThreadedWriter extends
     this.isMultiPut = isMultiPut;
   }
 
-  public void setColumnsPerKey(long minColumnsPerKey, long maxColumnsPerKey) {
-    this.minColumnsPerKey = minColumnsPerKey;
-    this.maxColumnsPerKey = maxColumnsPerKey;
-  }
-
   @Override
   public void start(long startKey, long endKey, int numThreads)
       throws IOException {
@@ -117,17 +108,9 @@ public class MultiThreadedWriter extends
     startThreads(writers);
   }
 
-  public static byte[] longToByteArrayKey(long rowKey) {
-    return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
-  }
-
   private class HBaseWriterThread extends Thread {
     private final HTable table;
 
-    private final Random random = new Random();
-    private final LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(
-        minDataSize, maxDataSize);
-
     public HBaseWriterThread(int writerId) throws IOException {
       setName(getClass().getSimpleName() + "_" + writerId);
       table = new HTable(conf, tableName);
@@ -135,20 +118,35 @@ public class MultiThreadedWriter extends
 
     public void run() {
       try {
-        long rowKey;
-        while ((rowKey = nextKeyToInsert.getAndIncrement()) < endKey) {
-          long numColumns = minColumnsPerKey + Math.abs(random.nextLong())
-              % (maxColumnsPerKey - minColumnsPerKey);
+        long rowKeyBase;
+        byte[][] columnFamilies = dataGenerator.getColumnFamilies();
+        while ((rowKeyBase = nextKeyToInsert.getAndIncrement()) < endKey) {
+          byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
+          Put put = new Put(rowKey);
           numKeys.addAndGet(1);
+          int columnCount = 0;
+          for (byte[] cf : columnFamilies) {
+            byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf);
+            for (byte[] column : columns) {
+              byte[] value = dataGenerator.generateValue(rowKey, cf, column);
+              put.add(cf, column, value);
+              ++columnCount;
+              if (!isMultiPut) {
+                insert(put, rowKeyBase);
+                numCols.addAndGet(1);
+                put = new Put(rowKey);
+              }
+            }
+          }
           if (isMultiPut) {
-            multiPutInsertKey(rowKey, 0, numColumns);
-          } else {
-            for (long col = 0; col < numColumns; ++col) {
-              insert(rowKey, col);
+            if (verbose) {
+              LOG.debug("Preparing put for key = [" + rowKey + "], " + columnCount + " columns");
             }
+            insert(put, rowKeyBase);
+            numCols.addAndGet(columnCount);
           }
           if (trackInsertedKeys) {
-            insertedKeys.add(rowKey);
+            insertedKeys.add(rowKeyBase);
           }
         }
       } finally {
@@ -161,52 +159,14 @@ public class MultiThreadedWriter extends
       }
     }
 
-    public void insert(long rowKey, long col) {
-      Put put = new Put(longToByteArrayKey(rowKey));
-      String colAsStr = String.valueOf(col);
-      put.add(columnFamily, Bytes.toBytes(colAsStr),
-          dataGenerator.generateRandomSizeValue(rowKey, colAsStr));
+    public void insert(Put put, long keyBase) {
       try {
         long start = System.currentTimeMillis();
         table.put(put);
-        numCols.addAndGet(1);
         totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
       } catch (IOException e) {
-        failedKeySet.add(rowKey);
-        LOG.error("Failed to insert: " + rowKey);
-        e.printStackTrace();
-      }
-    }
-
-    public void multiPutInsertKey(long rowKey, long startCol, long endCol) {
-      if (verbose) {
-        LOG.debug("Preparing put for key = " + rowKey + ", cols = ["
-            + startCol + ", " + endCol + ")");
-      }
-
-      if (startCol >= endCol) {
-        return;
-      }
-
-      Put put = new Put(LoadTestKVGenerator.md5PrefixedKey(
-          rowKey).getBytes());
-      byte[] columnQualifier;
-      byte[] value;
-      for (long i = startCol; i < endCol; ++i) {
-        String qualStr = String.valueOf(i);
-        columnQualifier = qualStr.getBytes();
-        value = dataGenerator.generateRandomSizeValue(rowKey, qualStr);
-        put.add(columnFamily, columnQualifier, value);
-      }
-
-      try {
-        long start = System.currentTimeMillis();
-        table.put(put);
-        numCols.addAndGet(endCol - startCol);
-        totalOpTimeMs.addAndGet(
-            System.currentTimeMillis() - start);
-      } catch (IOException e) {
-        failedKeySet.add(rowKey);
+        failedKeySet.add(keyBase);
+        LOG.error("Failed to insert: " + keyBase);
         e.printStackTrace();
       }
     }
@@ -301,7 +261,7 @@ public class MultiThreadedWriter extends
    * key, which requires a blocking queue and a consumer thread.
    * @param enable whether to enable tracking the last inserted key
    */
-  void setTrackInsertedKeys(boolean enable) {
+  public void setTrackInsertedKeys(boolean enable) {
     trackInsertedKeys = enable;
   }
 

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java Fri Mar 29 15:53:50 2013
@@ -62,8 +62,8 @@ public class RestartMetaTest extends Abs
   private void loadData() throws IOException {
     long startKey = 0;
     long endKey = 100000;
-    long minColsPerKey = 5;
-    long maxColsPerKey = 15;
+    int minColsPerKey = 5;
+    int maxColsPerKey = 15;
     int minColDataSize = 256;
     int maxColDataSize = 256 * 3;
     int numThreads = 10;
@@ -77,11 +77,10 @@ public class RestartMetaTest extends Abs
     System.out.printf("Client Threads: %d\n", numThreads);
 
     // start the writers
-    MultiThreadedWriter writer = new MultiThreadedWriter(conf, TABLE_NAME,
-        LoadTestTool.COLUMN_FAMILY);
+    LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(
+      minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, LoadTestTool.COLUMN_FAMILY);
+    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
     writer.setMultiPut(true);
-    writer.setColumnsPerKey(minColsPerKey, maxColsPerKey);
-    writer.setDataSize(minColDataSize, maxColDataSize);
     writer.start(startKey, endKey, numThreads);
     System.out.printf("Started loading data...");
     writer.waitForFinish();

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java Fri Mar 29 15:53:50 2013
@@ -39,8 +39,8 @@ public class TestLoadTestKVGenerator {
   @Test
   public void testValueLength() {
     for (int i = 0; i < 1000; ++i) {
-      byte[] v = gen.generateRandomSizeValue(i,
-          String.valueOf(rand.nextInt()));
+      byte[] v = gen.generateRandomSizeValue(Integer.toString(i).getBytes(),
+        String.valueOf(rand.nextInt()).getBytes());
       assertTrue(MIN_LEN <= v.length);
       assertTrue(v.length <= MAX_LEN);
     }
@@ -50,12 +50,12 @@ public class TestLoadTestKVGenerator {
   public void testVerification() {
     for (int i = 0; i < 1000; ++i) {
       for (int qualIndex = 0; qualIndex < 20; ++qualIndex) {
-        String qual = String.valueOf(qualIndex);
-        byte[] v = gen.generateRandomSizeValue(i, qual);
-        String rowKey = LoadTestKVGenerator.md5PrefixedKey(i);
-        assertTrue(LoadTestKVGenerator.verify(rowKey, qual, v));
+        byte[] qual = String.valueOf(qualIndex).getBytes();
+        byte[] rowKey = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
+        byte[] v = gen.generateRandomSizeValue(rowKey, qual);
+        assertTrue(LoadTestKVGenerator.verify(v, rowKey, qual));
         v[0]++;
-        assertFalse(LoadTestKVGenerator.verify(rowKey, qual, v));
+        assertFalse(LoadTestKVGenerator.verify(v, rowKey, qual));
       }
     }
   }

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java?rev=1462511&r1=1462510&r2=1462511&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java Fri Mar 29 15:53:50 2013
@@ -136,9 +136,10 @@ public class TestMiniClusterLoadSequenti
 
     TEST_UTIL.waitUntilAllRegionsAssigned(numRegions);
 
-    writerThreads = new MultiThreadedWriter(conf, TABLE, CF);
+    LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF);
+    writerThreads = new MultiThreadedWriter(dataGen, conf, TABLE);
     writerThreads.setMultiPut(isMultiPut);
-    readerThreads = new MultiThreadedReader(conf, TABLE, CF, 100);
+    readerThreads = new MultiThreadedReader(dataGen, conf, TABLE, 100);
   }
 
   protected int numKeys() {