Posted to commits@hbase.apache.org by st...@apache.org on 2014/07/19 08:57:20 UTC

git commit: HBASE-11548 [PE] Add 'cycling' test N times and unit tests for size/zipf/valueSize calculations

Repository: hbase
Updated Branches:
  refs/heads/master dcec55148 -> 4aeded758


HBASE-11548 [PE] Add 'cycling' test N times and unit tests for size/zipf/valueSize calculations


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4aeded75
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4aeded75
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4aeded75

Branch: refs/heads/master
Commit: 4aeded75883a4f7d1c888b921e69f8bd421a48bb
Parents: dcec551
Author: stack <st...@apache.org>
Authored: Fri Jul 18 23:57:00 2014 -0700
Committer: stack <st...@apache.org>
Committed: Fri Jul 18 23:57:00 2014 -0700

----------------------------------------------------------------------
 .../client/ScannerCallableWithReplicas.java     |   4 +-
 .../hadoop/hbase/PerformanceEvaluation.java     | 122 +++++++++++++------
 .../hadoop/hbase/TestPerformanceEvaluation.java |  97 ++++++++++++++-
 3 files changed, 180 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4aeded75/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 4c99e01..f5ff01d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -210,8 +210,8 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       currentScannerCallable = scanner;
       // store where to start the replica scanner from if we need to.
       if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting current scanner as " + currentScannerCallable.scannerId +
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Setting current scanner as " + currentScannerCallable.scannerId +
             " associated with " + currentScannerCallable.getHRegionInfo().getReplicaId());
       }
       // close all outstanding replica scanners but the one we heard back from

http://git-wip-us.apache.org/repos/asf/hbase/blob/4aeded75/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index 0963ad3..1634b55 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -41,9 +41,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import com.google.common.base.Objects;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -90,16 +87,17 @@ import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.codehaus.jackson.map.ObjectMapper;
-
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.stats.UniformSample;
-import com.yammer.metrics.stats.Snapshot;
-
 import org.htrace.Sampler;
 import org.htrace.Trace;
 import org.htrace.TraceScope;
 import org.htrace.impl.ProbabilitySampler;
 
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.stats.Snapshot;
+import com.yammer.metrics.stats.UniformSample;
+
 /**
  * Script used for evaluating HBase performance and scalability.  Runs an HBase
  * client that steps through one of a set of hardcoded tests or 'experiments'
@@ -472,34 +470,47 @@ public class PerformanceEvaluation extends Configured implements Tool {
     return job;
   }
 
+  /**
+   * Per client, how many map tasks will we run?  We divide each client's row count by this
+   * number and have each map task run the resulting count of rows.
+   */
+  static int TASKS_PER_CLIENT = 10;
+
+  static String JOB_INPUT_FILENAME = "input.txt";
+
   /*
    * Write input file of offsets-per-client for the mapreduce job.
    * @param c Configuration
-   * @return Directory that contains file written.
+   * @return Directory that contains the written file, whose name is JOB_INPUT_FILENAME
    * @throws IOException
    */
-  private static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
+  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
+    return writeInputFile(c, opts, new Path("."));
+  }
+
+  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
+  throws IOException {
     SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
-    Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
+    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
     Path inputDir = new Path(jobdir, "inputs");
 
     FileSystem fs = FileSystem.get(c);
     fs.mkdirs(inputDir);
 
-    Path inputFile = new Path(inputDir, "input.txt");
+    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
     PrintStream out = new PrintStream(fs.create(inputFile));
     // Make input random.
     Map<Integer, String> m = new TreeMap<Integer, String>();
     Hash h = MurmurHash.getInstance();
     int perClientRows = (opts.totalRows / opts.numClientThreads);
     try {
-      for (int i = 0; i < 10; i++) {
+      for (int i = 0; i < TASKS_PER_CLIENT; i++) {
         for (int j = 0; j < opts.numClientThreads; j++) {
           TestOptions next = new TestOptions(opts);
           next.startRow = (j * perClientRows) + (i * (perClientRows/10));
           next.perClientRunRows = perClientRows / 10;
           String s = MAPPER.writeValueAsString(next);
-          LOG.info("maptask input=" + s);
+          LOG.info("Client=" + j + ", maptask=" + i + ", input=" + s);
           int hash = h.hash(Bytes.toBytes(s));
           m.put(hash, s);
         }
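
For illustration, the loop above carves each client's row range into TASKS_PER_CLIENT
slices of perClientRows/10 rows each. A minimal standalone Java sketch of that
arithmetic, with made-up client and row counts (not part of the commit):

// Standalone sketch of the per-task offsets written to the job input file above.
public class OffsetSketch {
  public static void main(String[] args) {
    final int tasksPerClient = 10;   // mirrors TASKS_PER_CLIENT
    final int numClientThreads = 2;  // illustrative
    final int perClientRows = 100;   // illustrative
    for (int i = 0; i < tasksPerClient; i++) {
      for (int j = 0; j < numClientThreads; j++) {
        // Same formulas as writeInputFile above.
        int startRow = (j * perClientRows) + (i * (perClientRows / 10));
        int runRows = perClientRows / 10;
        System.out.println("client=" + j + " task=" + i
            + " startRow=" + startRow + " rows=" + runRows);
      }
    }
  }
}
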
@@ -579,6 +590,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     boolean valueZipf = false;
     int valueSize = DEFAULT_VALUE_LENGTH;
     int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
+    int cycles = 1;
 
     public TestOptions() {}
 
@@ -588,6 +600,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
      */
     public TestOptions(TestOptions that) {
       this.cmdName = that.cmdName;
+      this.cycles = that.cycles;
       this.nomapred = that.nomapred;
       this.startRow = that.startRow;
       this.size = that.size;
@@ -620,6 +633,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
       this.randomSleep = that.randomSleep;
     }
 
+    public int getCycles() {
+      return this.cycles;
+    }
+
+    public void setCycles(final int cycles) {
+      this.cycles = cycles;
+    }
+
     public boolean isValueZipf() {
       return valueZipf;
     }
@@ -904,11 +925,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
      */
     Test(final HConnection con, final TestOptions options, final Status status) {
       this.connection = con;
-      this.conf = con.getConfiguration();
+      this.conf = con == null? null: this.connection.getConfiguration();
+      this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf);
       this.opts = options;
       this.status = status;
       this.testName = this.getClass().getSimpleName();
-      receiverHost = SpanReceiverHost.getInstance(conf);
       if (options.traceRate >= 1.0) {
         this.traceSampler = Sampler.ALWAYS;
       } else if (options.traceRate > 0.0) {
@@ -918,7 +939,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
       }
       everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
       if (options.isValueZipf()) {
-        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.1);
+        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
       }
       LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
     }
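
For context on the exponent bump from 1.1 to 1.2 above: a Zipf distribution over value
lengths makes small lengths common and large ones rare, and a larger exponent skews the
samples further toward the small end. A minimal standalone inverse-CDF Zipf sampler
(illustrative only; the commit itself uses HBase's RandomDistribution.Zipf):

import java.util.Random;

public class ZipfSketch {
  public static void main(String[] args) {
    Random rand = new Random(17);
    final int min = 1, max = 1024;   // value-length range, as in the new unit test
    final double sigma = 1.2;        // the exponent this commit settles on
    // Unnormalized CDF of p(k) proportional to 1 / k^sigma.
    double[] cdf = new double[max - min + 1];
    double sum = 0.0;
    for (int k = min; k <= max; k++) {
      sum += 1.0 / Math.pow(k, sigma);
      cdf[k - min] = sum;
    }
    // Draw a few samples by inverting the CDF.
    for (int i = 0; i < 5; i++) {
      double u = rand.nextDouble() * sum;
      int k = min;
      while (cdf[k - min] < u) {
        k++;
      }
      System.out.println("sampled value length=" + k);
    }
  }
}
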
@@ -1016,18 +1037,21 @@ public class PerformanceEvaluation extends Configured implements Tool {
     void testTimed() throws IOException, InterruptedException {
       int lastRow = opts.startRow + opts.perClientRunRows;
       // Report on completion of 1/10th of total.
-      for (int i = opts.startRow; i < lastRow; i++) {
-        if (i % everyN != 0) continue;
-        long startTime = System.nanoTime();
-        TraceScope scope = Trace.startSpan("test row", traceSampler);
-        try {
-          testRow(i);
-        } finally {
-          scope.close();
-        }
-        latency.update((System.nanoTime() - startTime) / 1000);
-        if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
-          status.setStatus(generateStatus(opts.startRow, i, lastRow));
+      for (int ii = 0; ii < opts.cycles; ii++) {
+        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
+        for (int i = opts.startRow; i < lastRow; i++) {
+          if (i % everyN != 0) continue;
+          long startTime = System.nanoTime();
+          TraceScope scope = Trace.startSpan("test row", traceSampler);
+          try {
+            testRow(i);
+          } finally {
+            scope.close();
+          }
+          latency.update((System.nanoTime() - startTime) / 1000);
+          if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
+            status.setStatus(generateStatus(opts.startRow, i, lastRow));
+          }
         }
       }
     }
@@ -1598,6 +1622,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     System.err.println(" multiGet        Batch gets together into groups of N. Only supported " +
       "by randomRead. Default: disabled");
     System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
+    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
     System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
     System.err.println(" randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
     System.err.println();
@@ -1650,6 +1675,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
         continue;
       }
 
+      final String cycles = "--cycles=";
+      if (cmd.startsWith(cycles)) {
+        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
+        continue;
+      }
+
       final String sampleRate = "--sampleRate=";
       if (cmd.startsWith(sampleRate)) {
         opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
@@ -1761,6 +1792,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
       final String size = "--size=";
       if (cmd.startsWith(size)) {
         opts.size = Float.parseFloat(cmd.substring(size.length()));
+        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1 (size is in GB)");
         continue;
       }
 
@@ -1815,26 +1847,36 @@ public class PerformanceEvaluation extends Configured implements Tool {
       if (isCommandClass(cmd)) {
         opts.cmdName = cmd;
         opts.numClientThreads = Integer.parseInt(args.remove());
-        int rowsPerGB = ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize);
         if (opts.size != DEFAULT_OPTS.size &&
             opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
-          throw new IllegalArgumentException(rows + " and " + size + " are mutually exclusive arguments.");
-        }
-        if (opts.size != DEFAULT_OPTS.size) {
-          // total size in GB specified
-          opts.totalRows = (int) opts.size * rowsPerGB;
-          opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
-        } else if (opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
-          // number of rows specified
-          opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
-          opts.size = opts.totalRows / rowsPerGB;
+          throw new IllegalArgumentException(rows + " and " + size +
+            " are mutually exclusive options");
         }
+        opts = calculateRowsAndSize(opts);
         break;
       }
     }
     return opts;
   }
 
+  static TestOptions calculateRowsAndSize(final TestOptions opts) {
+    int rowsPerGB = getRowsPerGB(opts);
+    if (opts.size != DEFAULT_OPTS.size) {
+      // total size in GB specified
+      opts.totalRows = (int) opts.size * rowsPerGB;
+      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
+    } else if (opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
+      // number of rows specified
+      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
+      opts.size = opts.totalRows / rowsPerGB;
+    }
+    return opts;
+  }
+
+  static int getRowsPerGB(final TestOptions opts) {
+    return ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize);
+  }
+
   @Override
   public int run(String[] args) throws Exception {
     // Process command-line args. TODO: Better cmd-line processing

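
A worked example of the arithmetic factored out into calculateRowsAndSize and
getRowsPerGB above; standalone, with a 1024-byte value length and two clients
(illustrative numbers that happen to match the new unit test):

// Standalone sketch of the size-to-rows calculation above.
public class RowsAndSizeSketch {
  static final int ONE_GB = 1024 * 1024 * 1024;

  public static void main(String[] args) {
    final int valueSize = 1024;        // assumed value length, in bytes
    final boolean valueRandom = false; // random values average half the length
    final float size = 2.0f;           // --size=2, i.e. 2GB of data
    final int numClientThreads = 2;
    int rowsPerGB = ONE_GB / (valueRandom ? valueSize / 2 : valueSize);
    int totalRows = (int) size * rowsPerGB;
    int perClientRunRows = totalRows / numClientThreads;
    System.out.println("rowsPerGB=" + rowsPerGB);               // 1048576
    System.out.println("totalRows=" + totalRows);               // 2097152
    System.out.println("perClientRunRows=" + perClientRunRows); // 1048576
  }
}
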
http://git-wip-us.apache.org/repos/asf/hbase/blob/4aeded75/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java
index a5bfcbe..6bce51d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java
@@ -17,18 +17,35 @@
  */
 package org.apache.hadoop.hbase;
 
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.PerformanceEvaluation.RandomReadTest;
+import org.apache.hadoop.hbase.PerformanceEvaluation.TestOptions;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.stats.Snapshot;
+import com.yammer.metrics.stats.UniformSample;
+
 @Category(SmallTests.class)
 public class TestPerformanceEvaluation {
+  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+
   @Test
   public void testSerialization()
   throws JsonGenerationException, JsonMappingException, IOException {
@@ -41,4 +58,82 @@ public class TestPerformanceEvaluation {
         mapper.readValue(optionsString, PerformanceEvaluation.TestOptions.class);
     assertTrue(optionsDeserialized.isAutoFlush());
   }
+
+  /**
+   * Exercise the MapReduce input-spec writing.  Simple assertions to make sure it is basically
+   * working.
+   * @throws IOException
+   */
+  @Ignore @Test
+  public void testWriteInputFile() throws IOException {
+    TestOptions opts = new PerformanceEvaluation.TestOptions();
+    final int clients = 10;
+    opts.setNumClientThreads(clients);
+    opts.setPerClientRunRows(10);
+    Path dir =
+      PerformanceEvaluation.writeInputFile(HTU.getConfiguration(), opts, HTU.getDataTestDir());
+    FileSystem fs = FileSystem.get(HTU.getConfiguration());
+    Path p = new Path(dir, PerformanceEvaluation.JOB_INPUT_FILENAME);
+    long len = fs.getFileStatus(p).getLen();
+    assertTrue(len > 0);
+    byte [] content = new byte[(int)len];
+    FSDataInputStream dis = fs.open(p);
+    try {
+      dis.readFully(content);
+      BufferedReader br =
+        new BufferedReader(new InputStreamReader(new ByteArrayInputStream(content)));
+      int count = 0;
+      while (br.readLine() != null) {
+        count++;
+      }
+      assertEquals(clients * PerformanceEvaluation.TASKS_PER_CLIENT, count);
+    } finally {
+      dis.close();
+    }
+  }
+
+  @Test
+  public void testSizeCalculation() {
+    TestOptions opts = new PerformanceEvaluation.TestOptions();
+    opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+    int rows = opts.getPerClientRunRows();
+    // Default row count
+    final int defaultPerClientRunRows = 1024 * 1024;
+    assertEquals(defaultPerClientRunRows, rows);
+    // If size is 2G, then twice the row count.
+    opts.setSize(2.0f);
+    opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+    assertEquals(defaultPerClientRunRows * 2, opts.getPerClientRunRows());
+    // If two clients, then they get half the rows each.
+    opts.setNumClientThreads(2);
+    opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+    assertEquals(defaultPerClientRunRows, opts.getPerClientRunRows());
+    // What if valueSize is 'random'? Then half the valueSize is assumed, so twice the rows.
+    opts.valueRandom = true;
+    opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+    assertEquals(defaultPerClientRunRows * 2, opts.getPerClientRunRows());
+  }
+
+  @Test
+  public void testZipfian()
+  throws NoSuchMethodException, SecurityException, InstantiationException, IllegalAccessException,
+      IllegalArgumentException, InvocationTargetException {
+    TestOptions opts = new PerformanceEvaluation.TestOptions();
+    opts.setValueZipf(true);
+    final int valueSize = 1024;
+    opts.setValueSize(valueSize);
+    RandomReadTest rrt = new RandomReadTest(null, opts, null);
+    Constructor<?> ctor =
+      Histogram.class.getDeclaredConstructor(com.yammer.metrics.stats.Sample.class);
+    ctor.setAccessible(true);
+    Histogram histogram = (Histogram)ctor.newInstance(new UniformSample(1024 * 500));
+    for (int i = 0; i < 100; i++) {
+      histogram.update(rrt.getValueLength(null));
+    }
+    double stddev = histogram.stdDev();
+    assertTrue(stddev != 0 && stddev != 1.0);
+    assertTrue(histogram.stdDev() != 0);
+    Snapshot snapshot = histogram.getSnapshot();
+    double median = snapshot.getMedian();
+    assertTrue(median != 0 && median != 1 && median != valueSize);
+  }
 }
\ No newline at end of file
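
A note on the reflection in testZipfian above: in the yammer metrics-core 2.x jar the
Histogram(Sample) constructor is not public, hence the getDeclaredConstructor and
setAccessible dance. A standalone sketch of the same trick (assumes metrics-core 2.x
on the classpath):

import java.lang.reflect.Constructor;

import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.stats.Sample;
import com.yammer.metrics.stats.UniformSample;

public class HistogramSketch {
  public static void main(String[] args) throws Exception {
    // Histogram(Sample) is package-private, so construct it reflectively.
    Constructor<?> ctor = Histogram.class.getDeclaredConstructor(Sample.class);
    ctor.setAccessible(true);
    Histogram histogram = (Histogram) ctor.newInstance(new UniformSample(1024));
    for (int i = 1; i <= 100; i++) {
      histogram.update(i);
    }
    System.out.println("mean=" + histogram.mean() + ", stdDev=" + histogram.stdDev());
  }
}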