You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2012/03/09 14:48:21 UTC

svn commit: r1298825 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-benchmark/ bookkeeper-benchmark/bin/ bookkeeper-benchmark/conf/ bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/ bookkeeper-benchmark/src/test/ bookkeeper-benchmark...

Author: fpj
Date: Fri Mar  9 13:48:20 2012
New Revision: 1298825

URL: http://svn.apache.org/viewvc?rev=1298825&view=rev
Log:
BOOKKEEPER-158: Move latest benchmarking code into trunk (ivank via fpj)


Added:
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/bin/
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/bin/benchmark
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/conf/
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/conf/log4j.properties
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/resources/
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/resources/log4j.properties
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml
    zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1298825&r1=1298824&r2=1298825&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Mar  9 13:48:20 2012
@@ -84,6 +84,9 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-77: Add a console client for hedwig (Sijie Guo via ivank)
 
+      bookkeeper-benchmark/
+	BOOKKEEPER-158: Move latest benchmarking code into trunk (ivank via fpj)
+
 Release 4.0.0 - 2011-11-30
 
   Non-backward compatible changes:

Added: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/bin/benchmark
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/bin/benchmark?rev=1298825&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/bin/benchmark (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/bin/benchmark Fri Mar  9 13:48:20 2012
@@ -0,0 +1,131 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Copyright 2007 The Apache Software Foundation
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+# check if net.ipv6.bindv6only is set to 1
+bindv6only=$(/sbin/sysctl -n net.ipv6.bindv6only 2> /dev/null)
+if [ -n "$bindv6only" ] && [ "$bindv6only" -eq "1" ]
+then
+  echo "Error: \"net.ipv6.bindv6only\" is set to 1 - Java networking could be broken"
+  echo "For more info (the following page also applies to bookkeeper): http://wiki.apache.org/hadoop/HadoopIPv6"
+  exit 1
+fi
+
+BINDIR=`dirname "$0"`
+BENCH_HOME=`cd $BINDIR/..;pwd`
+
+RELEASE_JAR=`ls $BENCH_HOME/bookkeeper-benchmark-*.jar 2> /dev/null | tail -1` 
+if [ $? == 0 ]; then
+    BENCHMARK_JAR=$RELEASE_JAR
+fi
+
+BUILT_JAR=`ls $BENCH_HOME/target/bookkeeper-benchmark-*.jar 2> /dev/null | tail -1`
+if [ $? != 0 ] && [ ! -e "$BENCHMARK_JAR" ]; then 
+    echo "\nCouldn't find benchmark jar.";
+    echo "Make sure you've run 'mvn package'\n";
+    exit 1;
+elif [ -e "$BUILT_JAR" ]; then
+    BENCHMARK_JAR=$BUILT_JAR
+fi
+
+benchmark_help() {
+    cat <<EOF
+Usage: $0 <command>
+where command is one of:
+    writes              Benchmark throughput and latency for writes
+    reads               Benchmark throughput and latency for reads
+    bookie              Benchmark an individual bookie
+    help                This help message
+
+use -help with individual commands for more options. For example,
+   $0 writes -help
+
+or command is the full name of a class with a defined main() method.
+
+Environment variables:
+   BENCHMARK_LOG_CONF        Log4j configuration file (default: conf/log4j.properties)
+   BENCHMARK_EXTRA_OPTS      Extra options to be passed to the jvm
+   BENCHMARK_EXTRA_CLASSPATH Add extra paths to the bookkeeper classpath
+
+EOF
+}
+
+add_maven_deps_to_classpath() {
+    MVN="mvn"
+    if [ "$MAVEN_HOME" != "" ]; then
+	MVN=${MAVEN_HOME}/bin/mvn
+    fi
+    
+    # Need to generate classpath from maven pom. This is costly so generate it
+    # and cache it. Save the file into our target dir so a mvn clean will get
+    # clean it up and force us create a new one.
+    f="${BENCH_HOME}/target/cached_classpath.txt"
+    if [ ! -f "${f}" ]
+    then
+	${MVN} -f "${BENCH_HOME}/pom.xml" dependency:build-classpath -Dmdep.outputFile="${f}" &> /dev/null
+    fi
+    BENCHMARK_CLASSPATH=${CLASSPATH}:`cat "${f}"`
+}
+
+if [ -d "$BENCH_HOME/lib" ]; then
+    for i in $BENCH_HOME/lib/*.jar; do
+	BENCHMARK_CLASSPATH=$BENCHMARK_CLASSPATH:$i
+    done
+else
+    add_maven_deps_to_classpath
+fi
+
+# if no args specified, show usage
+if [ $# = 0 ]; then
+    benchmark_help;
+    exit 1;
+fi
+
+# get arguments
+COMMAND=$1
+shift
+
+BENCHMARK_CLASSPATH="$BENCHMARK_JAR:$BENCHMARK_CLASSPATH:$BENCHMARK_EXTRA_CLASSPATH"
+BENCHMARK_LOG_CONF=${BENCHMARK_LOG_CONF:-$BENCH_HOME/conf/log4j.properties}
+
+if [ "$BENCHMARK_LOG_CONF" != "" ]; then
+    BENCHMARK_CLASSPATH="`dirname $BENCHMARK_LOG_CONF`:$BENCHMARK_CLASSPATH"
+    OPTS="$OPTS -Dlog4j.configuration=`basename $BENCHMARK_LOG_CONF`"
+fi
+OPTS="-cp $BENCHMARK_CLASSPATH $OPTS $BENCHMARK_EXTRA_OPTS"
+
+OPTS="$OPTS $BENCHMARK_EXTRA_OPTS"
+
+# Disable ipv6 as it can cause issues
+OPTS="$OPTS -Djava.net.preferIPv4Stack=true"
+
+if [ $COMMAND == "writes" ]; then
+    exec java $OPTS org.apache.bookkeeper.benchmark.BenchThroughputLatency $@
+elif [ $COMMAND == "reads" ]; then
+    exec java $OPTS org.apache.bookkeeper.benchmark.BenchReadThroughputLatency $@
+elif [ $COMMAND == "bookie" ]; then
+    exec java $OPTS org.apache.bookkeeper.benchmark.BenchBookie $@
+elif [ $COMMAND == "help" ]; then
+    benchmark_help;
+else
+    exec java $OPTS $COMMAND $@
+fi
+

Added: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/conf/log4j.properties
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/conf/log4j.properties?rev=1298825&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/conf/log4j.properties (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/conf/log4j.properties Fri Mar  9 13:48:20 2012
@@ -0,0 +1,73 @@
+#
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# 
+#
+
+#
+# Bookkeeper Logging Configuration
+#
+
+# Format is "<default threshold> (, <appender>)+
+
+# DEFAULT: console appender only
+log4j.rootLogger=ERROR, CONSOLE
+
+# Example with rolling log file
+#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
+
+# Example with rolling log file and tracing
+#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
+
+#
+# Log INFO level and above messages to the console
+#
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+log4j.logger.org.apache.bookkeeper.benchmark=INFO
+
+#
+# Add ROLLINGFILE to rootLogger to get log file output
+#    Log DEBUG level and above messages to a log file
+log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.ROLLINGFILE.Threshold=DEBUG
+log4j.appender.ROLLINGFILE.File=bookkeeper-benchmark.log
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+# Max log file size of 10MB
+log4j.appender.ROLLINGFILE.MaxFileSize=10MB
+# uncomment the next line to limit number of backup files
+#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
+
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
+
+
+#
+# Add TRACEFILE to rootLogger to get log file output
+#    Log DEBUG level and above messages to a log file
+log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
+log4j.appender.TRACEFILE.Threshold=TRACE
+log4j.appender.TRACEFILE.File=bookkeeper_trace.log
+
+log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
+### Notice we are including log4j's NDC here (%x)
+log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L][%x] - %m%n

Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml?rev=1298825&r1=1298824&r2=1298825&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml Fri Mar  9 13:48:20 2012
@@ -106,5 +106,10 @@
 	</exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+      <version>1.2</version>
+    </dependency>
   </dependencies>
 </project>

Added: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java?rev=1298825&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java Fri Mar  9 13:48:20 2012
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.benchmark;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BenchBookie {
+    static Logger LOG = LoggerFactory.getLogger(BenchBookie.class);
+
+    static class LatencyCallback implements WriteCallback {
+        boolean complete;
+        @Override
+        synchronized public void writeComplete(int rc, long ledgerId, long entryId,
+                InetSocketAddress addr, Object ctx) {
+            if (rc != 0) {
+                LOG.error("Got error " + rc);
+            }
+            complete = true;
+            notifyAll();
+        }
+        synchronized public void resetComplete() {
+            complete = false;
+        }
+        synchronized public void waitForComplete() throws InterruptedException {
+            while(!complete) {
+                wait();
+            }
+        }
+    }
+
+    static class ThroughputCallback implements WriteCallback {
+        int count;
+        int waitingCount = Integer.MAX_VALUE;
+        synchronized public void writeComplete(int rc, long ledgerId, long entryId,
+                InetSocketAddress addr, Object ctx) {
+            if (rc != 0) {
+                LOG.error("Got error " + rc);
+            }
+            count++;
+            if (count >= waitingCount) {
+                notifyAll();
+            }
+        }
+        synchronized public void waitFor(int count) throws InterruptedException {
+            while(this.count < count) {
+                waitingCount = count;
+                wait(1000);
+            }
+            waitingCount = Integer.MAX_VALUE;
+        }
+    }
+
+    /**
+     * @param args
+     * @throws InterruptedException
+     */
+    public static void main(String[] args) throws InterruptedException, ParseException {
+        Options options = new Options();
+        options.addOption("host", true, "Hostname or IP of bookie to benchmark");
+        options.addOption("port", true, "Port of bookie to benchmark (default 3181)");
+        options.addOption("ledger", true, "Ledger Id to write to (default 1)");
+        options.addOption("help", false, "This message");
+
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmd = parser.parse(options, args);
+
+        if (cmd.hasOption("help") || !cmd.hasOption("host")) {
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp("BenchBookie <options>", options);
+            System.exit(-1);
+        }
+
+        String addr = cmd.getOptionValue("host");
+        int port = Integer.valueOf(cmd.getOptionValue("port", "3181"));
+        int ledger = Integer.valueOf(cmd.getOptionValue("ledger", "1"));
+
+        ClientSocketChannelFactory channelFactory
+            = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors
+                                                .newCachedThreadPool());
+        OrderedSafeExecutor executor = new OrderedSafeExecutor(1);
+
+        ClientConfiguration conf = new ClientConfiguration();
+        BookieClient bc = new BookieClient(conf, channelFactory, executor);
+        LatencyCallback lc = new LatencyCallback();
+
+        ThroughputCallback tc = new ThroughputCallback();
+        int warmUpCount = 999;
+        for(long entry = 0; entry < warmUpCount; entry++) {
+            ChannelBuffer toSend = ChannelBuffers.buffer(128);
+            toSend.resetReaderIndex();
+            toSend.resetWriterIndex();
+            toSend.writeLong(ledger);
+            toSend.writeLong(entry);
+            toSend.writerIndex(toSend.capacity());
+            bc.addEntry(new InetSocketAddress(addr, port), ledger, new byte[20], 
+                        entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
+        }
+        LOG.info("Waiting for warmup");
+        tc.waitFor(warmUpCount);
+
+        LOG.info("Benchmarking latency");
+        int entryCount = 5000;
+        long startTime = System.nanoTime();
+        for(long entry = 0; entry < entryCount; entry++) {
+            ChannelBuffer toSend = ChannelBuffers.buffer(128);
+            toSend.resetReaderIndex();
+            toSend.resetWriterIndex();
+            toSend.writeLong(ledger+1);
+            toSend.writeLong(entry);
+            toSend.writerIndex(toSend.capacity());
+            lc.resetComplete();
+            bc.addEntry(new InetSocketAddress(addr, port), ledger+1, new byte[20], 
+                        entry, toSend, lc, null, BookieProtocol.FLAG_NONE);
+            lc.waitForComplete();
+        }
+        long endTime = System.nanoTime();
+        LOG.info("Latency: " + (((double)(endTime-startTime))/((double)entryCount))/1000000.0);
+
+        entryCount = 50000;
+        LOG.info("Benchmarking throughput");
+        startTime = System.currentTimeMillis();
+        tc = new ThroughputCallback();
+        for(long entry = 0; entry < entryCount; entry++) {
+            ChannelBuffer toSend = ChannelBuffers.buffer(128);
+            toSend.resetReaderIndex();
+            toSend.resetWriterIndex();
+            toSend.writeLong(ledger+2);
+            toSend.writeLong(entry);
+            toSend.writerIndex(toSend.capacity());
+            bc.addEntry(new InetSocketAddress(addr, port), ledger+2, new byte[20], 
+                        entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
+        }
+        tc.waitFor(entryCount);
+        endTime = System.currentTimeMillis();
+        LOG.info("Throughput: " + ((long)entryCount)*1000/(endTime-startTime));
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java?rev=1298825&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java Fri Mar  9 13:48:20 2012
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.benchmark;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event;
+
+import java.util.Enumeration;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BenchReadThroughputLatency {
+    static Logger LOG = LoggerFactory.getLogger(BenchReadThroughputLatency.class);
+
+    private static final Pattern LEDGER_PATTERN = Pattern.compile("L([0-9]+)$");
+
+    private static final Comparator<String> ZK_LEDGER_COMPARE = new Comparator<String>() {
+        public int compare(String o1, String o2) {
+            try {
+                Matcher m1 = LEDGER_PATTERN.matcher(o1);
+                Matcher m2 = LEDGER_PATTERN.matcher(o2);
+                if (m1.find() && m2.find()) {
+                    return Integer.valueOf(m1.group(1))
+                        - Integer.valueOf(m2.group(1));
+                } else {
+                    return o1.compareTo(o2);
+                }
+            } catch (Throwable t) {
+                return o1.compareTo(o2);
+            }
+        }
+    };
+
+    private static void readLedger(String zkservers, long ledgerId, byte[] passwd) {
+        LOG.info("Reading ledger {}", ledgerId);
+        BookKeeper bk = null;
+        long time = 0;
+        long entriesRead = 0;
+        long lastRead = 0;
+        int nochange = 0;
+
+        LedgerHandle lh = null;
+        try {
+            bk = new BookKeeper(zkservers);
+            while (true) {
+                lh = bk.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32, 
+                                             passwd);
+                long lastConfirmed = lh.getLastAddConfirmed();
+                if (lastConfirmed == lastRead) {
+                    nochange++;
+                    if (nochange == 10) {
+                        break;
+                    } else {
+                        Thread.sleep(1000);
+                        continue;
+                    }
+                } else {
+                    nochange = 0;
+                }
+                long starttime = System.nanoTime();
+
+                Enumeration<LedgerEntry> entries = lh.readEntries(lastRead+1, lastConfirmed);
+                lastRead = lastConfirmed;
+                while (entries.hasMoreElements()) {
+                    LedgerEntry e = entries.nextElement();
+                    entriesRead++;
+                    if ((entriesRead % 10000) == 0) {
+                        LOG.info("{} entries read", entriesRead);
+                    }
+                }
+                long endtime = System.nanoTime();
+                time += endtime - starttime;
+
+                lh.close();
+                lh = null;
+                Thread.sleep(1000);
+            }
+        } catch (InterruptedException ie) {
+            // ignore
+        } catch (Exception e ) {
+            LOG.error("Exception in reader", e);
+        } finally {
+            LOG.info("Read {} in {}ms", entriesRead, time/1000/1000);
+
+            try {
+                if (lh != null) {
+                    lh.close();
+                }
+                if (bk != null) {
+                    bk.close();
+                }
+            } catch (Exception e) {
+                LOG.error("Exception closing stuff", e);
+            }
+        }
+    }
+
+    private static void usage(Options options) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("BenchReadThroughputLatency <options>", options);
+    }
+
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        options.addOption("ledger", true, "Ledger to read. If empty, read all ledgers which come available. " 
+                          + " Cannot be used with -listen");
+        options.addOption("listen", true, "Listen for creation of <arg> ledgers, and read each one fully");
+        options.addOption("password", true, "Password used to access ledgers (default 'benchPasswd')");
+        options.addOption("zookeeper", true, "Zookeeper ensemble, default \"localhost:2181\"");
+        options.addOption("help", false, "This message");
+
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmd = parser.parse(options, args);
+
+        if (cmd.hasOption("help")) {
+            usage(options);
+            System.exit(-1);
+        }
+
+        final String servers = cmd.getOptionValue("zookeeper", "localhost:2181");
+        final byte[] passwd = cmd.getOptionValue("password", "benchPasswd").getBytes();
+        if (cmd.hasOption("ledger") && cmd.hasOption("listen")) {
+            LOG.error("Cannot used -ledger and -listen together");
+            usage(options);
+            System.exit(-1);
+        }
+
+        final AtomicInteger ledger = new AtomicInteger(0);
+        final AtomicInteger numLedgers = new AtomicInteger(0);
+        if (cmd.hasOption("ledger")) {
+            ledger.set(Integer.valueOf(cmd.getOptionValue("ledger")));
+        } else if (cmd.hasOption("listen")) {
+            numLedgers.set(Integer.valueOf(cmd.getOptionValue("listen")));
+        } else {
+            LOG.error("You must use -ledger or -listen");
+            usage(options);
+            System.exit(-1);
+        }
+
+        final CountDownLatch shutdownLatch = new CountDownLatch(1);
+        final CountDownLatch connectedLatch = new CountDownLatch(1);
+        final String nodepath = String.format("/ledgers/L%010d", ledger.get());
+
+        final ZooKeeper zk = new ZooKeeper(servers, 3000, new Watcher() {
+                public void process(WatchedEvent event) {
+                    if (event.getState() == Event.KeeperState.SyncConnected
+                            && event.getType() == Event.EventType.None) {
+                        connectedLatch.countDown();
+                    }
+                }
+            });
+        try {
+            zk.register(new Watcher() {
+                    public void process(WatchedEvent event) {
+                        try {
+                            if (event.getState() == Event.KeeperState.SyncConnected 
+                                && event.getType() == Event.EventType.None) {
+                                connectedLatch.countDown();
+                            } else if (event.getType() == Event.EventType.NodeCreated
+                                       && event.getPath().equals(nodepath)) {
+                                readLedger(servers, ledger.get(), passwd);
+                                shutdownLatch.countDown();
+                            } else if (event.getType() == Event.EventType.NodeChildrenChanged) {
+                                if (numLedgers.get() < 0) {
+                                    return;
+                                }
+                                List<String> children = zk.getChildren("/ledgers", true);
+                                List<String> ledgers = new ArrayList<String>();
+                                for (String child : children) {
+                                    if (LEDGER_PATTERN.matcher(child).find()) {
+                                        ledgers.add(child);
+                                    }
+                                }
+                                Collections.sort(ledgers, ZK_LEDGER_COMPARE);
+                                String last = ledgers.get(ledgers.size() - 1);
+                                final Matcher m = LEDGER_PATTERN.matcher(last);
+                                if (m.find()) {
+                                    int ledgersLeft = numLedgers.decrementAndGet();
+                                    Thread t = new Thread() {
+                                            public void run() {
+                                                readLedger(servers, Long.valueOf(m.group(1)), passwd);
+                                            }
+                                        };
+                                    t.start();
+                                    if (ledgersLeft <= 0) {
+                                        shutdownLatch.countDown();
+                                    }
+                                } else {
+                                    LOG.error("Cant file ledger id in {}", last);
+                                }
+                            } else {
+                                LOG.warn("Unknown event {}", event);
+                            }
+                        } catch (Exception e) {
+                            LOG.error("Exception in watcher", e);
+                        }
+                    }
+                });
+            connectedLatch.await();
+            if (ledger.get() != 0) {
+                if (zk.exists(nodepath, true) != null) {
+                    readLedger(servers, ledger.get(), passwd);
+                    shutdownLatch.countDown();
+                } else {
+                    LOG.info("Watching for creation of" + nodepath);
+                }
+            } else {
+                zk.getChildren("/ledgers", true);
+            }
+            shutdownLatch.await();
+            LOG.info("Shutting down");
+        } finally {
+            zk.close();
+        }
+    }
+}
\ No newline at end of file

Added: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java?rev=1298825&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java Fri Mar  9 13:48:20 2012
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.benchmark;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+
+public class BenchThroughputLatency implements AddCallback, Runnable {
+    static Logger LOG = LoggerFactory.getLogger(BenchThroughputLatency.class);
+
+    BookKeeper bk;
+    LedgerHandle lh[];
+    AtomicLong counter;
+
+    Semaphore sem;
+    int pace;
+    int throttle;
+    int numberOfLedgers = 1;
+    final String servers;
+
+    class Context {
+        long localStartTime;
+        long globalStartTime;
+        long id;
+
+        Context(long id, long time){
+            this.id = id;
+            this.localStartTime = this.globalStartTime = time;
+        }
+    }
+
+    public BenchThroughputLatency(int ensemble, int qSize, byte[] passwd,
+                                  int throttle, int numberOfLedgers, String servers) 
+            throws KeeperException, IOException, InterruptedException {
+        this.sem = new Semaphore(throttle);
+        this.pace = pace;
+        this.throttle = throttle;
+
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setThrottleValue(100000);
+        conf.setZkServers(servers);
+        this.servers = servers;
+
+        bk = new BookKeeper(conf);
+        this.counter = new AtomicLong(0);
+        this.numberOfLedgers = numberOfLedgers;
+        try{
+            lh = new LedgerHandle[this.numberOfLedgers];
+
+            for(int i = 0; i < this.numberOfLedgers; i++) {
+                lh[i] = bk.createLedger(ensemble, qSize, BookKeeper.DigestType.CRC32, 
+                                        passwd);
+                LOG.info("Ledger Handle: " + lh[i].getId());
+            }
+        } catch (BKException e) {
+            e.printStackTrace();
+        }
+    }
+
+    Random rand = new Random();
+    public void close() throws InterruptedException, BKException {
+        for(int i = 0; i < numberOfLedgers; i++) {
+            lh[i].close();
+        }
+        bk.close();
+    }
+
+    long previous = 0;
+    byte bytes[];
+
+    void setEntryData(byte data[]) {
+        bytes = data;
+    }
+
+    int lastLedger = 0;
+    private int getRandomLedger() {
+         return rand.nextInt(numberOfLedgers);
+    }
+
+    int sendLimit = 2000000;
+    long latencies[] = new long[sendLimit];
+    int latencyIndex = -1;
+    AtomicLong completedRequests = new AtomicLong(0);
+
+    public void setSendLimit(int sendLimit) {
+        this.sendLimit = sendLimit;
+        latencies = new long[sendLimit];
+    }
+
+    long duration = -1;
+    synchronized public long getDuration() {
+        return duration;
+    }
+
+    public void run() {
+        LOG.info("Running...");
+        long start = previous = System.currentTimeMillis();
+
+        byte messageCount = 0;
+        int sent = 0;
+
+        Thread reporter = new Thread() {
+                public void run() {
+                    try {
+                        while(true) {
+                            Thread.sleep(200);
+                            LOG.info("ms: {} req: {}", System.currentTimeMillis(), completedRequests.getAndSet(0));
+                        }
+                    } catch (InterruptedException ie) {
+                        LOG.info("Caught interrupted exception, going away");
+                    }
+                }
+            };
+        reporter.start();
+        long beforeSend = System.nanoTime();
+
+        while(!Thread.currentThread().isInterrupted() && sent < sendLimit) {
+            try {
+                sem.acquire();
+                if (sent == 10000) {
+                    long afterSend = System.nanoTime();
+                    long time = afterSend - beforeSend;
+                    LOG.info("Time to send first batch: {}s {}ns ",
+                             time/1000/1000/1000, time);
+                }
+            } catch (InterruptedException e) {
+                break;
+            }
+
+            final int index = getRandomLedger();
+            LedgerHandle h = lh[index];
+            if (h == null) {
+                LOG.error("Handle " + index + " is null!");
+            } else {
+                long nanoTime = System.nanoTime();
+                lh[index].asyncAddEntry(bytes, this, new Context(sent, nanoTime));
+                counter.incrementAndGet();
+            }
+            sent++;
+        }
+        LOG.info("Sent: "  + sent);
+        try {
+            synchronized (this) {
+                while(this.counter.get() > 0)
+                    Thread.sleep(1000);
+            }
+        } catch(InterruptedException e) {
+            LOG.error("Interrupted while waiting", e);
+        }
+        synchronized(this) {
+            duration = System.currentTimeMillis() - start;
+        }
+        throughput = sent*1000/duration;
+
+        reporter.interrupt();
+        try {
+            reporter.join();
+        } catch (InterruptedException ie) {
+            // ignore
+        }
+        LOG.info("Finished processing in ms: " + duration + " tp = " + throughput);
+    }
+
+    long throughput = -1;
+    public long getThroughput() {
+        return throughput;
+    }
+
+    long threshold = 20000;
+    long runningAverageCounter = 0;
+    long totalTime = 0;
+    @Override
+    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+        Context context = (Context) ctx;
+
+        // we need to use the id passed in the context in the case of
+        // multiple ledgers, and it works even with one ledger
+        entryId = context.id;
+        long newTime = System.nanoTime() - context.localStartTime;
+
+        sem.release();
+        counter.decrementAndGet();
+
+        latencies[(int)entryId] = newTime;
+
+        completedRequests.incrementAndGet();
+    }
+
+    public static void main(String[] args)
+            throws KeeperException, IOException, InterruptedException, ParseException, BKException {
+        Options options = new Options();
+        options.addOption("time", true, "Running time (seconds), default 60");
+        options.addOption("entrysize", true, "Entry size (bytes), default 1024");
+        options.addOption("ensemble", true, "Ensemble size, default 3");
+        options.addOption("quorum", true, "Quorum size, default 2");
+        options.addOption("throttle", true, "Max outstanding requests, default 10000");
+        options.addOption("ledgers", true, "Number of ledgers, default 1");
+        options.addOption("zookeeper", true, "Zookeeper ensemble, default \"localhost:2181\"");
+        options.addOption("password", true, "Password used to create ledgers (default 'benchPasswd')");
+        options.addOption("coord_node", true, "Coordination znode for multi client benchmarks (optional)");
+        options.addOption("help", false, "This message");
+
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmd = parser.parse(options, args);
+
+        if (cmd.hasOption("help")) {
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp("BenchThroughputLatency <options>", options);
+            System.exit(-1);
+        }
+
+        long runningTime = Long.valueOf(cmd.getOptionValue("time", "60"));
+        String servers = cmd.getOptionValue("zookeeper", "localhost:2181");
+        int entrysize = Integer.valueOf(cmd.getOptionValue("entrysize", "1024"));
+
+        int ledgers = Integer.valueOf(cmd.getOptionValue("ledgers", "1"));
+        int ensemble = Integer.valueOf(cmd.getOptionValue("ensemble", "3"));
+        int quorum = Integer.valueOf(cmd.getOptionValue("quorum", "2"));
+        int throttle = Integer.valueOf(cmd.getOptionValue("throttle", "10000"));
+
+        String coordinationZnode = cmd.getOptionValue("coord_node");
+        final byte[] passwd = cmd.getOptionValue("password", "benchPasswd").getBytes();
+
+        LOG.warn("(Parameters received) running time: " + runningTime +
+                ", entry size: " + entrysize + ", ensemble size: " + ensemble +
+                ", quorum size: " + quorum +
+                ", throttle: " + throttle +
+                ", number of ledgers: " + ledgers +
+                ", zk servers: " + servers);
+
+        long totalTime = runningTime*1000;
+
+        // Do a warmup run
+        Thread thread;
+
+        long lastWarmUpTP = -1;
+        long throughput;
+        LOG.info("Starting warmup");
+        byte data[] = new byte[entrysize];
+        Arrays.fill(data, (byte)'x');
+
+        while(lastWarmUpTP < (throughput = warmUp(servers, data, ledgers, ensemble, quorum, passwd, throttle))) {
+            LOG.info("Warmup tp: " + throughput);
+            lastWarmUpTP = throughput;
+            // we will just run once, so lets break
+            break;
+        }
+        LOG.info("Warmup phase finished");
+
+        // Now do the benchmark
+        BenchThroughputLatency bench = new BenchThroughputLatency(ensemble, quorum, passwd, throttle, ledgers, servers);
+        bench.setEntryData(data);
+        thread = new Thread(bench);
+        ZooKeeper zk = null;
+
+        if (coordinationZnode != null) {
+            final CountDownLatch connectLatch = new CountDownLatch(1);
+            zk = new ZooKeeper(servers, 15000, new Watcher() {
+                    @Override
+                    public void process(WatchedEvent event) {
+                        if (event.getState() == KeeperState.SyncConnected) {
+                            connectLatch.countDown();
+                        }
+                    }});
+            if (!connectLatch.await(10, TimeUnit.SECONDS)) {
+                LOG.error("Couldn't connect to zookeeper at " + servers);
+                zk.close();
+                System.exit(-1);
+            }
+
+            final CountDownLatch latch = new CountDownLatch(1);
+            LOG.info("Waiting for " + coordinationZnode);
+            if (zk.exists(coordinationZnode, new Watcher() {
+                @Override
+                public void process(WatchedEvent event) {
+                    if (event.getType() == EventType.NodeCreated) {
+                        latch.countDown();
+                    }
+                }}) != null) {
+                latch.countDown();
+            }
+            latch.await();
+            LOG.info("Coordination znode created");
+        }
+        thread.start();
+        Thread.sleep(totalTime);
+        thread.interrupt();
+        thread.join();
+
+        LOG.info("Calculating percentiles");
+        ArrayList<Long> latency = new ArrayList<Long>();
+        for(int i = 0; i < bench.latencies.length; i++) {
+            if (bench.latencies[i] > 0) {
+                latency.add(bench.latencies[i]);
+            }
+        }
+        double tp = (double)latency.size()*1000.0/(double)bench.getDuration();
+        LOG.info(latency.size() + " completions in " + bench.getDuration() + " seconds: " + tp + " ops/sec");
+
+        if (zk != null) {
+            zk.create(coordinationZnode + "/worker-", 
+                      ("tp " + tp + " duration " + bench.getDuration()).getBytes(), 
+                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+            zk.close();
+        }
+
+        // dump the latencies for later debugging (it will be sorted by entryid)
+        OutputStream fos = new BufferedOutputStream(new FileOutputStream("latencyDump.dat"));
+
+        for(Long l: latency) {
+            fos.write((Long.toString(l)+"\t"+(l/1000000)+ "ms\n").getBytes());
+        }
+        fos.flush();
+        fos.close();
+
+        // now get the latencies
+        Collections.sort(latency);
+        LOG.info("99th percentile latency: {}", percentile(latency, 99));
+        LOG.info("95th percentile latency: {}", percentile(latency, 95));
+
+        bench.close();
+    }
+
+    private static double percentile(ArrayList<Long> latency, int percentile) {
+        int size = latency.size();
+        int sampleSize = (size * percentile) / 100;
+        long total = 0;
+        int count = 0;
+        for(int i = 0; i < sampleSize; i++) {
+            total += latency.get(i);
+            count++;
+        }
+        return ((double)total/(double)count)/1000000.0;
+    }
+
+    private static long warmUp(String servers, byte[] data,
+                               int ledgers, int ensemble, int qSize, byte[] passwd, int throttle)
+            throws KeeperException, IOException, InterruptedException, BKException {
+        final CountDownLatch connectLatch = new CountDownLatch(1);
+        final int bookies;
+        ZooKeeper zk = null;
+        try {
+            zk = new ZooKeeper(servers, 15000, new Watcher() {
+                    @Override
+                    public void process(WatchedEvent event) {
+                        if (event.getState() == KeeperState.SyncConnected) {
+                            connectLatch.countDown();
+                        }
+                    }});
+            if (!connectLatch.await(10, TimeUnit.SECONDS)) {
+                LOG.error("Couldn't connect to zookeeper at " + servers);
+                throw new IOException("Couldn't connect to zookeeper " + servers);
+            }
+            bookies = zk.getChildren("/ledgers/available", false).size();
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+
+        BenchThroughputLatency warmup = new BenchThroughputLatency(bookies, bookies, passwd,
+                                                                   throttle, ledgers, servers);
+        int limit = 50000;
+
+        warmup.setSendLimit(limit);
+        warmup.setEntryData(data);
+        Thread thread = new Thread(warmup);
+        thread.start();
+        thread.join();
+        warmup.close();
+        return warmup.getThroughput();
+    }
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java?rev=1298825&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java Fri Mar  9 13:48:20 2012
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.benchmark;
+
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.Assert;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.util.LocalBookKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+public class TestBenchmark {
+    protected static final Logger LOG = LoggerFactory.getLogger(TestBenchmark.class);
+
+    private static Thread ensembleThread = null;
+    private final static String zkString = "localhost:2181";
+    private static List<String> bookies = null;
+
+    @BeforeClass
+    public static void startEnsemble() throws Exception {
+        final int numBookies = 5;
+
+        ensembleThread = new Thread() {
+                public void run() {
+                    try {
+                        LocalBookKeeper.main(new String[]{String.valueOf(numBookies)});
+                    } catch (InterruptedException ie) {
+                        LOG.info("Shutting down ensemble thread");
+                    } catch (Exception e) {
+                        LOG.error("Error running bookkeeper ensemble", e);
+                    }
+                }
+            };
+        ensembleThread.start();
+
+        if (!LocalBookKeeper.waitForServerUp(zkString, 5000)) {
+            throw new Exception("Failed to start zookeeper");
+        }
+        ZooKeeper zk = null;
+        try {
+            final CountDownLatch connectLatch = new CountDownLatch(1);
+
+            zk = new ZooKeeper(zkString, 15000, new Watcher() {
+                    @Override
+                    public void process(WatchedEvent event) {
+                        if (event.getState() == KeeperState.SyncConnected) {
+                            connectLatch.countDown();
+                        }
+                    }});
+            if (!connectLatch.await(10, TimeUnit.SECONDS)) {
+                LOG.error("Couldn't connect to zookeeper at " + zkString);
+            } else {
+                for (int i = 0; i < 10; i++) {
+                    try {
+                        bookies = zk.getChildren("/ledgers/available", false);
+                        if (zk.getChildren("/ledgers/available", false).size()
+                            == numBookies) {
+                            return;
+                        }
+                    } catch (Exception e) {
+                        // do nothing
+                    }
+                    Thread.sleep(1000);
+                }
+                throw new Exception("Not enough bookies started");
+            }
+        } finally {
+            zk.close();
+        }
+    }
+
+    @AfterClass
+    public static void stopEnsemble() throws Exception {
+        if (ensembleThread != null) {
+            ensembleThread.interrupt();
+            ensembleThread.join();
+        }
+    }
+
+    @Test
+    public void testThroughputLatency() throws Exception {
+        BenchThroughputLatency.main(new String[] {
+                "--time", "10"
+            });
+    }
+
+    @Test
+    public void testBookie() throws Exception {
+        String bookie = bookies.get(0);
+        String[] parts = bookie.split(":");
+        BenchBookie.main(new String[] {
+                "--host", parts[0],
+                "--port", parts[1],
+                "--ledger", "12345"
+                });
+    }
+
+    @Test
+    public void testReadThroughputLatency() throws Exception {
+        AtomicBoolean threwException = new AtomicBoolean(false);
+        Thread t = new Thread() {
+                public void run() {
+                    try {
+                        BenchReadThroughputLatency.main(new String[] {
+                                "--listen", "10"});
+                    } catch (Throwable t) {
+                        LOG.error("Error reading", t);
+                    }
+                }
+            };
+        t.start();
+
+        Thread.sleep(10000);
+        byte data[] = new byte[1024];
+        Arrays.fill(data, (byte)'x');
+
+        long lastLedgerId = 0;
+        Assert.assertTrue("Thread should be running", t.isAlive());
+        for (int i = 0; i < 10; i++) {
+            BookKeeper bk = new BookKeeper(zkString);
+            LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.CRC32, "benchPasswd".getBytes());
+            lastLedgerId = lh.getId();
+            try {
+                for (int j = 0; j < 100; j++) {
+                    lh.addEntry(data);
+                }
+            } finally {
+                lh.close();
+                bk.close();
+            }
+        }
+        for (int i = 0; i < 60; i++) {
+            if (!t.isAlive()) {
+                break;
+            }
+            Thread.sleep(1000); // wait for 10 seconds for reading to finish
+        }
+
+        Assert.assertFalse("Thread should be finished", t.isAlive());
+
+        BenchReadThroughputLatency.main(new String[] {
+                "--ledger", String.valueOf(lastLedgerId)});
+
+        final long nextLedgerId = lastLedgerId+1;
+        t = new Thread() {
+                public void run() {
+                    try {
+                        BenchReadThroughputLatency.main(new String[] {
+                                "--ledger", String.valueOf(nextLedgerId)});
+                    } catch (Throwable t) {
+                        LOG.error("Error reading", t);
+                    }
+                }
+            };
+        t.start();
+
+        Assert.assertTrue("Thread should be running", t.isAlive());
+        BookKeeper bk = new BookKeeper(zkString);
+        LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.CRC32, "benchPasswd".getBytes());
+        try {
+            for (int j = 0; j < 100; j++) {
+                lh.addEntry(data);
+            }
+        } finally {
+            lh.close();
+            bk.close();
+        }
+        for (int i = 0; i < 60; i++) {
+            if (!t.isAlive()) {
+                break;
+            }
+            Thread.sleep(1000); // wait for 10 seconds for reading to finish
+        }
+        Assert.assertFalse("Thread should be finished", t.isAlive());
+    }
+}
\ No newline at end of file

Added: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/resources/log4j.properties?rev=1298825&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/resources/log4j.properties (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/resources/log4j.properties Fri Mar  9 13:48:20 2012
@@ -0,0 +1,72 @@
+#
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# 
+#
+
+#
+# Bookkeeper Logging Configuration
+#
+
+# Format is "<default threshold> (, <appender>)+
+
+# DEFAULT: console appender only
+log4j.rootLogger=OFF, CONSOLE
+
+# Example with rolling log file
+#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
+
+# Example with rolling log file and tracing
+#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
+
+#
+# Log INFO level and above messages to the console
+#
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+#
+# Add ROLLINGFILE to rootLogger to get log file output
+#    Log DEBUG level and above messages to a log file
+log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.ROLLINGFILE.Threshold=DEBUG
+log4j.appender.ROLLINGFILE.File=bookkeeper-benchmark.log
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+# Max log file size of 10MB
+log4j.appender.ROLLINGFILE.MaxFileSize=10MB
+# uncomment the next line to limit number of backup files
+#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
+
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
+
+
+#
+# Add TRACEFILE to rootLogger to get log file output
+#    Log DEBUG level and above messages to a log file
+log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
+log4j.appender.TRACEFILE.Threshold=TRACE
+log4j.appender.TRACEFILE.File=bookkeeper_trace.log
+
+log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
+### Notice we are including log4j's NDC here (%x)
+log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L][%x] - %m%n

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper?rev=1298825&r1=1298824&r2=1298825&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper Fri Mar  9 13:48:20 2012
@@ -65,6 +65,7 @@ Environment variables:
    BOOKIE_LOG_CONF        Log4j configuration file
    BOOKIE_CONF            Configuration file (default: conf/bk_server.conf)
    BOOKIE_EXTRA_OPTS      Extra options to be passed to the jvm
+   BOOKIE_EXTRA_CLASSPATH Add extra paths to the bookkeeper classpath
 
 These variable can also be set in conf/bkenv.sh
 EOF
@@ -109,7 +110,7 @@ if [ "$BOOKIE_CONF" == "" ]; then
     BOOKIE_CONF=$DEFAULT_CONF
 fi
 
-BOOKIE_CLASSPATH="$BOOKIE_JAR:$BOOKIE_CLASSPATH"
+BOOKIE_CLASSPATH="$BOOKIE_JAR:$BOOKIE_CLASSPATH:$BOOKIE_EXTRA_CLASSPATH"
 if [ "$BOOKIE_LOG_CONF" != "" ]; then
     BOOKIE_CLASSPATH="`dirname $BOOKIE_LOG_CONF`:$BOOKIE_CLASSPATH"
     OPTS="$OPTS -Dlog4j.configuration=`basename $BOOKIE_LOG_CONF`"