You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 19:45:18 UTC

svn commit: r1181977 - in /hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils: MultiThreadedAction.java MultiThreadedActionMBean.java MultiThreadedReader.java MultiThreadedWriter.java

Author: nspiegelberg
Date: Tue Oct 11 17:45:17 2011
New Revision: 1181977

URL: http://svn.apache.org/viewvc?rev=1181977&view=rev
Log:
Added HBase load tester stats output for OpenTSDB

Summary:
Load tester reader and writer threads now make stats available
over JMX. Stats include keys per second, columns per second, average
latency and error counts. The script which runs the load tester as a
map-reduce job sets the necessary parameters to enable JMX access. Added
a script to collect stats from JMX and output JSON to stdout for
exporting to OpenTSDB.

Test Plan: ran the load test on a dev cluster and observed stats in OpenTSDB

Reviewers: kranganathan

Reviewed By: kranganathan

CC: hbase@lists, kranganathan, python-diffs@lists

Differential Revision: 325412

Task ID: 697920

Added:
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedActionMBean.java
Modified:
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedAction.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedReader.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedWriter.java

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedAction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedAction.java?rev=1181977&r1=1181976&r2=1181977&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedAction.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedAction.java Tue Oct 11 17:45:17 2011
@@ -19,16 +19,19 @@
  */
 package org.apache.hadoop.hbase.manual.utils;
 
+import java.lang.management.ManagementFactory;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 
-
-public abstract class MultiThreadedAction
+public abstract class MultiThreadedAction implements MultiThreadedActionMBean
 {
   private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class);
   public static int numThreads_ = 1;
@@ -48,6 +51,95 @@ public abstract class MultiThreadedActio
   public Random random_ = new Random();
   public HBaseConfiguration conf_;
 
+  private AtomicLong priorKeysPerSecondCumulativeKeys_ = new AtomicLong(0);
+  private AtomicLong priorKeysPerSecondTime_ = new AtomicLong(System.currentTimeMillis());
+  private AtomicLong priorColumnsPerSecondCumulativeColumns_ = new AtomicLong(0);
+  private AtomicLong priorColumnsPerSecondTime_ = new AtomicLong(System.currentTimeMillis());
+  private AtomicLong priorLatencyCumulativeKeys_ = new AtomicLong(0);
+  private AtomicLong priorLatencyCumulativeLatency_ = new AtomicLong(0);
+  private final long startTime = System.currentTimeMillis();
+
+  public MultiThreadedAction(String id) {
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    try {
+      ObjectName name = new ObjectName("LoadTester:name=" + id);
+      mbs.registerMBean(this, name);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  public long getKeysPerSecond() {
+    long currentTime = System.currentTimeMillis();
+    long priorTime = priorKeysPerSecondTime_.getAndSet(currentTime);
+    long currentKeys = numKeys_.get();
+    long priorKeys = priorKeysPerSecondCumulativeKeys_.getAndSet(currentKeys);
+    long timeDelta = currentTime - priorTime;
+    if (timeDelta == 0) {
+      return 0;
+    }
+    return 1000 * (currentKeys - priorKeys) / timeDelta;
+  }
+
+  public long getColumnsPerSecond() {
+    long currentTime = System.currentTimeMillis();
+    long priorTime = priorColumnsPerSecondTime_.getAndSet(currentTime);
+    long currentColumns = numCols_.get();
+    long priorColumns = priorColumnsPerSecondCumulativeColumns_.getAndSet(currentColumns);
+    long timeDelta = currentTime - priorTime;
+    if (timeDelta == 0) {
+      return 0;
+    }
+    return 1000 * (currentColumns - priorColumns) / timeDelta;
+  }
+
+  public long getAverageLatency() {
+    long currentLatency = cumulativeOpTime_.get();
+    long priorLatency = priorLatencyCumulativeLatency_.getAndSet(currentLatency);
+    long currentKeys = numKeys_.get();
+    long priorKeys = priorLatencyCumulativeKeys_.getAndSet(currentKeys);
+    long keyDelta = currentKeys - priorKeys;
+    if (keyDelta == 0) {
+      return 0;
+    }
+    return (currentLatency - priorLatency) / keyDelta;
+  }
+
+  public long getCumulativeKeysPerSecond() {
+    long timeDelta = System.currentTimeMillis() - startTime;
+    if (timeDelta == 0) {
+      return 0;
+    }
+    return 1000 * numKeys_.get() / timeDelta;
+  }
+
+  public long getCumulativeKeys() {
+    return numKeys_.get();
+  }
+
+  public long getCumulativeColumns() {
+    return numCols_.get();
+  }
+
+  public long getCumulativeAverageLatency() {
+    if (numKeys_.get() == 0) {
+      return 0;
+    }
+    return cumulativeOpTime_.get() / numKeys_.get();
+  }
+
+  public long getCumulativeErrors() {
+    return numErrors_.get();
+  }
+
+  public long getCumulativeOpFailures() {
+    return numOpFailures_.get();
+  }
+
+  public long getCumulativeKeysVerified() {
+    return numKeysVerified_.get();
+  }
+
   public void startReporter(String id) {
     (new ProgressReporter(id)).start();
   }
@@ -66,6 +158,7 @@ public abstract class MultiThreadedActio
 
       long priorNumKeys = 0;
       long priorCumulativeOpTime = 0;
+      int priorAverageKeysPerSecond = 0;
 
       while(numThreadsWorking_.get() != 0) {
         String threadsLeft = "[" + id_ + ":" + numThreadsWorking_.get() + "] ";
@@ -79,6 +172,7 @@ public abstract class MultiThreadedActio
 
           long numKeysDelta = numKeys - priorNumKeys;
           long cumulativeOpTimeDelta = cumulativeOpTime - priorCumulativeOpTime;
+          double averageKeysPerSecond = (time > 0) ? (numKeys * 1000 / time) : 0;
 
           LOG.info(threadsLeft + "Keys = " + numKeys +
                    ", cols = " + DisplayFormatUtils.formatNumber(numCols_.get()) +
@@ -96,8 +190,15 @@ public abstract class MultiThreadedActio
                    ((numErrors_.get()>0)?(", ERRORS = " + numErrors_.get()):"")
                    );
 
+          // Write stats in a format that can be interpreted as counters by
+          // streaming map-reduce jobs.
+          System.err.println("reporter:counter:numKeys," + id_ + "," + numKeysDelta);
+          System.err.println("reporter:counter:numCols," + id_ + "," + numCols_.get());
+          System.err.println("reporter:counter:avgKeysPerSecond," + id_ + "," + ((int)averageKeysPerSecond - priorAverageKeysPerSecond));
+
           priorNumKeys = numKeys;
           priorCumulativeOpTime = cumulativeOpTime;
+          priorAverageKeysPerSecond = (int)averageKeysPerSecond;
         }
         try {
           Thread.sleep(reportingInterval);

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedActionMBean.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedActionMBean.java?rev=1181977&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedActionMBean.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedActionMBean.java Tue Oct 11 17:45:17 2011
@@ -0,0 +1,80 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.manual.utils;
+
+public interface MultiThreadedActionMBean {
+
+  /**
+   * @return the average number of keys processed per second since the previous
+   *         invocation of this method
+   */
+  public long getKeysPerSecond();
+
+  /**
+   * @return the average number of columns processed per second since the
+   *         previous invocation of this method
+   */
+  public long getColumnsPerSecond();
+
+  /**
+   * @return the average latency of operations since the previous invocation of
+   *         this method
+   */
+  public long getAverageLatency();
+
+  /**
+   * @return the average number of keys processed per second since the creation
+   *         of this action
+   */
+  public long getCumulativeKeysPerSecond();
+
+  /**
+   * @return the total number of keys processed since the creation of this
+   *         action
+   */
+  public long getCumulativeKeys();
+
+  /**
+   * @return the total number of columns processed since the creation of this
+   *         action
+   */
+  public long getCumulativeColumns();
+
+  /**
+   * @return the average latency of operations since the creation of this action
+   */
+  public long getCumulativeAverageLatency();
+
+  /**
+   * @return the total number of errors since the creation of this action
+   */
+  public long getCumulativeErrors();
+
+  /**
+   * @return the total number of operation failures since the creation of this
+   *         action
+   */
+  public long getCumulativeOpFailures();
+
+  /**
+   * @return the total number of keys verified since the creation of this action
+   */
+  public long getCumulativeKeysVerified();
+}

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedReader.java?rev=1181977&r1=1181976&r2=1181977&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedReader.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedReader.java Tue Oct 11 17:45:17 2011
@@ -43,6 +43,7 @@ public class MultiThreadedReader extends
   Set<HBaseReader> readers_ = new HashSet<HBaseReader>();
 
   public MultiThreadedReader(HBaseConfiguration conf, byte[] tableName, byte[] columnFamily) {
+    super("R");
     tableName_ = tableName;
     columnFamily_ = columnFamily;
     conf_ = conf;

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedWriter.java?rev=1181977&r1=1181976&r2=1181977&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedWriter.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedWriter.java Tue Oct 11 17:45:17 2011
@@ -56,6 +56,7 @@ public class MultiThreadedWriter extends
   public static List<Long> failedKeySet_ = Collections.synchronizedList(new ArrayList<Long>());
 
   public MultiThreadedWriter(HBaseConfiguration conf, byte[] tableName, byte[] columnFamily) {
+    super("W");
     tableName_ = tableName;
     columnFamily_ = columnFamily;
     conf_ = conf;