You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by dd...@apache.org on 2014/03/14 00:22:33 UTC

svn commit: r1577372 - in /hbase/branches/hbase-10070: hbase-it/src/test/java/org/apache/hadoop/hbase/test/ hbase-server/src/test/java/org/apache/hadoop/hbase/util/

Author: ddas
Date: Thu Mar 13 23:22:33 2014
New Revision: 1577372

URL: http://svn.apache.org/r1577372
Log:
HBASE-10616. Integration test for multi-get calls

Added:
    hbase/branches/hbase-10070/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas.java
Modified:
    hbase/branches/hbase-10070/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java

Added: hbase/branches/hbase-10070/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas.java?rev=1577372&view=auto
==============================================================================
--- hbase/branches/hbase-10070/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas.java (added)
+++ hbase/branches/hbase-10070/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas.java Thu Mar 13 23:22:33 2014
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.test;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.IntegrationTests;
+import org.apache.hadoop.hbase.util.LoadTestTool;
+import org.apache.hadoop.hbase.util.MultiThreadedReader;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Extends {@link IntegrationTestTimeBoundedRequestsWithRegionReplicas} for multi-gets
+ * Besides the options already talked about in IntegrationTestTimeBoundedRequestsWithRegionReplicas
+ * the addition options here are:
+ * <pre>
+ * -DIntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas.multiget_batchsize=100
+ * -DIntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas.num_regions_per_server=5
+ * </pre>
+ * The multiget_batchsize when set to 1 will issue normal GETs.
+ * The num_regions_per_server argument indirectly impacts the region size (for a given number of
+ * num_keys_per_server). That in conjunction with multiget_batchsize would have different behaviors
+ * - the batch of gets goes to the same region or to multiple regions.
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas
+    extends IntegrationTestTimeBoundedRequestsWithRegionReplicas {
+
+  @Override
+  protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
+      long numKeys) {
+    List<String> args = Lists.newArrayList(super.getArgsForLoadTestTool(
+      mode, modeSpecificArg, startKey, numKeys));
+    String clazz = this.getClass().getSimpleName();
+    args.add("-" + LoadTestTool.OPT_MULTIGET);
+    args.add(conf.get(String.format("%s.%s", clazz, LoadTestTool.OPT_MULTIGET),
+        Integer.toString(MultiThreadedReader.DEFAULT_BATCH_SIZE)));
+
+    args.add("-" + LoadTestTool.OPT_NUM_REGIONS_PER_SERVER);
+    args.add(conf.get(String.format("%s.%s", clazz, LoadTestTool.OPT_NUM_REGIONS_PER_SERVER),
+        Integer.toString(LoadTestTool.DEFAULT_NUM_REGIONS_PER_SERVER)));
+
+    return args.toArray(new String[args.size()]);
+  }
+
+  public static void main(String args[]) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    IntegrationTestingUtility.setUseDistributedCluster(conf);
+    int ret = ToolRunner.run(conf,
+        new IntegrationTestTimeBoundedMultiGetRequestsWithRegionReplicas(), args);
+    System.exit(ret);
+  }
+}

Modified: hbase/branches/hbase-10070/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java?rev=1577372&r1=1577371&r2=1577372&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java (original)
+++ hbase/branches/hbase-10070/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java Thu Mar 13 23:22:33 2014
@@ -323,9 +323,10 @@ public class IntegrationTestTimeBoundedR
       }
 
       @Override
-      protected void verifyResultsAndUpdateMetrics(boolean verify, String rowKey, long elapsedNano,
-          Result result, HTable table, boolean isNullExpected) throws IOException {
-        super.verifyResultsAndUpdateMetrics(verify, rowKey, elapsedNano, result, table, isNullExpected);
+      protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
+          Result[] results, HTable table, boolean isNullExpected)
+          throws IOException {
+        super.verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, table, isNullExpected);
         // we actually do not timeout and cancel the reads after timeout. We just wait for the RPC
         // to complete, but if the request took longer than timeout, we treat that as error.
         if (elapsedNano > timeoutNano) {

Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java?rev=1577372&r1=1577371&r2=1577372&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java Thu Mar 13 23:22:33 2014
@@ -127,6 +127,7 @@ public class LoadTestTool extends Abstra
   protected static final String OPT_WRITE = "write";
   protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
   protected static final String OPT_MULTIPUT = "multiput";
+  public static final String OPT_MULTIGET = "multiget_batchsize";
   protected static final String OPT_NUM_KEYS = "num_keys";
   protected static final String OPT_READ = "read";
   protected static final String OPT_START_KEY = "start_key";
@@ -147,7 +148,7 @@ public class LoadTestTool extends Abstra
   public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
   protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE
     = "Desired number of regions per region server. Defaults to 5.";
-  protected static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;
+  public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;
 
   public static final String OPT_REGION_REPLICATION = "region_replication";
   protected static final String OPT_REGION_REPLICATION_USAGE =
@@ -188,6 +189,7 @@ public class LoadTestTool extends Abstra
   // Reader options
   private int numReaderThreads = DEFAULT_NUM_THREADS;
   private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
+  private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
   private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
   private int verifyPercent;
 
@@ -282,6 +284,8 @@ public class LoadTestTool extends Abstra
     addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
         "to tolerate before terminating all reader threads. The default is " +
         MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
+    addOptWithArg(OPT_MULTIGET, "Whether to use multi-gets as opposed to " +
+        "separate gets for every column in a row");
     addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " +
         "reads and writes for concurrent write/read workload. The default " +
         "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");
@@ -410,6 +414,12 @@ public class LoadTestTool extends Abstra
             0, Integer.MAX_VALUE);
       }
 
+      if (cmd.hasOption(OPT_MULTIGET)) {
+        multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET),
+            0, Integer.MAX_VALUE);
+      }
+
+      System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize);
       System.out.println("Percent of keys to verify: " + verifyPercent);
       System.out.println("Reader threads: " + numReaderThreads);
     }
@@ -552,6 +562,7 @@ public class LoadTestTool extends Abstra
       readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
       readerThreads.setMaxErrors(maxReadErrors);
       readerThreads.setKeyWindow(keyWindow);
+      readerThreads.setMultiGetBatchSize(multiGetBatchSize);
     }
 
     if (isUpdate && isWrite) {

Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java?rev=1577372&r1=1577371&r2=1577372&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java Thu Mar 13 23:22:33 2014
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -64,13 +65,18 @@ public class MultiThreadedReader extends
    */
   public static final int DEFAULT_KEY_WINDOW = 0;
 
+  /**
+   * Default batch size for multigets
+   */
+  public static final int DEFAULT_BATCH_SIZE = 1; //translates to simple GET (no multi GET)
+
   protected AtomicLong numKeysVerified = new AtomicLong(0);
   protected AtomicLong numReadErrors = new AtomicLong(0);
   protected AtomicLong numReadFailures = new AtomicLong(0);
   protected AtomicLong nullResult = new AtomicLong(0);
-
   private int maxErrors = DEFAULT_MAX_ERRORS;
   private int keyWindow = DEFAULT_KEY_WINDOW;
+  private int batchSize = DEFAULT_BATCH_SIZE;
 
   public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
       TableName tableName, double verifyPercent) {
@@ -91,6 +97,10 @@ public class MultiThreadedReader extends
     this.keyWindow = keyWindow;
   }
 
+  public void setMultiGetBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
   @Override
   public void start(long startKey, long endKey, int numThreads) throws IOException {
     super.start(startKey, endKey, numThreads);
@@ -169,28 +179,38 @@ public class MultiThreadedReader extends
 
       startTimeMs = System.currentTimeMillis();
       curKey = startKey;
+      long [] keysForThisReader = new long[batchSize];
+      int readingRandomKeyStartIndex = -1;
       while (curKey < endKey && !aborted) {
-        long k = getNextKeyToRead();
-
-        // A sanity check for the key range.
-        if (k < startKey || k >= endKey) {
-          numReadErrors.incrementAndGet();
-          throw new AssertionError("Load tester logic error: proposed key " +
-              "to read " + k + " is out of range (startKey=" + startKey +
-              ", endKey=" + endKey + ")");
-        }
-
-        if (k % numThreads != readerId ||
-            writer != null && writer.failedToWriteKey(k)) {
-          // Skip keys that this thread should not read, as well as the keys
-          // that we know the writer failed to write.
-          continue;
-        }
+        int numKeys = 0;
+        // if multiGet, loop until we have the number of keys equal to the batch size
+        do {
+          long k = getNextKeyToRead();
+          if (k < startKey || k >= endKey) {
+            numReadErrors.incrementAndGet();
+            throw new AssertionError("Load tester logic error: proposed key " +
+                "to read " + k + " is out of range (startKey=" + startKey +
+                ", endKey=" + endKey + ")");
+          }
+          if (k % numThreads != readerId ||
+              writer != null && writer.failedToWriteKey(k)) {
+            // Skip keys that this thread should not read, as well as the keys
+            // that we know the writer failed to write.
+            continue;
+          }
+          keysForThisReader[numKeys] = k;
+          if (readingRandomKey && readingRandomKeyStartIndex == -1) {
+            //store the first index of a random read
+            readingRandomKeyStartIndex = numKeys;
+          }
+          numKeys++;
+        } while (numKeys < batchSize);
 
-        readKey(k);
-        if (k == curKey - 1 && !readingRandomKey) {
-          // We have verified another unique key.
-          numUniqueKeysVerified.incrementAndGet();
+        if (numKeys > 1) { //meaning there is some key to read
+          readKey(keysForThisReader);
+          // We have verified some unique key(s).
+          numUniqueKeysVerified.getAndAdd(readingRandomKeyStartIndex == -1 ?
+              numKeys : readingRandomKeyStartIndex);
         }
       }
     }
@@ -240,22 +260,44 @@ public class MultiThreadedReader extends
           % (maxKeyToRead - startKey + 1);
     }
 
-    private Get readKey(long keyToRead) {
-      Get get = null;
-      try {
-        get = createGet(keyToRead);
-        queryKey(get, RandomUtils.nextInt(100) < verifyPercent, keyToRead);
-      } catch (IOException e) {
-        numReadFailures.addAndGet(1);
-        LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
-            + ", time from start: "
-            + (System.currentTimeMillis() - startTimeMs) + " ms");
-        if (printExceptionTrace) {
-          LOG.warn(e);
-          printExceptionTrace = false;
+    private Get[] readKey(long[] keysToRead) {
+      Get [] gets = new Get[keysToRead.length];
+      int i = 0;
+      for (long keyToRead : keysToRead) {
+        try {
+          gets[i] = createGet(keyToRead);
+          if (keysToRead.length == 1) {
+            queryKey(gets[i], RandomUtils.nextInt(100) < verifyPercent, keyToRead);
+          }
+          i++;
+        } catch (IOException e) {
+          numReadFailures.addAndGet(1);
+          LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
+              + ", time from start: "
+              + (System.currentTimeMillis() - startTimeMs) + " ms");
+          if (printExceptionTrace) {
+            LOG.warn(e);
+            printExceptionTrace = false;
+          }
         }
       }
-      return get;
+      if (keysToRead.length > 1) {
+        try {
+          queryKey(gets, RandomUtils.nextInt(100) < verifyPercent, keysToRead);
+        } catch (IOException e) {
+          numReadFailures.addAndGet(gets.length);
+          for (long keyToRead : keysToRead) {
+            LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
+                + ", time from start: "
+                + (System.currentTimeMillis() - startTimeMs) + " ms");
+          }
+          if (printExceptionTrace) {
+            LOG.warn(e);
+            printExceptionTrace = false;
+          }
+        }
+      }
+      return gets;
     }
 
     protected Get createGet(long keyToRead) throws IOException {
@@ -278,28 +320,53 @@ public class MultiThreadedReader extends
       return get;
     }
 
-    public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
-      String rowKey = Bytes.toString(get.getRow());
+    public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException {
+      // read the data
+      long start = System.nanoTime();
+      // Uses multi/batch gets
+      Result[] results = table.get(Arrays.asList(gets));
+      long end = System.nanoTime();
+      verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false);
+    }
 
+    public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
       // read the data
+      
       long start = System.nanoTime();
+      // Uses simple get
       Result result = table.get(get);
       long end = System.nanoTime();
-      verifyResultsAndUpdateMetrics(verify, rowKey, end - start, result, table, false);
+      verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false);
     }
 
-    protected void verifyResultsAndUpdateMetrics(boolean verify, String rowKey, long elapsedNano,
-        Result result, HTable table, boolean isNullExpected)
+    protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
+        Result[] results, HTable table, boolean isNullExpected)
         throws IOException {
       totalOpTimeMs.addAndGet(elapsedNano / 1000000);
-      numKeys.addAndGet(1);
+      numKeys.addAndGet(gets.length);
+      int i = 0;
+      for (Result result : results) {
+        verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table,
+            isNullExpected);
+      }
+    }
+
+    protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano,
+        Result result, HTable table, boolean isNullExpected)
+        throws IOException {
+      verifyResultsAndUpdateMetrics(verify, new Get[]{get}, elapsedNano,
+          new Result[]{result}, table, isNullExpected);
+    }
+
+    private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get,
+        Result result, HTable table, boolean isNullExpected) throws IOException {
       if (!result.isEmpty()) {
         if (verify) {
           numKeysVerified.incrementAndGet();
         }
       } else {
-         HRegionLocation hloc = table.getRegionLocation(
-             Bytes.toBytes(rowKey));
+         HRegionLocation hloc = table.getRegionLocation(get.getRow());
+         String rowKey = Bytes.toString(get.getRow());
         LOG.info("Key = " + rowKey + ", RegionServer: "
             + hloc.getHostname());
         if(isNullExpected) {

Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java?rev=1577372&r1=1577371&r2=1577372&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java Thu Mar 13 23:22:33 2014
@@ -110,7 +110,7 @@ public class MultiThreadedReaderWithACL 
             boolean isNullExpected = ((((int) keyToRead % specialPermCellInsertionFactor)) == 0);
             LOG.info("Read happening from ACL " + isNullExpected);
             long end = System.nanoTime();
-            verifyResultsAndUpdateMetrics(verify, rowKey, end - start, result, localTable, isNullExpected);
+            verifyResultsAndUpdateMetrics(verify, get, end - start, result, localTable, isNullExpected);
           } catch (IOException e) {
             recordFailure(keyToRead);
           }