You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2012/03/09 14:48:21 UTC
svn commit: r1298825 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-benchmark/ bookkeeper-benchmark/bin/ bookkeeper-benchmark/conf/
bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/
bookkeeper-benchmark/src/test/ bookkeeper-benchmark...
Author: fpj
Date: Fri Mar 9 13:48:20 2012
New Revision: 1298825
URL: http://svn.apache.org/viewvc?rev=1298825&view=rev
Log:
BOOKKEEPER-158: Move latest benchmarking code into trunk (ivank via fpj)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/bin/
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/bin/benchmark
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/conf/
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/conf/log4j.properties
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/resources/
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/resources/log4j.properties
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml
zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1298825&r1=1298824&r2=1298825&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Mar 9 13:48:20 2012
@@ -84,6 +84,9 @@ Trunk (unreleased changes)
BOOKKEEPER-77: Add a console client for hedwig (Sijie Guo via ivank)
+ bookkeeper-benchmark/
+ BOOKKEEPER-158: Move latest benchmarking code into trunk (ivank via fpj)
+
Release 4.0.0 - 2011-11-30
Non-backward compatible changes:
Added: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/bin/benchmark
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/bin/benchmark?rev=1298825&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/bin/benchmark (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/bin/benchmark Fri Mar 9 13:48:20 2012
@@ -0,0 +1,131 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Copyright 2007 The Apache Software Foundation
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+# check if net.ipv6.bindv6only is set to 1
+bindv6only=$(/sbin/sysctl -n net.ipv6.bindv6only 2> /dev/null)
+if [ -n "$bindv6only" ] && [ "$bindv6only" -eq "1" ]
+then
+ echo "Error: \"net.ipv6.bindv6only\" is set to 1 - Java networking could be broken"
+ echo "For more info (the following page also applies to bookkeeper): http://wiki.apache.org/hadoop/HadoopIPv6"
+ exit 1
+fi
+
+BINDIR=`dirname "$0"`
+BENCH_HOME=`cd $BINDIR/..;pwd`
+
+RELEASE_JAR=`ls $BENCH_HOME/bookkeeper-benchmark-*.jar 2> /dev/null | tail -1`
+if [ $? == 0 ]; then
+ BENCHMARK_JAR=$RELEASE_JAR
+fi
+
+BUILT_JAR=`ls $BENCH_HOME/target/bookkeeper-benchmark-*.jar 2> /dev/null | tail -1`
+if [ $? != 0 ] && [ ! -e "$BENCHMARK_JAR" ]; then
+ echo "\nCouldn't find benchmark jar.";
+ echo "Make sure you've run 'mvn package'\n";
+ exit 1;
+elif [ -e "$BUILT_JAR" ]; then
+ BENCHMARK_JAR=$BUILT_JAR
+fi
+
+# Print the usage message to stdout: the built-in commands (writes, reads,
+# bookie, help), the option of passing a fully qualified class name instead,
+# and the environment variables this script reads.
+# $0 is expanded inside the here-document, so the text shows the name the
+# script was invoked with.
+benchmark_help() {
+ cat <<EOF
+Usage: $0 <command>
+where command is one of:
+ writes Benchmark throughput and latency for writes
+ reads Benchmark throughput and latency for reads
+ bookie Benchmark an individual bookie
+ help This help message
+
+use -help with individual commands for more options. For example,
+ $0 writes -help
+
+or command is the full name of a class with a defined main() method.
+
+Environment variables:
+ BENCHMARK_LOG_CONF Log4j configuration file (default: conf/log4j.properties)
+ BENCHMARK_EXTRA_OPTS Extra options to be passed to the jvm
+ BENCHMARK_EXTRA_CLASSPATH Add extra paths to the bookkeeper classpath
+
+EOF
+}
+
+add_maven_deps_to_classpath() {
+ MVN="mvn"
+ if [ "$MAVEN_HOME" != "" ]; then
+ MVN=${MAVEN_HOME}/bin/mvn
+ fi
+
+ # Need to generate classpath from maven pom. This is costly so generate it
+ # and cache it. Save the file into our target dir so a mvn clean will get
+ # clean it up and force us create a new one.
+ f="${BENCH_HOME}/target/cached_classpath.txt"
+ if [ ! -f "${f}" ]
+ then
+ ${MVN} -f "${BENCH_HOME}/pom.xml" dependency:build-classpath -Dmdep.outputFile="${f}" &> /dev/null
+ fi
+ BENCHMARK_CLASSPATH=${CLASSPATH}:`cat "${f}"`
+}
+
+# Build the dependency classpath: use the jars under lib/ when that directory
+# exists, otherwise ask maven.
+if [ -d "$BENCH_HOME/lib" ]; then
+    for i in "$BENCH_HOME"/lib/*.jar; do
+        BENCHMARK_CLASSPATH=$BENCHMARK_CLASSPATH:$i
+    done
+else
+    add_maven_deps_to_classpath
+fi
+
+# if no args specified, show usage
+if [ $# = 0 ]; then
+    benchmark_help
+    exit 1
+fi
+
+# get arguments
+COMMAND=$1
+shift
+
+BENCHMARK_CLASSPATH="$BENCHMARK_JAR:$BENCHMARK_CLASSPATH:$BENCHMARK_EXTRA_CLASSPATH"
+BENCHMARK_LOG_CONF=${BENCHMARK_LOG_CONF:-$BENCH_HOME/conf/log4j.properties}
+
+# The directory of the log4j configuration goes on the classpath and only the
+# file's base name is handed to log4j.
+if [ -n "$BENCHMARK_LOG_CONF" ]; then
+    BENCHMARK_CLASSPATH="$(dirname "$BENCHMARK_LOG_CONF"):$BENCHMARK_CLASSPATH"
+    OPTS="$OPTS -Dlog4j.configuration=$(basename "$BENCHMARK_LOG_CONF")"
+fi
+
+# BENCHMARK_EXTRA_OPTS must be appended exactly once: repeating jvm options
+# such as agents or debugger settings is not harmless.
+OPTS="-cp $BENCHMARK_CLASSPATH $OPTS $BENCHMARK_EXTRA_OPTS"
+
+# Disable ipv6 as it can cause issues
+OPTS="$OPTS -Djava.net.preferIPv4Stack=true"
+
+# $OPTS is intentionally unquoted so that it splits into separate jvm
+# arguments; "$@" is quoted so user arguments containing spaces survive.
+case "$COMMAND" in
+    writes)
+        exec java $OPTS org.apache.bookkeeper.benchmark.BenchThroughputLatency "$@"
+        ;;
+    reads)
+        exec java $OPTS org.apache.bookkeeper.benchmark.BenchReadThroughputLatency "$@"
+        ;;
+    bookie)
+        exec java $OPTS org.apache.bookkeeper.benchmark.BenchBookie "$@"
+        ;;
+    help)
+        benchmark_help
+        ;;
+    *)
+        # Anything else is taken to be the full name of a class with a main().
+        exec java $OPTS "$COMMAND" "$@"
+        ;;
+esac
+
+
Added: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/conf/log4j.properties
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/conf/log4j.properties?rev=1298825&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/conf/log4j.properties (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/conf/log4j.properties Fri Mar 9 13:48:20 2012
@@ -0,0 +1,73 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+#
+# Bookkeeper Logging Configuration
+#
+
+# Format is "<default threshold> (, <appender>)+"
+
+# DEFAULT: console appender only
+log4j.rootLogger=ERROR, CONSOLE
+
+# Example with rolling log file
+#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
+
+# Example with rolling log file and tracing
+#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
+
+#
+# Log INFO level and above messages to the console
+#
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+log4j.logger.org.apache.bookkeeper.benchmark=INFO
+
+#
+# Add ROLLINGFILE to rootLogger to get log file output
+# Log DEBUG level and above messages to a log file
+#
+# The size-based RollingFileAppender is used because MaxFileSize and
+# MaxBackupIndex below are properties of that appender;
+# DailyRollingFileAppender does not have them.
+log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
+log4j.appender.ROLLINGFILE.Threshold=DEBUG
+log4j.appender.ROLLINGFILE.File=bookkeeper-benchmark.log
+
+# Max log file size of 10MB
+log4j.appender.ROLLINGFILE.MaxFileSize=10MB
+# uncomment the next line to limit number of backup files
+#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
+
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
+
+
+#
+# Add TRACEFILE to rootLogger to get log file output
+# Log TRACE level and above messages to a log file
+log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
+log4j.appender.TRACEFILE.Threshold=TRACE
+log4j.appender.TRACEFILE.File=bookkeeper_trace.log
+
+log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
+### Notice we are including log4j's NDC here (%x)
+log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L][%x] - %m%n
Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml?rev=1298825&r1=1298824&r2=1298825&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml Fri Mar 9 13:48:20 2012
@@ -106,5 +106,10 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ </dependency>
</dependencies>
</project>
Added: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java?rev=1298825&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java Fri Mar 9 13:48:20 2012
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.benchmark;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BenchBookie {
+ static Logger LOG = LoggerFactory.getLogger(BenchBookie.class);
+
+    /**
+     * Write callback that lets a caller block until one outstanding add has
+     * completed. Call {@link #resetComplete()} before issuing the add and
+     * {@link #waitForComplete()} afterwards.
+     */
+    static class LatencyCallback implements WriteCallback {
+        boolean complete;
+
+        /** Logs a non-zero result code, marks the add as done and wakes all waiters. */
+        @Override
+        public synchronized void writeComplete(int rc, long ledgerId, long entryId,
+                                               InetSocketAddress addr, Object ctx) {
+            if (rc != 0) {
+                LOG.error("Got error " + rc);
+            }
+            this.complete = true;
+            this.notifyAll();
+        }
+
+        /** Clears the completion flag ahead of the next add. */
+        public synchronized void resetComplete() {
+            this.complete = false;
+        }
+
+        /** Blocks until {@link #writeComplete} has set the completion flag. */
+        public synchronized void waitForComplete() throws InterruptedException {
+            while (!this.complete) {
+                this.wait();
+            }
+        }
+    }
+
+    /**
+     * Write callback that counts completed adds and lets a caller block until
+     * a given number of them have completed.
+     */
+    static class ThroughputCallback implements WriteCallback {
+        int count;
+        int waitingCount = Integer.MAX_VALUE;
+
+        /** Logs a non-zero result code, counts the completion and wakes waiters once the target is reached. */
+        public synchronized void writeComplete(int rc, long ledgerId, long entryId,
+                                               InetSocketAddress addr, Object ctx) {
+            if (rc != 0) {
+                LOG.error("Got error " + rc);
+            }
+            this.count++;
+            if (this.count >= this.waitingCount) {
+                this.notifyAll();
+            }
+        }
+
+        /**
+         * Blocks until at least {@code count} adds have completed, re-checking
+         * at least once a second.
+         */
+        public synchronized void waitFor(int count) throws InterruptedException {
+            // The monitor is held whenever this field is read, so publishing
+            // the target once before the loop is equivalent to doing it on
+            // every iteration.
+            this.waitingCount = count;
+            while (this.count < count) {
+                this.wait(1000);
+            }
+            this.waitingCount = Integer.MAX_VALUE;
+        }
+    }
+
+    /**
+     * Benchmarks a single bookie in three phases: a warm-up run, a latency run
+     * that issues one add at a time, and a throughput run that issues all adds
+     * before waiting for them. The phases write to ledger ids {@code ledger},
+     * {@code ledger + 1} and {@code ledger + 2} respectively.
+     *
+     * @param args command line options: -host (required), -port, -ledger, -help
+     * @throws InterruptedException if interrupted while waiting for adds to complete
+     * @throws ParseException if the command line cannot be parsed
+     */
+    public static void main(String[] args) throws InterruptedException, ParseException {
+        Options options = new Options();
+        options.addOption("host", true, "Hostname or IP of bookie to benchmark");
+        options.addOption("port", true, "Port of bookie to benchmark (default 3181)");
+        options.addOption("ledger", true, "Ledger Id to write to (default 1)");
+        options.addOption("help", false, "This message");
+
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmd = parser.parse(options, args);
+
+        if (cmd.hasOption("help") || !cmd.hasOption("host")) {
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp("BenchBookie <options>", options);
+            System.exit(-1);
+        }
+
+        String addr = cmd.getOptionValue("host");
+        int port = Integer.parseInt(cmd.getOptionValue("port", "3181"));
+        // Ledger ids are written with writeLong and reported as long in the
+        // write callback, so parse the full long range.
+        long ledger = Long.parseLong(cmd.getOptionValue("ledger", "1"));
+        // Built once: the address is the same for every add.
+        InetSocketAddress bookie = new InetSocketAddress(addr, port);
+
+        ClientSocketChannelFactory channelFactory
+            = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors
+                                                .newCachedThreadPool());
+        OrderedSafeExecutor executor = new OrderedSafeExecutor(1);
+
+        ClientConfiguration conf = new ClientConfiguration();
+        BookieClient bc = new BookieClient(conf, channelFactory, executor);
+        LatencyCallback lc = new LatencyCallback();
+
+        ThroughputCallback tc = new ThroughputCallback();
+        int warmUpCount = 999;
+        for (long entry = 0; entry < warmUpCount; entry++) {
+            bc.addEntry(bookie, ledger, new byte[20],
+                        entry, createEntry(ledger, entry), tc, null, BookieProtocol.FLAG_NONE);
+        }
+        LOG.info("Waiting for warmup");
+        tc.waitFor(warmUpCount);
+
+        LOG.info("Benchmarking latency");
+        int entryCount = 5000;
+        long startTime = System.nanoTime();
+        for (long entry = 0; entry < entryCount; entry++) {
+            ChannelBuffer toSend = createEntry(ledger + 1, entry);
+            lc.resetComplete();
+            bc.addEntry(bookie, ledger + 1, new byte[20],
+                        entry, toSend, lc, null, BookieProtocol.FLAG_NONE);
+            lc.waitForComplete();
+        }
+        long endTime = System.nanoTime();
+        // Average time per add, converted from nanoseconds to milliseconds.
+        LOG.info("Latency: " + (((double)(endTime-startTime))/((double)entryCount))/1000000.0);
+
+        entryCount = 50000;
+        LOG.info("Benchmarking throughput");
+        startTime = System.currentTimeMillis();
+        tc = new ThroughputCallback();
+        for (long entry = 0; entry < entryCount; entry++) {
+            bc.addEntry(bookie, ledger + 2, new byte[20],
+                        entry, createEntry(ledger + 2, entry), tc, null, BookieProtocol.FLAG_NONE);
+        }
+        tc.waitFor(entryCount);
+        endTime = System.currentTimeMillis();
+        // Guard the division: the millisecond clock may not have advanced.
+        long elapsedMs = Math.max(1, endTime - startTime);
+        LOG.info("Throughput: " + ((long)entryCount)*1000/elapsedMs);
+    }
+
+    /**
+     * Builds the 128-byte payload for one add: the ledger id and entry id as
+     * two longs, with the writer index then moved to the buffer's capacity so
+     * that the whole buffer is sent.
+     */
+    private static ChannelBuffer createEntry(long ledgerId, long entryId) {
+        ChannelBuffer toSend = ChannelBuffers.buffer(128);
+        toSend.resetReaderIndex();
+        toSend.resetWriterIndex();
+        toSend.writeLong(ledgerId);
+        toSend.writeLong(entryId);
+        toSend.writerIndex(toSend.capacity());
+        return toSend;
+    }
+
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java?rev=1298825&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java Fri Mar 9 13:48:20 2012
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.benchmark;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event;
+
+import java.util.Enumeration;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BenchReadThroughputLatency {
+ static Logger LOG = LoggerFactory.getLogger(BenchReadThroughputLatency.class);
+
+ private static final Pattern LEDGER_PATTERN = Pattern.compile("L([0-9]+)$");
+
+    /**
+     * Orders ledger znode names by the numeric ledger id at the end of the
+     * name. Names that carry no id, or an id that does not fit in a long, are
+     * ordered as plain strings.
+     */
+    private static final Comparator<String> ZK_LEDGER_COMPARE = new Comparator<String>() {
+        public int compare(String o1, String o2) {
+            Matcher m1 = LEDGER_PATTERN.matcher(o1);
+            Matcher m2 = LEDGER_PATTERN.matcher(o2);
+            if (m1.find() && m2.find()) {
+                try {
+                    // Ledger ids are longs and the znode name has ten digits,
+                    // which can exceed the int range.
+                    Long id1 = Long.valueOf(m1.group(1));
+                    Long id2 = Long.valueOf(m2.group(1));
+                    return id1.compareTo(id2);
+                } catch (NumberFormatException nfe) {
+                    // too many digits for a long: fall through to string order
+                }
+            }
+            return o1.compareTo(o2);
+        }
+    };
+
+    /**
+     * Reads a ledger that may still be being written. The ledger is re-opened
+     * without recovery on every pass and all newly confirmed entries are read.
+     * Reading stops after ten consecutive passes, one second apart, in which
+     * the last confirmed entry did not advance. The number of entries read and
+     * the time spent reading are logged at the end.
+     *
+     * @param zkservers ZooKeeper connect string handed to the BookKeeper client
+     * @param ledgerId id of the ledger to read
+     * @param passwd password the ledger is opened with
+     */
+    private static void readLedger(String zkservers, long ledgerId, byte[] passwd) {
+        LOG.info("Reading ledger {}", ledgerId);
+        BookKeeper bk = null;
+        long time = 0;
+        long entriesRead = 0;
+        // BookKeeper entry ids start at 0 and an empty ledger reports -1 as
+        // its last confirmed entry. Starting at -1 makes the first read begin
+        // at entry 0 and treats an empty ledger as "no change yet".
+        long lastRead = -1;
+        int nochange = 0;
+
+        LedgerHandle lh = null;
+        try {
+            bk = new BookKeeper(zkservers);
+            while (true) {
+                lh = bk.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32,
+                                             passwd);
+                long lastConfirmed = lh.getLastAddConfirmed();
+                if (lastConfirmed == lastRead) {
+                    // A fresh handle is opened on every pass, so this one must
+                    // be released before polling again.
+                    lh.close();
+                    lh = null;
+                    nochange++;
+                    if (nochange == 10) {
+                        break;
+                    }
+                    Thread.sleep(1000);
+                    continue;
+                }
+                nochange = 0;
+
+                long starttime = System.nanoTime();
+
+                Enumeration<LedgerEntry> entries = lh.readEntries(lastRead+1, lastConfirmed);
+                lastRead = lastConfirmed;
+                while (entries.hasMoreElements()) {
+                    entries.nextElement();
+                    entriesRead++;
+                    if ((entriesRead % 10000) == 0) {
+                        LOG.info("{} entries read", entriesRead);
+                    }
+                }
+                long endtime = System.nanoTime();
+                time += endtime - starttime;
+
+                lh.close();
+                lh = null;
+                Thread.sleep(1000);
+            }
+        } catch (InterruptedException ie) {
+            // ignore
+        } catch (Exception e ) {
+            LOG.error("Exception in reader", e);
+        } finally {
+            LOG.info("Read {} in {}ms", entriesRead, time/1000/1000);
+
+            try {
+                if (lh != null) {
+                    lh.close();
+                }
+                if (bk != null) {
+                    bk.close();
+                }
+            } catch (Exception e) {
+                LOG.error("Exception closing stuff", e);
+            }
+        }
+    }
+
+    /** Prints the command line help for the given options. */
+    private static void usage(Options options) {
+        new HelpFormatter().printHelp("BenchReadThroughputLatency <options>", options);
+    }
+
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ options.addOption("ledger", true, "Ledger to read. If empty, read all ledgers which come available. "
+ + " Cannot be used with -listen");
+ options.addOption("listen", true, "Listen for creation of <arg> ledgers, and read each one fully");
+ options.addOption("password", true, "Password used to access ledgers (default 'benchPasswd')");
+ options.addOption("zookeeper", true, "Zookeeper ensemble, default \"localhost:2181\"");
+ options.addOption("help", false, "This message");
+
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args);
+
+ if (cmd.hasOption("help")) {
+ usage(options);
+ System.exit(-1);
+ }
+
+ final String servers = cmd.getOptionValue("zookeeper", "localhost:2181");
+ final byte[] passwd = cmd.getOptionValue("password", "benchPasswd").getBytes();
+ if (cmd.hasOption("ledger") && cmd.hasOption("listen")) {
+ LOG.error("Cannot used -ledger and -listen together");
+ usage(options);
+ System.exit(-1);
+ }
+
+ final AtomicInteger ledger = new AtomicInteger(0);
+ final AtomicInteger numLedgers = new AtomicInteger(0);
+ if (cmd.hasOption("ledger")) {
+ ledger.set(Integer.valueOf(cmd.getOptionValue("ledger")));
+ } else if (cmd.hasOption("listen")) {
+ numLedgers.set(Integer.valueOf(cmd.getOptionValue("listen")));
+ } else {
+ LOG.error("You must use -ledger or -listen");
+ usage(options);
+ System.exit(-1);
+ }
+
+ final CountDownLatch shutdownLatch = new CountDownLatch(1);
+ final CountDownLatch connectedLatch = new CountDownLatch(1);
+ final String nodepath = String.format("/ledgers/L%010d", ledger.get());
+
+ final ZooKeeper zk = new ZooKeeper(servers, 3000, new Watcher() {
+ public void process(WatchedEvent event) {
+ if (event.getState() == Event.KeeperState.SyncConnected
+ && event.getType() == Event.EventType.None) {
+ connectedLatch.countDown();
+ }
+ }
+ });
+ try {
+ zk.register(new Watcher() {
+ public void process(WatchedEvent event) {
+ try {
+ if (event.getState() == Event.KeeperState.SyncConnected
+ && event.getType() == Event.EventType.None) {
+ connectedLatch.countDown();
+ } else if (event.getType() == Event.EventType.NodeCreated
+ && event.getPath().equals(nodepath)) {
+ readLedger(servers, ledger.get(), passwd);
+ shutdownLatch.countDown();
+ } else if (event.getType() == Event.EventType.NodeChildrenChanged) {
+ if (numLedgers.get() < 0) {
+ return;
+ }
+ List<String> children = zk.getChildren("/ledgers", true);
+ List<String> ledgers = new ArrayList<String>();
+ for (String child : children) {
+ if (LEDGER_PATTERN.matcher(child).find()) {
+ ledgers.add(child);
+ }
+ }
+ Collections.sort(ledgers, ZK_LEDGER_COMPARE);
+ String last = ledgers.get(ledgers.size() - 1);
+ final Matcher m = LEDGER_PATTERN.matcher(last);
+ if (m.find()) {
+ int ledgersLeft = numLedgers.decrementAndGet();
+ Thread t = new Thread() {
+ public void run() {
+ readLedger(servers, Long.valueOf(m.group(1)), passwd);
+ }
+ };
+ t.start();
+ if (ledgersLeft <= 0) {
+ shutdownLatch.countDown();
+ }
+ } else {
+ LOG.error("Cant file ledger id in {}", last);
+ }
+ } else {
+ LOG.warn("Unknown event {}", event);
+ }
+ } catch (Exception e) {
+ LOG.error("Exception in watcher", e);
+ }
+ }
+ });
+ connectedLatch.await();
+ if (ledger.get() != 0) {
+ if (zk.exists(nodepath, true) != null) {
+ readLedger(servers, ledger.get(), passwd);
+ shutdownLatch.countDown();
+ } else {
+ LOG.info("Watching for creation of" + nodepath);
+ }
+ } else {
+ zk.getChildren("/ledgers", true);
+ }
+ shutdownLatch.await();
+ LOG.info("Shutting down");
+ } finally {
+ zk.close();
+ }
+ }
+}
\ No newline at end of file
Added: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java?rev=1298825&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java Fri Mar 9 13:48:20 2012
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.benchmark;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+
+public class BenchThroughputLatency implements AddCallback, Runnable {
+ static Logger LOG = LoggerFactory.getLogger(BenchThroughputLatency.class);
+
+ BookKeeper bk;
+ LedgerHandle lh[];
+ AtomicLong counter;
+
+ Semaphore sem;
+ int pace;
+ int throttle;
+ int numberOfLedgers = 1;
+ final String servers;
+
+ /**
+  * Per-request state that run() passes as the ctx argument of
+  * asyncAddEntry and that addComplete receives back.
+  */
+ class Context {
+     long localStartTime;
+     long globalStartTime;
+     long id;
+
+     Context(long id, long time) {
+         this.id = id;
+         // Both timestamps start out at the same submission time.
+         this.globalStartTime = time;
+         this.localStartTime = time;
+     }
+ }
+
+ /**
+  * Creates a BookKeeper client and the ledgers the benchmark writes to.
+  *
+  * @param ensemble ensemble size passed to createLedger
+  * @param qSize quorum size passed to createLedger
+  * @param passwd password used to create the ledgers
+  * @param throttle number of semaphore permits, i.e. the maximum number of
+  *                 add requests run() keeps outstanding
+  * @param numberOfLedgers number of ledgers to create
+  * @param servers ZooKeeper connect string given to the client configuration
+  */
+ public BenchThroughputLatency(int ensemble, int qSize, byte[] passwd,
+         int throttle, int numberOfLedgers, String servers)
+         throws KeeperException, IOException, InterruptedException {
+     this.sem = new Semaphore(throttle);
+     // pace has no corresponding constructor argument and keeps its default value.
+     this.throttle = throttle;
+
+     ClientConfiguration conf = new ClientConfiguration();
+     conf.setThrottleValue(100000);
+     conf.setZkServers(servers);
+     this.servers = servers;
+
+     bk = new BookKeeper(conf);
+     this.counter = new AtomicLong(0);
+     this.numberOfLedgers = numberOfLedgers;
+     try {
+         lh = new LedgerHandle[this.numberOfLedgers];
+
+         for (int i = 0; i < this.numberOfLedgers; i++) {
+             lh[i] = bk.createLedger(ensemble, qSize, BookKeeper.DigestType.CRC32,
+                                     passwd);
+             LOG.info("Ledger Handle: " + lh[i].getId());
+         }
+     } catch (BKException e) {
+         // Report through the logger rather than stderr. Handles that could
+         // not be created remain null in the lh array.
+         LOG.error("Failed to create ledgers", e);
+     }
+ }
+
+ Random rand = new Random();
+ /**
+  * Closes every ledger handle that was created and then the BookKeeper
+  * client. The client is closed even if closing a ledger fails.
+  */
+ public void close() throws InterruptedException, BKException {
+     try {
+         for (int i = 0; i < numberOfLedgers; i++) {
+             // A slot is null when the constructor failed to create that ledger.
+             if (lh[i] != null) {
+                 lh[i].close();
+             }
+         }
+     } finally {
+         bk.close();
+     }
+ }
+
+ long previous = 0;
+ byte bytes[];
+
+ /** Sets the payload that run() submits for every entry. */
+ void setEntryData(byte data[]) {
+     this.bytes = data;
+ }
+
+ int lastLedger = 0;
+ private int getRandomLedger() {
+ return rand.nextInt(numberOfLedgers);
+ }
+
+ int sendLimit = 2000000;
+ long latencies[] = new long[sendLimit];
+ int latencyIndex = -1;
+ AtomicLong completedRequests = new AtomicLong(0);
+
+ /**
+  * Sets the maximum number of entries run() sends and re-allocates the
+  * latency table with one slot per entry.
+  */
+ public void setSendLimit(int sendLimit) {
+     this.sendLimit = sendLimit;
+     this.latencies = new long[this.sendLimit];
+ }
+
+ long duration = -1;
+ /** Returns the duration of the run in milliseconds, or -1 until run() has recorded it. */
+ public long getDuration() {
+     synchronized (this) {
+         return duration;
+     }
+ }
+
+ /**
+  * Sends entries to randomly chosen ledgers until sendLimit entries have been
+  * sent or the thread is interrupted, waits for the outstanding adds to
+  * complete, and then records the duration (ms) and throughput (entries/s)
+  * of the run.
+  */
+ public void run() {
+     LOG.info("Running...");
+     long start = previous = System.currentTimeMillis();
+
+     int sent = 0;
+
+     // Logs (and resets) the number of completed requests every 200 ms until interrupted.
+     Thread reporter = new Thread() {
+         public void run() {
+             try {
+                 while(true) {
+                     Thread.sleep(200);
+                     LOG.info("ms: {} req: {}", System.currentTimeMillis(), completedRequests.getAndSet(0));
+                 }
+             } catch (InterruptedException ie) {
+                 LOG.info("Caught interrupted exception, going away");
+             }
+         }
+     };
+     reporter.start();
+     long beforeSend = System.nanoTime();
+
+     while (!Thread.currentThread().isInterrupted() && sent < sendLimit) {
+         try {
+             // Blocks once the maximum number of requests is outstanding.
+             sem.acquire();
+             if (sent == 10000) {
+                 long afterSend = System.nanoTime();
+                 long time = afterSend - beforeSend;
+                 LOG.info("Time to send first batch: {}s {}ns ",
+                          time/1000/1000/1000, time);
+             }
+         } catch (InterruptedException e) {
+             break;
+         }
+
+         final int index = getRandomLedger();
+         LedgerHandle h = lh[index];
+         if (h == null) {
+             LOG.error("Handle " + index + " is null!");
+             // No request was issued, so no callback will hand the permit back.
+             sem.release();
+         } else {
+             long nanoTime = System.nanoTime();
+             h.asyncAddEntry(bytes, this, new Context(sent, nanoTime));
+             counter.incrementAndGet();
+         }
+         sent++;
+     }
+     LOG.info("Sent: " + sent);
+     try {
+         // Poll without holding this object's monitor, so that callers of the
+         // synchronized getDuration() are not blocked while requests drain.
+         while (this.counter.get() > 0) {
+             Thread.sleep(1000);
+         }
+     } catch (InterruptedException e) {
+         LOG.error("Interrupted while waiting", e);
+     }
+     synchronized (this) {
+         duration = System.currentTimeMillis() - start;
+     }
+     // Long arithmetic avoids int overflow of sent * 1000; the max() guards
+     // against a division by zero for a run shorter than one millisecond.
+     throughput = sent * 1000L / Math.max(duration, 1L);
+
+     reporter.interrupt();
+     try {
+         reporter.join();
+     } catch (InterruptedException ie) {
+         // ignore
+     }
+     LOG.info("Finished processing in ms: " + duration + " tp = " + throughput);
+ }
+
+ long throughput = -1;
+ /** Returns the throughput computed at the end of run(), or -1 before that. */
+ public long getThroughput() {
+     return this.throughput;
+ }
+
+ long threshold = 20000;
+ long runningAverageCounter = 0;
+ long totalTime = 0;
+ /**
+  * Callback invoked when an add issued by run() completes. Returns the
+  * throttle permit, decrements the outstanding-request counter and, for a
+  * successful add, records the latency of the request.
+  *
+  * @param rc result code of the add
+  * @param lh handle of the ledger the entry was added to
+  * @param entryId entry id within the ledger (not used; see below)
+  * @param ctx the Context created by run() for this request
+  */
+ @Override
+ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+     Context context = (Context) ctx;
+     long newTime = System.nanoTime() - context.localStartTime;
+
+     // Always hand back the permit and the outstanding count, even on
+     // failure, so that run() neither stalls nor waits forever.
+     sem.release();
+     counter.decrementAndGet();
+
+     if (rc != BKException.Code.OK) {
+         // A failed add must not be reported as a completed request.
+         LOG.error("Add of entry {} failed with rc {}", context.id, rc);
+         return;
+     }
+
+     // we need to use the id passed in the context in the case of
+     // multiple ledgers, and it works even with one ledger
+     latencies[(int) context.id] = newTime;
+
+     completedRequests.incrementAndGet();
+ }
+
+ /**
+  * Command line entry point. Parses the options, performs one warm-up run,
+  * optionally waits for a coordination znode, runs the benchmark for the
+  * requested time, writes the per-entry latencies to latencyDump.dat and
+  * logs the throughput and latency figures.
+  */
+ public static void main(String[] args)
+         throws KeeperException, IOException, InterruptedException, ParseException, BKException {
+     Options options = new Options();
+     options.addOption("time", true, "Running time (seconds), default 60");
+     options.addOption("entrysize", true, "Entry size (bytes), default 1024");
+     options.addOption("ensemble", true, "Ensemble size, default 3");
+     options.addOption("quorum", true, "Quorum size, default 2");
+     options.addOption("throttle", true, "Max outstanding requests, default 10000");
+     options.addOption("ledgers", true, "Number of ledgers, default 1");
+     options.addOption("zookeeper", true, "Zookeeper ensemble, default \"localhost:2181\"");
+     options.addOption("password", true, "Password used to create ledgers (default 'benchPasswd')");
+     options.addOption("coord_node", true, "Coordination znode for multi client benchmarks (optional)");
+     options.addOption("help", false, "This message");
+
+     CommandLineParser parser = new PosixParser();
+     CommandLine cmd = parser.parse(options, args);
+
+     if (cmd.hasOption("help")) {
+         HelpFormatter formatter = new HelpFormatter();
+         formatter.printHelp("BenchThroughputLatency <options>", options);
+         System.exit(-1);
+     }
+
+     long runningTime = Long.parseLong(cmd.getOptionValue("time", "60"));
+     String servers = cmd.getOptionValue("zookeeper", "localhost:2181");
+     int entrysize = Integer.parseInt(cmd.getOptionValue("entrysize", "1024"));
+
+     int ledgers = Integer.parseInt(cmd.getOptionValue("ledgers", "1"));
+     int ensemble = Integer.parseInt(cmd.getOptionValue("ensemble", "3"));
+     int quorum = Integer.parseInt(cmd.getOptionValue("quorum", "2"));
+     int throttle = Integer.parseInt(cmd.getOptionValue("throttle", "10000"));
+
+     String coordinationZnode = cmd.getOptionValue("coord_node");
+     final byte[] passwd = cmd.getOptionValue("password", "benchPasswd").getBytes();
+
+     LOG.warn("(Parameters received) running time: " + runningTime +
+              ", entry size: " + entrysize + ", ensemble size: " + ensemble +
+              ", quorum size: " + quorum +
+              ", throttle: " + throttle +
+              ", number of ledgers: " + ledgers +
+              ", zk servers: " + servers);
+
+     long totalTime = runningTime*1000;
+
+     // Do a single warm-up run before measuring.
+     LOG.info("Starting warmup");
+     byte data[] = new byte[entrysize];
+     Arrays.fill(data, (byte)'x');
+
+     long warmupThroughput = warmUp(servers, data, ledgers, ensemble, quorum, passwd, throttle);
+     if (warmupThroughput > -1) {
+         LOG.info("Warmup tp: " + warmupThroughput);
+     }
+     LOG.info("Warmup phase finished");
+
+     // Now do the benchmark
+     BenchThroughputLatency bench = new BenchThroughputLatency(ensemble, quorum, passwd, throttle, ledgers, servers);
+     bench.setEntryData(data);
+     Thread thread = new Thread(bench);
+     ZooKeeper zk = null;
+
+     if (coordinationZnode != null) {
+         final CountDownLatch connectLatch = new CountDownLatch(1);
+         zk = new ZooKeeper(servers, 15000, new Watcher() {
+             @Override
+             public void process(WatchedEvent event) {
+                 if (event.getState() == KeeperState.SyncConnected) {
+                     connectLatch.countDown();
+                 }
+             }});
+         if (!connectLatch.await(10, TimeUnit.SECONDS)) {
+             LOG.error("Couldn't connect to zookeeper at " + servers);
+             zk.close();
+             System.exit(-1);
+         }
+
+         // Start only once the coordination znode exists: either it is
+         // already there, or the watch fires when it gets created.
+         final CountDownLatch latch = new CountDownLatch(1);
+         LOG.info("Waiting for " + coordinationZnode);
+         if (zk.exists(coordinationZnode, new Watcher() {
+             @Override
+             public void process(WatchedEvent event) {
+                 if (event.getType() == EventType.NodeCreated) {
+                     latch.countDown();
+                 }
+             }}) != null) {
+             latch.countDown();
+         }
+         latch.await();
+         LOG.info("Coordination znode created");
+     }
+     thread.start();
+     Thread.sleep(totalTime);
+     thread.interrupt();
+     thread.join();
+
+     LOG.info("Calculating percentiles");
+     // Slots that were never filled in by a completed add are still 0 and are skipped.
+     ArrayList<Long> latency = new ArrayList<Long>();
+     for (int i = 0; i < bench.latencies.length; i++) {
+         if (bench.latencies[i] > 0) {
+             latency.add(bench.latencies[i]);
+         }
+     }
+     // getDuration() is in milliseconds, hence the factor of 1000 for ops/sec.
+     double tp = (double)latency.size()*1000.0/(double)bench.getDuration();
+     LOG.info(latency.size() + " completions in " + bench.getDuration() + " ms: " + tp + " ops/sec");
+
+     if (zk != null) {
+         zk.create(coordinationZnode + "/worker-",
+                   ("tp " + tp + " duration " + bench.getDuration()).getBytes(),
+                   ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+         zk.close();
+     }
+
+     // dump the latencies for later debugging (still in send order at this point)
+     OutputStream fos = new BufferedOutputStream(new FileOutputStream("latencyDump.dat"));
+     try {
+         for (Long l : latency) {
+             fos.write((Long.toString(l)+"\t"+(l/1000000)+ "ms\n").getBytes());
+         }
+         fos.flush();
+     } finally {
+         // Release the file even if a write fails.
+         fos.close();
+     }
+
+     // now get the latencies
+     Collections.sort(latency);
+     LOG.info("99th percentile latency: {}", percentile(latency, 99));
+     LOG.info("95th percentile latency: {}", percentile(latency, 95));
+
+     bench.close();
+ }
+
+ /**
+  * Returns the mean, in milliseconds, of the first percentile percent of the
+  * given latencies.
+  *
+  * @param latency latencies in nanoseconds; the caller sorts them in
+  *                ascending order first
+  * @param percentile percentage of the samples, counted from the start of
+  *                   the list, to average
+  * @return the mean of the selected samples in milliseconds, or 0.0 when no
+  *         sample is selected
+  */
+ private static double percentile(ArrayList<Long> latency, int percentile) {
+     // Widen before multiplying so that a large sample count cannot overflow int.
+     int sampleSize = (int) (((long) latency.size() * percentile) / 100);
+     if (sampleSize <= 0) {
+         // Nothing to average; avoid the NaN that 0/0 would produce.
+         return 0.0;
+     }
+     long total = 0;
+     for (int i = 0; i < sampleSize; i++) {
+         total += latency.get(i);
+     }
+     return ((double) total / (double) sampleSize) / 1000000.0;
+ }
+
+ /**
+  * Runs a short warm-up benchmark of 50000 entries and returns its throughput.
+  * The number of children of /ledgers/available is used as both the ensemble
+  * and the quorum size of the warm-up ledgers; the ensemble and qSize
+  * arguments are not used.
+  */
+ private static long warmUp(String servers, byte[] data,
+         int ledgers, int ensemble, int qSize, byte[] passwd, int throttle)
+         throws KeeperException, IOException, InterruptedException, BKException {
+     final CountDownLatch zkConnected = new CountDownLatch(1);
+     final int availableBookies;
+     ZooKeeper zkClient = null;
+     try {
+         zkClient = new ZooKeeper(servers, 15000, new Watcher() {
+             @Override
+             public void process(WatchedEvent event) {
+                 if (event.getState() == KeeperState.SyncConnected) {
+                     zkConnected.countDown();
+                 }
+             }});
+         boolean connectedInTime = zkConnected.await(10, TimeUnit.SECONDS);
+         if (!connectedInTime) {
+             LOG.error("Couldn't connect to zookeeper at " + servers);
+             throw new IOException("Couldn't connect to zookeeper " + servers);
+         }
+         availableBookies = zkClient.getChildren("/ledgers/available", false).size();
+     } finally {
+         if (zkClient != null) {
+             zkClient.close();
+         }
+     }
+
+     BenchThroughputLatency warmupBench = new BenchThroughputLatency(
+             availableBookies, availableBookies, passwd, throttle, ledgers, servers);
+     warmupBench.setSendLimit(50000);
+     warmupBench.setEntryData(data);
+
+     // Run the warm-up to completion on its own thread before closing it.
+     Thread runner = new Thread(warmupBench);
+     runner.start();
+     runner.join();
+     warmupBench.close();
+     return warmupBench.getThroughput();
+ }
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java?rev=1298825&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java Fri Mar 9 13:48:20 2012
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.benchmark;
+
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.Assert;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.util.LocalBookKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+public class TestBenchmark {
+ protected static final Logger LOG = LoggerFactory.getLogger(TestBenchmark.class);
+
+ private static Thread ensembleThread = null;
+ private final static String zkString = "localhost:2181";
+ private static List<String> bookies = null;
+
+ /**
+  * Starts a local BookKeeper ensemble on a background thread and waits until
+  * the expected number of bookies is listed under /ledgers/available.
+  * Fails the set-up if ZooKeeper cannot be reached or the bookies do not
+  * appear within ten one-second attempts.
+  */
+ @BeforeClass
+ public static void startEnsemble() throws Exception {
+     final int numBookies = 5;
+
+     ensembleThread = new Thread() {
+         public void run() {
+             try {
+                 LocalBookKeeper.main(new String[]{String.valueOf(numBookies)});
+             } catch (InterruptedException ie) {
+                 LOG.info("Shutting down ensemble thread");
+             } catch (Exception e) {
+                 LOG.error("Error running bookkeeper ensemble", e);
+             }
+         }
+     };
+     ensembleThread.start();
+
+     if (!LocalBookKeeper.waitForServerUp(zkString, 5000)) {
+         throw new Exception("Failed to start zookeeper");
+     }
+     ZooKeeper zk = null;
+     try {
+         final CountDownLatch connectLatch = new CountDownLatch(1);
+
+         zk = new ZooKeeper(zkString, 15000, new Watcher() {
+             @Override
+             public void process(WatchedEvent event) {
+                 if (event.getState() == KeeperState.SyncConnected) {
+                     connectLatch.countDown();
+                 }
+             }});
+         if (!connectLatch.await(10, TimeUnit.SECONDS)) {
+             LOG.error("Couldn't connect to zookeeper at " + zkString);
+             // Fail here instead of letting the tests run with bookies == null.
+             throw new Exception("Couldn't connect to zookeeper at " + zkString);
+         }
+         for (int i = 0; i < 10; i++) {
+             try {
+                 bookies = zk.getChildren("/ledgers/available", false);
+                 // Check the list that was just stored rather than fetching it again.
+                 if (bookies.size() == numBookies) {
+                     return;
+                 }
+             } catch (KeeperException e) {
+                 // ZooKeeper errors are retried on the next attempt.
+             }
+             Thread.sleep(1000);
+         }
+         throw new Exception("Not enough bookies started");
+     } finally {
+         // zk is still null if the ZooKeeper constructor threw.
+         if (zk != null) {
+             zk.close();
+         }
+     }
+ }
+
+ /** Interrupts the ensemble thread, if one was started, and waits for it to exit. */
+ @AfterClass
+ public static void stopEnsemble() throws Exception {
+     if (ensembleThread == null) {
+         return;
+     }
+     ensembleThread.interrupt();
+     ensembleThread.join();
+ }
+
+ /** Runs the write benchmark for ten seconds with otherwise default options. */
+ @Test
+ public void testThroughputLatency() throws Exception {
+     final String[] benchArgs = { "--time", "10" };
+     BenchThroughputLatency.main(benchArgs);
+ }
+
+ /** Runs BenchBookie against the first bookie found by startEnsemble(). */
+ @Test
+ public void testBookie() throws Exception {
+     // The bookie entry is split on ':' into the host and port arguments.
+     final String[] hostAndPort = bookies.get(0).split(":");
+     final String[] benchArgs = {
+         "--host", hostAndPort[0],
+         "--port", hostAndPort[1],
+         "--ledger", "12345"
+     };
+     BenchBookie.main(benchArgs);
+ }
+
+ @Test
+ public void testReadThroughputLatency() throws Exception {
+ AtomicBoolean threwException = new AtomicBoolean(false);
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ BenchReadThroughputLatency.main(new String[] {
+ "--listen", "10"});
+ } catch (Throwable t) {
+ LOG.error("Error reading", t);
+ }
+ }
+ };
+ t.start();
+
+ Thread.sleep(10000);
+ byte data[] = new byte[1024];
+ Arrays.fill(data, (byte)'x');
+
+ long lastLedgerId = 0;
+ Assert.assertTrue("Thread should be running", t.isAlive());
+ for (int i = 0; i < 10; i++) {
+ BookKeeper bk = new BookKeeper(zkString);
+ LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.CRC32, "benchPasswd".getBytes());
+ lastLedgerId = lh.getId();
+ try {
+ for (int j = 0; j < 100; j++) {
+ lh.addEntry(data);
+ }
+ } finally {
+ lh.close();
+ bk.close();
+ }
+ }
+ for (int i = 0; i < 60; i++) {
+ if (!t.isAlive()) {
+ break;
+ }
+ Thread.sleep(1000); // wait for 10 seconds for reading to finish
+ }
+
+ Assert.assertFalse("Thread should be finished", t.isAlive());
+
+ BenchReadThroughputLatency.main(new String[] {
+ "--ledger", String.valueOf(lastLedgerId)});
+
+ final long nextLedgerId = lastLedgerId+1;
+ t = new Thread() {
+ public void run() {
+ try {
+ BenchReadThroughputLatency.main(new String[] {
+ "--ledger", String.valueOf(nextLedgerId)});
+ } catch (Throwable t) {
+ LOG.error("Error reading", t);
+ }
+ }
+ };
+ t.start();
+
+ Assert.assertTrue("Thread should be running", t.isAlive());
+ BookKeeper bk = new BookKeeper(zkString);
+ LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.CRC32, "benchPasswd".getBytes());
+ try {
+ for (int j = 0; j < 100; j++) {
+ lh.addEntry(data);
+ }
+ } finally {
+ lh.close();
+ bk.close();
+ }
+ for (int i = 0; i < 60; i++) {
+ if (!t.isAlive()) {
+ break;
+ }
+ Thread.sleep(1000); // wait for 10 seconds for reading to finish
+ }
+ Assert.assertFalse("Thread should be finished", t.isAlive());
+ }
+}
\ No newline at end of file
Added: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/resources/log4j.properties?rev=1298825&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/resources/log4j.properties (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/resources/log4j.properties Fri Mar 9 13:48:20 2012
@@ -0,0 +1,72 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+#
+# Bookkeeper Logging Configuration
+#
+
+# Format is "<default threshold> (, <appender>)+"
+
+# DEFAULT: console appender only
+log4j.rootLogger=OFF, CONSOLE
+
+# Example with rolling log file
+#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
+
+# Example with rolling log file and tracing
+#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
+
+#
+# Log INFO level and above messages to the console
+#
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+#
+# Add ROLLINGFILE to rootLogger to get log file output
+# Log DEBUG level and above messages to a log file
+log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.ROLLINGFILE.Threshold=DEBUG
+log4j.appender.ROLLINGFILE.File=bookkeeper-benchmark.log
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+# Max log file size of 10MB
+log4j.appender.ROLLINGFILE.MaxFileSize=10MB
+# uncomment the next line to limit number of backup files
+#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
+
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
+
+
+#
+# Add TRACEFILE to rootLogger to get log file output
+# Log TRACE level and above messages to a log file
+log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
+log4j.appender.TRACEFILE.Threshold=TRACE
+log4j.appender.TRACEFILE.File=bookkeeper_trace.log
+
+log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
+### Notice we are including log4j's NDC here (%x)
+log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L][%x] - %m%n
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper?rev=1298825&r1=1298824&r2=1298825&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper Fri Mar 9 13:48:20 2012
@@ -65,6 +65,7 @@ Environment variables:
BOOKIE_LOG_CONF Log4j configuration file
BOOKIE_CONF Configuration file (default: conf/bk_server.conf)
BOOKIE_EXTRA_OPTS Extra options to be passed to the jvm
+ BOOKIE_EXTRA_CLASSPATH Add extra paths to the bookkeeper classpath
These variable can also be set in conf/bkenv.sh
EOF
@@ -109,7 +110,7 @@ if [ "$BOOKIE_CONF" == "" ]; then
BOOKIE_CONF=$DEFAULT_CONF
fi
-BOOKIE_CLASSPATH="$BOOKIE_JAR:$BOOKIE_CLASSPATH"
+BOOKIE_CLASSPATH="$BOOKIE_JAR:$BOOKIE_CLASSPATH:$BOOKIE_EXTRA_CLASSPATH"
if [ "$BOOKIE_LOG_CONF" != "" ]; then
BOOKIE_CLASSPATH="`dirname $BOOKIE_LOG_CONF`:$BOOKIE_CLASSPATH"
OPTS="$OPTS -Dlog4j.configuration=`basename $BOOKIE_LOG_CONF`"