You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/01/15 00:57:49 UTC

svn commit: r1433224 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/ hbase-common/src/test/java/org/apache/hadoop/hbase/util/ hbase-it/src/test/java/org/apache/hadoop/hbase/ hbase-server/src/main/java/org/apache/hadoop/...

Author: tedyu
Date: Mon Jan 14 23:57:48 2013
New Revision: 1433224

URL: http://svn.apache.org/viewvc?rev=1433224&view=rev
Log:
HBASE-7383 create integration test for HBASE-5416 (improving scan performance for certain filters) (Sergey)


Added:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java
Modified:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestKVGenerator.java
    hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java

Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java?rev=1433224&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java Mon Jan 14 23:57:48 2013
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util.test;
+
+import java.util.Set;
+
+/**
+ * A generator of random data (keys/cfs/columns/values) for load testing.
+ * Contains LoadTestKVGenerator as a matter of convenience...
+ */
+public abstract class LoadTestDataGenerator {
+  protected final LoadTestKVGenerator kvGenerator;
+
+  /**
+   * Initializes the object.
+   * @param minValueSize minimum size of the value generated by
+   * {@link #generateValue(byte[], byte[], byte[])}.
+   * @param maxValueSize maximum size of the value generated by
+   * {@link #generateValue(byte[], byte[], byte[])}.
+   */
+  public LoadTestDataGenerator(int minValueSize, int maxValueSize) {
+    this.kvGenerator = new LoadTestKVGenerator(minValueSize, maxValueSize);
+  }
+
+  /**
+   * Generates a deterministic, unique hashed row key from a number. That way, the user can
+   * keep track of plain numbers instead of byte arrays, while keys stay well distributed.
+   * @param keyBase Base number for a key, such as a loop counter.
+   */
+  public abstract byte[] getDeterministicUniqueKey(long keyBase);
+
+  /**
+   * Gets column families for the load test table.
+   * @return The array of byte[]s representing column family names.
+   */
+  public abstract byte[][] getColumnFamilies();
+
+  /**
+   * Generates an applicable set of columns to be used for a particular key and family.
+   * @param rowKey The row key to generate for.
+   * @param cf The column family name to generate for.
+   * @return The array of byte[]s representing column names.
+   */
+  public abstract byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf);
+
+  /**
+   * Generates a value to be used for a particular row/cf/column.
+   * @param rowKey The row key to generate for.
+   * @param cf The column family name to generate for.
+   * @param column The column name to generate for.
+   * @return The value to use.
+   */
+  public abstract byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column);
+
+  /**
+   * Checks that columns for a rowKey and cf are valid if generated via
+   * {@link #generateColumnsForCf(byte[], byte[])}
+   * @param rowKey The row key to verify for.
+   * @param cf The column family name to verify for.
+   * @param columnSet The column set (for example, encountered by read).
+   * @return True iff valid.
+   */
+  public abstract boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet);
+
+  /**
+   * Checks that value for a rowKey/cf/column is valid if generated via
+   * {@link #generateValue(byte[], byte[], byte[])}
+   * @param rowKey The row key to verify for.
+   * @param cf The column family name to verify for.
+   * @param column The column name to verify for.
+   * @param value The value (for example, encountered by read).
+   * @return True iff valid.
+   */
+  public abstract boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value);
+}

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestKVGenerator.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestKVGenerator.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestKVGenerator.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestKVGenerator.java Mon Jan 14 23:57:48 2013
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.hbase.util.test;
 
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.hadoop.hbase.util.Bytes;
@@ -27,8 +28,6 @@ import org.apache.hadoop.hbase.util.MD5H
  * hash. Values are generated by selecting value size in the configured range
  * and generating a pseudo-random sequence of bytes seeded by key, column
  * qualifier, and value size.
- * <p>
- * Not thread-safe, so a separate instance is needed for every writer thread/
  */
 public class LoadTestKVGenerator {
 
@@ -49,13 +48,13 @@ public class LoadTestKVGenerator {
 
   /**
    * Verifies that the given byte array is the same as what would be generated
-   * for the given row key and qualifier. We are assuming that the value size
-   * is correct, and only verify the actual bytes. However, if the min/max
-   * value sizes are set sufficiently high, an accidental match should be
+   * for the given seed strings (row/cf/column/...). We are assuming that the
+   * value size is correct, and only verify the actual bytes. However, if the
+   * min/max value sizes are set sufficiently high, an accidental match should be
    * extremely improbable.
    */
-  public static boolean verify(String rowKey, String qual, byte[] value) {
-    byte[] expectedData = getValueForRowColumn(rowKey, qual, value.length);
+  public static boolean verify(byte[] value, byte[]... seedStrings) {
+    byte[] expectedData = getValueForRowColumn(value.length, seedStrings);
     return Bytes.equals(expectedData, value);
   }
 
@@ -74,27 +73,31 @@ public class LoadTestKVGenerator {
   /**
    * Generates a value for the given key index and column qualifier. Size is
    * selected randomly in the configured range. The generated value depends
-   * only on the combination of the key, qualifier, and the selected value
-   * size. This allows to verify the actual value bytes when reading, as done
-   * in {@link #verify(String, String, byte[])}.
+   * only on the combination of the strings passed (key/cf/column/...) and the selected
+   * value size. This allows to verify the actual value bytes when reading, as done
+   * in {@link #verify(byte[], byte[]...)}.
+   * This method is as thread-safe as Random class. It appears that the worst bug ever
+   * found with the latter is that multiple threads will get some duplicate values, which
+   * we don't care about.
    */
-  public byte[] generateRandomSizeValue(long key, String qual) {
-    String rowKey = md5PrefixedKey(key);
+  public byte[] generateRandomSizeValue(byte[]... seedStrings) {
     int dataSize = minValueSize;
-    if(minValueSize != maxValueSize){
+    if(minValueSize != maxValueSize) {
       dataSize = minValueSize + randomForValueSize.nextInt(Math.abs(maxValueSize - minValueSize));
     }
-    return getValueForRowColumn(rowKey, qual, dataSize);
+    return getValueForRowColumn(dataSize, seedStrings);
   }
 
   /**
    * Generates random bytes of the given size for the given row and column
    * qualifier. The random seed is fully determined by these parameters.
    */
-  private static byte[] getValueForRowColumn(String rowKey, String qual,
-      int dataSize) {
-    Random seededRandom = new Random(rowKey.hashCode() + qual.hashCode() +
-        dataSize);
+  private static byte[] getValueForRowColumn(int dataSize, byte[]... seedStrings) {
+    long seed = dataSize;
+    for (byte[] str : seedStrings) {
+      seed += Bytes.toString(str).hashCode();
+    }
+    Random seededRandom = new Random(seed);
     byte[] randomBytes = new byte[dataSize];
     seededRandom.nextBytes(randomBytes);
     return randomBytes;

Modified: hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java (original)
+++ hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java Mon Jan 14 23:57:48 2013
@@ -41,8 +41,8 @@ public class TestLoadTestKVGenerator {
   @Test
   public void testValueLength() {
     for (int i = 0; i < 1000; ++i) {
-      byte[] v = gen.generateRandomSizeValue(i,
-          String.valueOf(rand.nextInt()));
+      byte[] v = gen.generateRandomSizeValue(Integer.toString(i).getBytes(),
+          String.valueOf(rand.nextInt()).getBytes());
       assertTrue(MIN_LEN <= v.length);
       assertTrue(v.length <= MAX_LEN);
     }
@@ -52,12 +52,12 @@ public class TestLoadTestKVGenerator {
   public void testVerification() {
     for (int i = 0; i < 1000; ++i) {
       for (int qualIndex = 0; qualIndex < 20; ++qualIndex) {
-        String qual = String.valueOf(qualIndex);
-        byte[] v = gen.generateRandomSizeValue(i, qual);
-        String rowKey = LoadTestKVGenerator.md5PrefixedKey(i);
-        assertTrue(LoadTestKVGenerator.verify(rowKey, qual, v));
+        byte[] qual = String.valueOf(qualIndex).getBytes();
+        byte[] rowKey = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
+        byte[] v = gen.generateRandomSizeValue(rowKey, qual);
+        assertTrue(LoadTestKVGenerator.verify(v, rowKey, qual));
         v[0]++;
-        assertFalse(LoadTestKVGenerator.verify(rowKey, qual, v));
+        assertFalse(LoadTestKVGenerator.verify(v, rowKey, qual));
       }
     }
   }

Added: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java?rev=1433224&view=auto
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java (added)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java Mon Jan 14 23:57:48 2013
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.security.InvalidParameterException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.LoadTestTool;
+import org.apache.hadoop.hbase.util.MultiThreadedWriter;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
+import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Integration test that verifies lazy CF loading during scans by doing repeated scans
+ * with this feature while multiple threads are continuously writing values; and
+ * verifying the result.
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestLazyCfLoading {
+  private static final String TABLE_NAME = IntegrationTestLazyCfLoading.class.getSimpleName();
+  private static final String TIMEOUT_KEY = "hbase.%s.timeout";
+
+  /** A soft test timeout; duration of the test, as such, depends on number of keys to put. */
+  private static final int DEFAULT_TIMEOUT_MINUTES = 10;
+
+  private static final int NUM_SERVERS = 1;
+  /** Set regions per server low to ensure splits happen during test */
+  private static final int REGIONS_PER_SERVER = 3;
+  private static final int KEYS_TO_WRITE_PER_SERVER = 20000;
+  private static final int WRITER_THREADS = 10;
+  private static final int WAIT_BETWEEN_SCANS_MS = 1000;
+
+  private static final Log LOG = LogFactory.getLog(IntegrationTestLazyCfLoading.class);
+  private IntegrationTestingUtility util = new IntegrationTestingUtility();
+  private final DataGenerator dataGen = new DataGenerator();
+
+  /** Custom LoadTestDataGenerator. Uses key generation and verification from
+   * LoadTestKVGenerator. Creates 3 column families; one with an integer column to
+   * filter on, the 2nd one with an integer column that matches the first integer column (for
+   * test-specific verification), and byte[] value that is used for general verification; and
+   * the third one with just the value.
+   */
+  private static class DataGenerator extends LoadTestDataGenerator {
+    private static final int MIN_DATA_SIZE = 4096;
+    private static final int MAX_DATA_SIZE = 65536;
+    public static final byte[] ESSENTIAL_CF = Bytes.toBytes("essential");
+    public static final byte[] JOINED_CF1 = Bytes.toBytes("joined");
+    public static final byte[] JOINED_CF2 = Bytes.toBytes("joined2");
+    public static final byte[] FILTER_COLUMN = Bytes.toBytes("filter");
+    public static final byte[] VALUE_COLUMN = Bytes.toBytes("val");
+    public static final long ACCEPTED_VALUE = 1L;
+
+    private static final Map<byte[], byte[][]> columnMap = new TreeMap<byte[], byte[][]>(
+        Bytes.BYTES_COMPARATOR);
+
+    private final AtomicLong expectedNumberOfKeys = new AtomicLong(0);
+    private final AtomicLong totalNumberOfKeys = new AtomicLong(0);
+
+    public DataGenerator() {
+      super(MIN_DATA_SIZE, MAX_DATA_SIZE);
+      columnMap.put(ESSENTIAL_CF, new byte[][] { FILTER_COLUMN });
+      columnMap.put(JOINED_CF1, new byte[][] { FILTER_COLUMN, VALUE_COLUMN });
+      columnMap.put(JOINED_CF2, new byte[][] { VALUE_COLUMN });
+    }
+
+    public long getExpectedNumberOfKeys() {
+      return expectedNumberOfKeys.get();
+    }
+
+    public long getTotalNumberOfKeys() {
+      return totalNumberOfKeys.get();
+    }
+
+    @Override
+    public byte[] getDeterministicUniqueKey(long keyBase) {
+      return LoadTestKVGenerator.md5PrefixedKey(keyBase).getBytes();
+    }
+
+    @Override
+    public byte[][] getColumnFamilies() {
+      return columnMap.keySet().toArray(new byte[columnMap.size()][]);
+    }
+
+    @Override
+    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
+      return columnMap.get(cf);
+    }
+
+    @Override
+    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
+      if (Bytes.BYTES_COMPARATOR.compare(column, FILTER_COLUMN) == 0) {
+        // Random deterministic way to make some values "on" and others "off" for filters.
+        long value = Long.parseLong(Bytes.toString(rowKey, 0, 4), 16) & ACCEPTED_VALUE;
+        if (Bytes.BYTES_COMPARATOR.compare(cf, ESSENTIAL_CF) == 0) {
+          totalNumberOfKeys.incrementAndGet();
+          if (value == ACCEPTED_VALUE) {
+            expectedNumberOfKeys.incrementAndGet();
+          }
+        }
+        return Bytes.toBytes(value);
+      } else if (Bytes.BYTES_COMPARATOR.compare(column, VALUE_COLUMN) == 0) {
+        return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
+      }
+      String error = "Unknown column " + Bytes.toString(column);
+      assert false : error;
+      throw new InvalidParameterException(error);
+    }
+
+    @Override
+    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
+      if (Bytes.BYTES_COMPARATOR.compare(column, FILTER_COLUMN) == 0) {
+        // Relies on the filter from getScanFilter being used.
+        return Bytes.toLong(value) == ACCEPTED_VALUE;
+      } else if (Bytes.BYTES_COMPARATOR.compare(column, VALUE_COLUMN) == 0) {
+        return LoadTestKVGenerator.verify(value, rowKey, cf, column);
+      }
+      return false; // some bogus value from read, we don't expect any such thing.
+    }
+
+    @Override
+    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
+      return columnMap.get(cf).length == columnSet.size();
+    }
+
+    public Filter getScanFilter() {
+      SingleColumnValueFilter scf = new SingleColumnValueFilter(ESSENTIAL_CF, FILTER_COLUMN,
+          CompareFilter.CompareOp.EQUAL, Bytes.toBytes(ACCEPTED_VALUE));
+      scf.setFilterIfMissing(true);
+      return scf;
+    }
+  };
+
+  @Before
+  public void setUp() throws Exception {
+    LOG.info("Initializing cluster with " + NUM_SERVERS + " servers");
+    util.initializeCluster(NUM_SERVERS);
+    LOG.info("Done initializing cluster");
+    createTable();
+  }
+
+  private void createTable() throws Exception {
+    deleteTable();
+    LOG.info("Creating table");
+    HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes(TABLE_NAME));
+    for (byte[] cf : dataGen.getColumnFamilies()) {
+      htd.addFamily(new HColumnDescriptor(cf));
+    }
+    int serverCount = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
+    byte[][] splits = new RegionSplitter.HexStringSplit().split(serverCount * REGIONS_PER_SERVER);
+    util.getHBaseAdmin().createTable(htd, splits);
+    LOG.info("Created table");
+  }
+
+  private void deleteTable() throws Exception {
+    if (util.getHBaseAdmin().tableExists(TABLE_NAME)) {
+      LOG.info("Deleting table");
+      if (!util.getHBaseAdmin().isTableDisabled(TABLE_NAME)) {
+        util.getHBaseAdmin().disableTable(TABLE_NAME);
+      }
+      util.getHBaseAdmin().deleteTable(TABLE_NAME);
+      LOG.info("Deleted table");
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    deleteTable();
+    LOG.info("Restoring the cluster");
+    util.restoreCluster();
+    LOG.info("Done restoring the cluster");
+  }
+
+  @Test
+  public void testReadersAndWriters() throws Exception {
+    Configuration conf = util.getConfiguration();
+    String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName());
+    long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES);
+    long serverCount = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
+    long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER;
+    HTable table = new HTable(conf, Bytes.toBytes(TABLE_NAME));
+
+    // Create multi-threaded writer and start it. We write multiple columns/CFs and verify
+    // their integrity, therefore multi-put is necessary.
+    MultiThreadedWriter writer =
+      new MultiThreadedWriter(dataGen, conf, Bytes.toBytes(TABLE_NAME));
+    writer.setMultiPut(true);
+
+    LOG.info("Starting writer; the number of keys to write is " + keysToWrite);
+    writer.start(1, keysToWrite, WRITER_THREADS);
+
+    // Now, do scans.
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    long timeLimit = now + (maxRuntime * 60000);
+    boolean isWriterDone = false;
+    while (now < timeLimit && !isWriterDone) {
+      LOG.info("Starting the scan; wrote approximately "
+        + dataGen.getTotalNumberOfKeys() + " keys");
+      isWriterDone = writer.isDone();
+      if (isWriterDone) {
+        LOG.info("Scanning full result, writer is done");
+      }
+      Scan scan = new Scan();
+      for (byte[] cf : dataGen.getColumnFamilies()) {
+        scan.addFamily(cf);
+      }
+      scan.setFilter(dataGen.getScanFilter());
+      scan.setLoadColumnFamiliesOnDemand(true);
+      // The number of keys we can expect from scan - lower bound (before scan).
+      // Not a strict lower bound - writer knows nothing about filters, so we report
+      // this from generator. Writer might have generated the value but not put it yet.
+      long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys();
+      long startTs = EnvironmentEdgeManager.currentTimeMillis();
+      ResultScanner results = table.getScanner(scan);
+      long resultCount = 0;
+      Result result = null;
+      // Verify and count the results.
+      while ((result = results.next()) != null) {
+        boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true);
+        Assert.assertTrue("Failed to verify [" + Bytes.toString(result.getRow())+ "]", isOk);
+        ++resultCount;
+      }
+      long timeTaken = EnvironmentEdgeManager.currentTimeMillis() - startTs;
+      // Verify the result count.
+      long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys();
+      Assert.assertTrue("Read " + resultCount + " keys when at most " + onesGennedAfterScan
+        + " were generated ", onesGennedAfterScan >= resultCount);
+      if (isWriterDone) {
+        Assert.assertTrue("Read " + resultCount + " keys; the writer is done and "
+          + onesGennedAfterScan + " keys were generated", onesGennedAfterScan == resultCount);
+      } else if (onesGennedBeforeScan * 0.9 > resultCount) {
+        LOG.warn("Read way too few keys (" + resultCount + "/" + onesGennedBeforeScan
+          + ") - there might be a problem, or the writer might just be slow");
+      }
+      LOG.info("Scan took " + timeTaken + "ms");
+      if (!isWriterDone) {
+        Thread.sleep(WAIT_BETWEEN_SCANS_MS);
+        now = EnvironmentEdgeManager.currentTimeMillis();
+      }
+    }
+    Assert.assertEquals("There are write failures", 0, writer.getNumWriteFailures());
+    Assert.assertTrue("Writer is not done", isWriterDone);
+    // Assert.fail("Boom!");
+  }
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Jan 14 23:57:48 2013
@@ -3641,7 +3641,7 @@ public class HRegion implements HeapSize
           // First, check if we are at a stop row. If so, there are no more results.
           if (stopRow) {
             if (filter != null && filter.hasFilterRow()) {
-              filter.filterRow(results);        
+              filter.filterRow(results);
             }
             return false;
           }
@@ -3670,7 +3670,7 @@ public class HRegion implements HeapSize
           final boolean isEmptyRow = results.isEmpty();
 
           // We have the part of the row necessary for filtering (all of it, usually).
-          // First filter with the filterRow(List).            
+          // First filter with the filterRow(List).
           if (filter != null && filter.hasFilterRow()) {
             filter.filterRow(results);
           }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java Mon Jan 14 23:57:48 2013
@@ -129,13 +129,12 @@ public class TestEncodedSeekers {
   private void doPuts(HRegion region) throws IOException{
     LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(MIN_VALUE_SIZE, MAX_VALUE_SIZE);
      for (int i = 0; i < NUM_ROWS; ++i) {
-      byte[] key = MultiThreadedWriter.longToByteArrayKey(i);
+      byte[] key = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
       for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
         Put put = new Put(key);
-        String colAsStr = String.valueOf(j);
-        byte[] col = Bytes.toBytes(colAsStr);
-        byte[] value = dataGenerator.generateRandomSizeValue(i, colAsStr);
-        put.add(CF_BYTES, Bytes.toBytes(colAsStr), value);
+        byte[] col = Bytes.toBytes(String.valueOf(j));
+        byte[] value = dataGenerator.generateRandomSizeValue(key, col);
+        put.add(CF_BYTES, col, value);
         if(VERBOSE){
           KeyValue kvPut = new KeyValue(key, CF_BYTES, col, value);
           System.err.println(Strings.padFront(i+"", ' ', 4)+" "+kvPut);
@@ -151,7 +150,7 @@ public class TestEncodedSeekers {
   
   private void doGets(HRegion region) throws IOException{
     for (int i = 0; i < NUM_ROWS; ++i) {
-      final byte[] rowKey = MultiThreadedWriter.longToByteArrayKey(i);
+      final byte[] rowKey = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
       for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
         final String qualStr = String.valueOf(j);
         if (VERBOSE) {
@@ -163,8 +162,8 @@ public class TestEncodedSeekers {
         get.addColumn(CF_BYTES, qualBytes);
         Result result = region.get(get, null);
         assertEquals(1, result.size());
-        assertTrue(LoadTestKVGenerator.verify(Bytes.toString(rowKey), qualStr,
-            result.getValue(CF_BYTES, qualBytes)));
+        byte[] value = result.getValue(CF_BYTES, qualBytes);
+        assertTrue(LoadTestKVGenerator.verify(value, rowKey, qualBytes));
       }
     }
   }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java Mon Jan 14 23:57:48 2013
@@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.client.HB
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
 
 /**
  * A command-line utility that reads, writes, and verifies data. Unlike
@@ -119,7 +121,7 @@ public class LoadTestTool extends Abstra
 
   // Writer options
   private int numWriterThreads = DEFAULT_NUM_THREADS;
-  private long minColsPerKey, maxColsPerKey;
+  private int minColsPerKey, maxColsPerKey;
   private int minColDataSize, maxColDataSize;
   private boolean isMultiPut;
 
@@ -260,7 +262,7 @@ public class LoadTestTool extends Abstra
 
       int colIndex = 0;
       minColsPerKey = 1;
-      maxColsPerKey = 2 * Long.parseLong(writeOpts[colIndex++]);
+      maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
       int avgColDataSize =
           parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
       minColDataSize = avgColDataSize / 2;
@@ -342,16 +344,16 @@ public class LoadTestTool extends Abstra
       initTestTable();
     }
 
+    LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(
+        minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, COLUMN_FAMILY);
+
     if (isWrite) {
-      writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY);
+      writerThreads = new MultiThreadedWriter(dataGen, conf, tableName);
       writerThreads.setMultiPut(isMultiPut);
-      writerThreads.setColumnsPerKey(minColsPerKey, maxColsPerKey);
-      writerThreads.setDataSize(minColDataSize, maxColDataSize);
     }
 
     if (isRead) {
-      readerThreads = new MultiThreadedReader(conf, tableName, COLUMN_FAMILY,
-          verifyPercent);
+      readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
       readerThreads.setMaxErrors(maxReadErrors);
       readerThreads.setKeyWindow(keyWindow);
     }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java Mon Jan 14 23:57:48 2013
@@ -18,12 +18,19 @@ package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
+import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -34,7 +41,6 @@ public abstract class MultiThreadedActio
   private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class);
 
   protected final byte[] tableName;
-  protected final byte[] columnFamily;
   protected final Configuration conf;
 
   protected int numThreads = 1;
@@ -51,8 +57,69 @@ public abstract class MultiThreadedActio
   protected AtomicLong totalOpTimeMs = new AtomicLong();
   protected boolean verbose = false;
 
-  protected int minDataSize = 256;
-  protected int maxDataSize = 1024;
+  protected LoadTestDataGenerator dataGenerator = null;
+
+  /**
+   * Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, fixed
+   * set of column families, and random number of columns in range. The table for it can
+   * be created manually or, for example, via
+   * {@link HBaseTestingUtility#createPreSplitLoadTestTable(
+   * org.apache.hadoop.conf.Configuration, byte[], byte[], Algorithm, DataBlockEncoding)}
+   */
+  public static class DefaultDataGenerator extends LoadTestDataGenerator {
+    private final byte[][] columnFamilies;
+    private final int minColumnsPerKey;
+    private final int maxColumnsPerKey;
+    private final Random random = new Random();
+
+    public DefaultDataGenerator(int minValueSize, int maxValueSize,
+        int minColumnsPerKey, int maxColumnsPerKey, byte[]... columnFamilies) {
+      super(minValueSize, maxValueSize);
+      this.columnFamilies = columnFamilies;
+      this.minColumnsPerKey = minColumnsPerKey;
+      this.maxColumnsPerKey = maxColumnsPerKey;
+    }
+
+    public DefaultDataGenerator(byte[]... columnFamilies) {
+      // Default values for tests that didn't care to provide theirs.
+      this(256, 1024, 1, 10, columnFamilies);
+    }
+
+    @Override
+    public byte[] getDeterministicUniqueKey(long keyBase) {
+      return LoadTestKVGenerator.md5PrefixedKey(keyBase).getBytes();
+    }
+
+    @Override
+    public byte[][] getColumnFamilies() {
+      return columnFamilies;
+    }
+
+    @Override
+    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
+      int numColumns = minColumnsPerKey + random.nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
+      byte[][] columns = new byte[numColumns][];
+      for (int i = 0; i < numColumns; ++i) {
+        columns[i] = Integer.toString(i).getBytes();
+      }
+      return columns;
+    }
+
+    @Override
+    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
+      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
+    }
+
+    @Override
+    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
+      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
+    }
+
+    @Override
+    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
+      return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
+    }
+  }
 
   /** "R" or "W" */
   private String actionLetter;
@@ -62,11 +129,11 @@ public abstract class MultiThreadedActio
 
   public static final int REPORTING_INTERVAL_MS = 5000;
 
-  public MultiThreadedAction(Configuration conf, byte[] tableName,
-      byte[] columnFamily, String actionLetter) {
+  public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf, byte[] tableName,
+      String actionLetter) {
     this.conf = conf;
+    this.dataGenerator = dataGen;
     this.tableName = tableName;
-    this.columnFamily = columnFamily;
     this.actionLetter = actionLetter;
   }
 
@@ -165,17 +232,16 @@ public abstract class MultiThreadedActio
     }
   }
 
-  public void setDataSize(int minDataSize, int maxDataSize) {
-    this.minDataSize = minDataSize;
-    this.maxDataSize = maxDataSize;
-  }
-
   public void waitForFinish() {
     while (numThreadsWorking.get() != 0) {
       Threads.sleepWithoutInterrupt(1000);
     }
   }
 
+  public boolean isDone() {
+    return (numThreadsWorking.get() == 0);
+  }
+
   protected void startThreads(Collection<? extends Thread> threads) {
     numThreadsWorking.addAndGet(threads.size());
     for (Thread thread : threads) {
@@ -202,4 +268,77 @@ public abstract class MultiThreadedActio
     sb.append(v);
   }
 
+  /**
+   * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}.
+   * Does not verify cf/column integrity.
+   */
+  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
+    return verifyResultAgainstDataGenerator(result, verifyValues, false);
+  }
+
+  /**
+   * Verifies the result from get or scan using the dataGenerator (that was presumably
+   * also used to generate said result).
+   * @param verifyValues verify that values in the result make sense for row/cf/column combination
+   * @param verifyCfAndColumnIntegrity verify that cf/column set in the result is complete. Note
+   *                                   that to use this multiPut should be used, or verification
+   *                                   has to happen after writes, otherwise there can be races.
+   * @return true if the result is non-empty and passes every requested check, false otherwise
+   */
+  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
+      boolean verifyCfAndColumnIntegrity) {
+    String rowKeyStr = Bytes.toString(result.getRow());
+
+    // See if we have any data at all.
+    if (result.isEmpty()) {
+      LOG.error("No data returned for key = [" + rowKeyStr + "]");
+      return false;
+    }
+
+    if (!verifyValues && !verifyCfAndColumnIntegrity) {
+      return true; // as long as we have something, we are good.
+    }
+
+    // See if we have all the CFs.
+    byte[][] expectedCfs = dataGenerator.getColumnFamilies();
+    if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
+      LOG.error("Bad family count for [" + rowKeyStr + "]: " + result.getMap().size());
+      return false;
+    }
+
+    // Verify each column family from get in the result.
+    for (byte[] cf : result.getMap().keySet()) {
+      String cfStr = Bytes.toString(cf);
+      Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
+      if (columnValues == null) {
+        LOG.error("No data for family [" + cfStr + "] for [" + rowKeyStr + "]");
+        return false;
+      }
+      // See if we have correct columns.
+      if (verifyCfAndColumnIntegrity
+          && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
+        StringBuilder colsStr = new StringBuilder();
+        for (byte[] col : columnValues.keySet()) {
+          if (colsStr.length() > 0) {
+            colsStr.append(", ");
+          }
+          colsStr.append("[").append(Bytes.toString(col)).append("]");
+        }
+        LOG.error("Bad columns for family [" + cfStr + "] for [" + rowKeyStr + "]: " + colsStr);
+        return false;
+      }
+      // See if values check out.
+      if (verifyValues) {
+        for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
+          if (!dataGenerator.verify(result.getRow(), cf, kv.getKey(), kv.getValue())) {
+            LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+              + cfStr + "], column [" + Bytes.toString(kv.getKey()) + "]; value of length "
+              + kv.getValue().length);
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
 }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java Mon Jan 14 23:57:48 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
 import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
 
 /** Creates multiple threads that read and verify previously written data */
@@ -72,9 +74,9 @@ public class MultiThreadedReader extends
   private int maxErrors = DEFAULT_MAX_ERRORS;
   private int keyWindow = DEFAULT_KEY_WINDOW;
 
-  public MultiThreadedReader(Configuration conf, byte[] tableName,
-      byte[] columnFamily, double verifyPercent) {
-    super(conf, tableName, columnFamily, "R");
+  public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
+      byte[] tableName, double verifyPercent) {
+    super(dataGen, conf, tableName, "R");
     this.verifyPercent = verifyPercent;
   }
 
@@ -223,14 +225,22 @@ public class MultiThreadedReader extends
     }
 
     private Get readKey(long keyToRead) {
-      Get get = new Get(
-          LoadTestKVGenerator.md5PrefixedKey(keyToRead).getBytes());
-      get.addFamily(columnFamily);
+      Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
+      String cfsString = "";
+      byte[][] columnFamilies = dataGenerator.getColumnFamilies();
+      for (byte[] cf : columnFamilies) {
+        get.addFamily(cf);
+        if (verbose) {
+          if (cfsString.length() > 0) {
+            cfsString += ", ";
+          }
+          cfsString += "[" + Bytes.toStringBinary(cf) + "]";
+        }
+      }
 
       try {
         if (verbose) {
-          LOG.info("[" + readerId + "] " + "Querying key " + keyToRead
-              + ", cf " + Bytes.toStringBinary(columnFamily));
+          LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
         }
         queryKey(get, random.nextInt(100) < verifyPercent);
       } catch (IOException e) {
@@ -250,47 +260,38 @@ public class MultiThreadedReader extends
       Result result = table.get(get);
       totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
       numKeys.addAndGet(1);
-
-      // if we got no data report error
-      if (result.isEmpty()) {
+      if (!result.isEmpty()) {
+        if (verify) {
+          numKeysVerified.incrementAndGet();
+        }
+      } else {
          HRegionLocation hloc = table.getRegionLocation(
              Bytes.toBytes(rowKey));
         LOG.info("Key = " + rowKey + ", RegionServer: "
             + hloc.getHostname());
-        numReadErrors.addAndGet(1);
-        LOG.error("No data returned, tried to get actions for key = "
-            + rowKey + (writer == null ? "" : ", keys inserted by writer: " +
-                writer.numKeys.get() + ")"));
-
-         if (numReadErrors.get() > maxErrors) {
-          LOG.error("Aborting readers -- found more than " + maxErrors
-              + " errors\n");
-           aborted = true;
-         }
       }
 
-      if (result.getFamilyMap(columnFamily) != null) {
-        // increment number of columns read
-        numCols.addAndGet(result.getFamilyMap(columnFamily).size());
-
-        if (verify) {
-          // verify the result
-          List<KeyValue> keyValues = result.list();
-          for (KeyValue kv : keyValues) {
-            String qual = new String(kv.getQualifier());
-
-            // if something does not look right report it
-            if (!LoadTestKVGenerator.verify(rowKey, qual, kv.getValue())) {
-              numReadErrors.addAndGet(1);
-              LOG.error("Error checking data for key = " + rowKey
-                  + ", actionId = " + qual);
-            }
-          }
-          numKeysVerified.addAndGet(1);
+      boolean isOk = verifyResultAgainstDataGenerator(result, verify);
+      long numErrorsAfterThis = 0;
+      if (isOk) {
+        long cols = 0;
+        // Count the columns for reporting purposes.
+        for (byte[] cf : result.getMap().keySet()) {
+          cols += result.getFamilyMap(cf).size();
+        }
+        numCols.addAndGet(cols);
+      } else {
+        if (writer != null) {
+          LOG.error("At the time of failure, writer inserted " + writer.numKeys.get() + " keys");
         }
+        numErrorsAfterThis = numReadErrors.incrementAndGet();
       }
-    }
 
+      if (numErrorsAfterThis > maxErrors) {
+        LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
+        aborted = true;
+      }
+    }
   }
 
   public long getNumReadFailures() {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java Mon Jan 14 23:57:48 2013
@@ -18,9 +18,9 @@ package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -33,14 +33,13 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
 
 /** Creates multiple threads that write key/values into the */
 public class MultiThreadedWriter extends MultiThreadedAction {
   private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class);
 
-  private long minColumnsPerKey = 1;
-  private long maxColumnsPerKey = 10;
   private Set<HBaseWriterThread> writers = new HashSet<HBaseWriterThread>();
 
   private boolean isMultiPut = false;
@@ -51,8 +50,7 @@ public class MultiThreadedWriter extends
    * {@link #insertedUpToKey}, the maximum key in the contiguous range of keys
    * being inserted. This queue is supposed to stay small.
    */
-  private BlockingQueue<Long> insertedKeys =
-      new ArrayBlockingQueue<Long>(10000);
+  private BlockingQueue<Long> insertedKeys = new ArrayBlockingQueue<Long>(10000);
 
   /**
    * This is the current key to be inserted by any thread. Each thread does an
@@ -78,9 +76,9 @@ public class MultiThreadedWriter extends
   /** Enable this if used in conjunction with a concurrent reader. */
   private boolean trackInsertedKeys;
 
-  public MultiThreadedWriter(Configuration conf, byte[] tableName,
-      byte[] columnFamily) {
-    super(conf, tableName, columnFamily, "W");
+  public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
+    byte[] tableName) {
+    super(dataGen, conf, tableName, "W");
   }
 
   /** Use multi-puts vs. separate puts for every column in a row */
@@ -88,11 +86,6 @@ public class MultiThreadedWriter extends
     this.isMultiPut = isMultiPut;
   }
 
-  public void setColumnsPerKey(long minColumnsPerKey, long maxColumnsPerKey) {
-    this.minColumnsPerKey = minColumnsPerKey;
-    this.maxColumnsPerKey = maxColumnsPerKey;
-  }
-
   @Override
   public void start(long startKey, long endKey, int numThreads)
       throws IOException {
@@ -118,17 +111,9 @@ public class MultiThreadedWriter extends
     startThreads(writers);
   }
 
-  public static byte[] longToByteArrayKey(long rowKey) {
-    return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
-  }
-
   private class HBaseWriterThread extends Thread {
     private final HTable table;
 
-    private final Random random = new Random();
-    private final LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(
-        minDataSize, maxDataSize);
-
     public HBaseWriterThread(int writerId) throws IOException {
       setName(getClass().getSimpleName() + "_" + writerId);
       table = new HTable(conf, tableName);
@@ -136,20 +121,36 @@ public class MultiThreadedWriter extends
 
     public void run() {
       try {
-        long rowKey;
-        while ((rowKey = nextKeyToInsert.getAndIncrement()) < endKey) {
-          long numColumns = minColumnsPerKey + Math.abs(random.nextLong())
-              % (maxColumnsPerKey - minColumnsPerKey);
+        long rowKeyBase;
+        byte[][] columnFamilies = dataGenerator.getColumnFamilies();
+        while ((rowKeyBase = nextKeyToInsert.getAndIncrement()) < endKey) {
+          byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
+          Put put = new Put(rowKey);
           numKeys.addAndGet(1);
+          int columnCount = 0;
+          for (byte[] cf : columnFamilies) {
+            byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf);
+            for (byte[] column : columns) {
+              byte[] value = dataGenerator.generateValue(rowKey, cf, column);
+              put.add(cf, column, value);
+              ++columnCount;
+              if (!isMultiPut) { // single-put mode: send each column as its own Put
+                insert(put, rowKeyBase);
+                numCols.addAndGet(1);
+                put = new Put(rowKey); // start a fresh Put for the next column
+              }
+            }
+          }
           if (isMultiPut) {
-            multiPutInsertKey(rowKey, 0, numColumns);
-          } else {
-            for (long col = 0; col < numColumns; ++col) {
-              insert(rowKey, col);
+            if (verbose) {
+              LOG.debug("Preparing put for key = [" + Bytes.toStringBinary(rowKey) + "], "
+                  + columnCount + " columns");
             }
+            insert(put, rowKeyBase);
+            numCols.addAndGet(columnCount);
           }
           if (trackInsertedKeys) {
-            insertedKeys.add(rowKey);
+            insertedKeys.add(rowKeyBase);
           }
         }
       } finally {
@@ -162,52 +163,14 @@ public class MultiThreadedWriter extends
       }
     }
 
-    public void insert(long rowKey, long col) {
-      Put put = new Put(longToByteArrayKey(rowKey));
-      String colAsStr = String.valueOf(col);
-      put.add(columnFamily, Bytes.toBytes(colAsStr),
-          dataGenerator.generateRandomSizeValue(rowKey, colAsStr));
+    public void insert(Put put, long keyBase) {
       try {
         long start = System.currentTimeMillis();
         table.put(put);
-        numCols.addAndGet(1);
         totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
       } catch (IOException e) {
-        failedKeySet.add(rowKey);
-        LOG.error("Failed to insert: " + rowKey);
-        e.printStackTrace();
-      }
-    }
-
-    public void multiPutInsertKey(long rowKey, long startCol, long endCol) {
-      if (verbose) {
-        LOG.debug("Preparing put for key = " + rowKey + ", cols = ["
-            + startCol + ", " + endCol + ")");
-      }
-
-      if (startCol >= endCol) {
-        return;
-      }
-
-      Put put = new Put(LoadTestKVGenerator.md5PrefixedKey(
-          rowKey).getBytes());
-      byte[] columnQualifier;
-      byte[] value;
-      for (long i = startCol; i < endCol; ++i) {
-        String qualStr = String.valueOf(i);
-        columnQualifier = qualStr.getBytes();
-        value = dataGenerator.generateRandomSizeValue(rowKey, qualStr);
-        put.add(columnFamily, columnQualifier, value);
-      }
-
-      try {
-        long start = System.currentTimeMillis();
-        table.put(put);
-        numCols.addAndGet(endCol - startCol);
-        totalOpTimeMs.addAndGet(
-            System.currentTimeMillis() - start);
-      } catch (IOException e) {
-        failedKeySet.add(rowKey);
+        failedKeySet.add(keyBase);
+        LOG.error("Failed to insert: " + keyBase);
         e.printStackTrace();
       }
     }
@@ -302,8 +265,7 @@ public class MultiThreadedWriter extends
    * key, which requires a blocking queue and a consumer thread.
    * @param enable whether to enable tracking the last inserted key
    */
-  void setTrackInsertedKeys(boolean enable) {
+  public void setTrackInsertedKeys(boolean enable) {
     trackInsertedKeys = enable;
   }
-
 }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java Mon Jan 14 23:57:48 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
 
 /**
  * A command-line tool that spins up a local process-based cluster, loads
@@ -59,8 +60,8 @@ public class RestartMetaTest extends Abs
   private void loadData() throws IOException {
     long startKey = 0;
     long endKey = 100000;
-    long minColsPerKey = 5;
-    long maxColsPerKey = 15;
+    int minColsPerKey = 5;
+    int maxColsPerKey = 15;
     int minColDataSize = 256;
     int maxColDataSize = 256 * 3;
     int numThreads = 10;
@@ -74,11 +75,10 @@ public class RestartMetaTest extends Abs
     System.out.printf("Client Threads: %d\n", numThreads);
 
     // start the writers
-    MultiThreadedWriter writer = new MultiThreadedWriter(conf, TABLE_NAME,
-        LoadTestTool.COLUMN_FAMILY);
+    LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(
+      minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, LoadTestTool.COLUMN_FAMILY);
+    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
     writer.setMultiPut(true);
-    writer.setColumnsPerKey(minColsPerKey, maxColsPerKey);
-    writer.setDataSize(minColDataSize, maxColDataSize);
     writer.start(startKey, endKey, numThreads);
     System.out.printf("Started loading data...");
     writer.waitForFinish();

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java?rev=1433224&r1=1433223&r2=1433224&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java Mon Jan 14 23:57:48 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.TableNotF
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -139,9 +140,10 @@ public class TestMiniClusterLoadSequenti
 
     TEST_UTIL.waitUntilAllRegionsAssigned(numRegions);
 
-    writerThreads = new MultiThreadedWriter(conf, TABLE, CF);
+    LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF);
+    writerThreads = new MultiThreadedWriter(dataGen, conf, TABLE);
     writerThreads.setMultiPut(isMultiPut);
-    readerThreads = new MultiThreadedReader(conf, TABLE, CF, 100);
+    readerThreads = new MultiThreadedReader(dataGen, conf, TABLE, 100);
   }
 
   protected int numKeys() {