You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2012/05/09 13:20:54 UTC

svn commit: r1336100 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-benchmark/ bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/ bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/ hedwig-client/src/main/java/org/apac...

Author: fpj
Date: Wed May  9 11:20:54 2012
New Revision: 1336100

URL: http://svn.apache.org/viewvc?rev=1336100&view=rev
Log:
BOOKKEEPER-236: Benchmarking improvements from latest round of benchmarking (ivank via fpj)


Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/TestClient.java
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1336100&r1=1336099&r2=1336100&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed May  9 11:20:54 2012
@@ -173,6 +173,8 @@ Trunk (unreleased changes)
       bookkeeper-benchmark/
 	BOOKKEEPER-158: Move latest benchmarking code into trunk (ivank via fpj)
 
+	BOOKKEEPER-236: Benchmarking improvements from latest round of benchmarking (ivank via fpj)
+
 Release 4.0.0 - 2011-11-30
 
   Non-backward compatible changes:

Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml?rev=1336100&r1=1336099&r2=1336100&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml Wed May  9 11:20:54 2012
@@ -50,6 +50,18 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-common</artifactId>
+       <version>0.23.1</version>
+       <scope>compile</scope>
+    </dependency>
+    <dependency>
+       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-hdfs</artifactId>
+       <version>0.23.1</version>
+       <scope>compile</scope>
+    </dependency>
+    <dependency>
     	<groupId>org.apache.zookeeper</groupId>
     	<artifactId>zookeeper</artifactId>
     	<version>3.4.0</version>

Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java?rev=1336100&r1=1336099&r2=1336100&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java Wed May  9 11:20:54 2012
@@ -122,7 +122,8 @@ public class BenchBookie {
         Options options = new Options();
         options.addOption("host", true, "Hostname or IP of bookie to benchmark");
         options.addOption("port", true, "Port of bookie to benchmark (default 3181)");
-        options.addOption("zookeeper", true, "Zookeeper ensemble, default \"localhost:2181\"");
+        options.addOption("zookeeper", true, "Zookeeper ensemble, (default \"localhost:2181\")");
+        options.addOption("size", true, "Size of message to send, in bytes (default 1024)");
         options.addOption("help", false, "This message");
 
         CommandLineParser parser = new PosixParser();
@@ -136,6 +137,7 @@ public class BenchBookie {
 
         String addr = cmd.getOptionValue("host");
         int port = Integer.valueOf(cmd.getOptionValue("port", "3181"));
+        int size = Integer.valueOf(cmd.getOptionValue("size", "1024"));
         String servers = cmd.getOptionValue("zookeeper", "localhost:2181");
 
 
@@ -154,7 +156,7 @@ public class BenchBookie {
 
         long ledger = getValidLedgerId(servers);
         for(long entry = 0; entry < warmUpCount; entry++) {
-            ChannelBuffer toSend = ChannelBuffers.buffer(128);
+            ChannelBuffer toSend = ChannelBuffers.buffer(size);
             toSend.resetReaderIndex();
             toSend.resetWriterIndex();
             toSend.writeLong(ledger);
@@ -171,7 +173,7 @@ public class BenchBookie {
         int entryCount = 5000;
         long startTime = System.nanoTime();
         for(long entry = 0; entry < entryCount; entry++) {
-            ChannelBuffer toSend = ChannelBuffers.buffer(128);
+            ChannelBuffer toSend = ChannelBuffers.buffer(size);
             toSend.resetReaderIndex();
             toSend.resetWriterIndex();
             toSend.writeLong(ledger);
@@ -192,7 +194,7 @@ public class BenchBookie {
         startTime = System.currentTimeMillis();
         tc = new ThroughputCallback();
         for(long entry = 0; entry < entryCount; entry++) {
-            ChannelBuffer toSend = ChannelBuffers.buffer(128);
+            ChannelBuffer toSend = ChannelBuffers.buffer(size);
             toSend.resetReaderIndex();
             toSend.resetWriterIndex();
             toSend.writeLong(ledger);
@@ -204,6 +206,10 @@ public class BenchBookie {
         tc.waitFor(entryCount);
         endTime = System.currentTimeMillis();
         LOG.info("Throughput: " + ((long)entryCount)*1000/(endTime-startTime));
+
+        bc.close();
+        channelFactory.releaseExternalResources();
+        executor.shutdown();
     }
 
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java?rev=1336100&r1=1336099&r2=1336100&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java Wed May  9 11:20:54 2012
@@ -75,7 +75,7 @@ public class BenchReadThroughputLatency 
         }
     };
 
-    private static void readLedger(String zkservers, long ledgerId, byte[] passwd) {
+    private static void readLedger(ClientConfiguration conf, long ledgerId, byte[] passwd) {
         LOG.info("Reading ledger {}", ledgerId);
         BookKeeper bk = null;
         long time = 0;
@@ -83,13 +83,14 @@ public class BenchReadThroughputLatency 
         long lastRead = 0;
         int nochange = 0;
 
+        long absoluteLimit = 5000000;
         LedgerHandle lh = null;
         try {
-            bk = new BookKeeper(zkservers);
+            bk = new BookKeeper(conf);
             while (true) {
                 lh = bk.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32, 
                                              passwd);
-                long lastConfirmed = lh.getLastAddConfirmed();
+                long lastConfirmed = Math.min(lh.getLastAddConfirmed(), absoluteLimit);
                 if (lastConfirmed == lastRead) {
                     nochange++;
                     if (nochange == 10) {
@@ -103,13 +104,17 @@ public class BenchReadThroughputLatency 
                 }
                 long starttime = System.nanoTime();
 
-                Enumeration<LedgerEntry> entries = lh.readEntries(lastRead+1, lastConfirmed);
-                lastRead = lastConfirmed;
-                while (entries.hasMoreElements()) {
-                    LedgerEntry e = entries.nextElement();
-                    entriesRead++;
-                    if ((entriesRead % 10000) == 0) {
-                        LOG.info("{} entries read", entriesRead);
+                while (lastRead < lastConfirmed) {
+                    long nextLimit = lastRead + 100000;
+                    long readTo = Math.min(nextLimit, lastConfirmed);
+                    Enumeration<LedgerEntry> entries = lh.readEntries(lastRead+1, readTo);
+                    lastRead = readTo;
+                    while (entries.hasMoreElements()) {
+                        LedgerEntry e = entries.nextElement();
+                        entriesRead++;
+                        if ((entriesRead % 10000) == 0) {
+                            LOG.info("{} entries read", entriesRead);
+                        }
                     }
                 }
                 long endtime = System.nanoTime();
@@ -151,6 +156,7 @@ public class BenchReadThroughputLatency 
         options.addOption("listen", true, "Listen for creation of <arg> ledgers, and read each one fully");
         options.addOption("password", true, "Password used to access ledgers (default 'benchPasswd')");
         options.addOption("zookeeper", true, "Zookeeper ensemble, default \"localhost:2181\"");
+        options.addOption("sockettimeout", true, "Socket timeout for bookkeeper client. In seconds. Default 5");
         options.addOption("help", false, "This message");
 
         CommandLineParser parser = new PosixParser();
@@ -163,6 +169,7 @@ public class BenchReadThroughputLatency 
 
         final String servers = cmd.getOptionValue("zookeeper", "localhost:2181");
         final byte[] passwd = cmd.getOptionValue("password", "benchPasswd").getBytes();
+        final int sockTimeout = Integer.valueOf(cmd.getOptionValue("sockettimeout", "5"));
         if (cmd.hasOption("ledger") && cmd.hasOption("listen")) {
             LOG.error("Cannot used -ledger and -listen together");
             usage(options);
@@ -185,6 +192,10 @@ public class BenchReadThroughputLatency 
         final CountDownLatch connectedLatch = new CountDownLatch(1);
         final String nodepath = String.format("/ledgers/L%010d", ledger.get());
 
+        final ClientConfiguration conf = new ClientConfiguration();
+        conf.setReadTimeout(sockTimeout).setZkServers(servers);
+
+
         final ZooKeeper zk = new ZooKeeper(servers, 3000, new Watcher() {
                 public void process(WatchedEvent event) {
                     if (event.getState() == Event.KeeperState.SyncConnected
@@ -202,7 +213,7 @@ public class BenchReadThroughputLatency 
                                 connectedLatch.countDown();
                             } else if (event.getType() == Event.EventType.NodeCreated
                                        && event.getPath().equals(nodepath)) {
-                                readLedger(servers, ledger.get(), passwd);
+                                readLedger(conf, ledger.get(), passwd);
                                 shutdownLatch.countDown();
                             } else if (event.getType() == Event.EventType.NodeChildrenChanged) {
                                 if (numLedgers.get() < 0) {
@@ -222,7 +233,7 @@ public class BenchReadThroughputLatency 
                                     int ledgersLeft = numLedgers.decrementAndGet();
                                     Thread t = new Thread() {
                                             public void run() {
-                                                readLedger(servers, Long.valueOf(m.group(1)), passwd);
+                                                readLedger(conf, Long.valueOf(m.group(1)), passwd);
                                             }
                                         };
                                     t.start();
@@ -243,7 +254,7 @@ public class BenchReadThroughputLatency 
             connectedLatch.await();
             if (ledger.get() != 0) {
                 if (zk.exists(nodepath, true) != null) {
-                    readLedger(servers, ledger.get(), passwd);
+                    readLedger(conf, ledger.get(), passwd);
                     shutdownLatch.countDown();
                 } else {
                     LOG.info("Watching for creation of" + nodepath);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java?rev=1336100&r1=1336099&r2=1336100&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java Wed May  9 11:20:54 2012
@@ -31,6 +31,8 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -62,10 +64,9 @@ public class BenchThroughputLatency impl
     AtomicLong counter;
 
     Semaphore sem;
-    int pace;
-    int throttle;
     int numberOfLedgers = 1;
-    final String servers;
+    final int sendLimit;
+    final long latencies[];
 
     static class Context {
         long localStartTime;
@@ -78,24 +79,19 @@ public class BenchThroughputLatency impl
     }
 
     public BenchThroughputLatency(int ensemble, int qSize, byte[] passwd,
-                                  int throttle, int numberOfLedgers, String servers) 
+            int numberOfLedgers, int sendLimit, ClientConfiguration conf)
             throws KeeperException, IOException, InterruptedException {
-        this.sem = new Semaphore(throttle);
-        this.throttle = throttle;
-
-        ClientConfiguration conf = new ClientConfiguration();
-        conf.setThrottleValue(100000);
-        conf.setZkServers(servers);
-        this.servers = servers;
-
+        this.sem = new Semaphore(conf.getThrottleValue());
         bk = new BookKeeper(conf);
         this.counter = new AtomicLong(0);
         this.numberOfLedgers = numberOfLedgers;
+        this.sendLimit = sendLimit;
+        this.latencies = new long[sendLimit];
         try{
             lh = new LedgerHandle[this.numberOfLedgers];
 
             for(int i = 0; i < this.numberOfLedgers; i++) {
-                lh[i] = bk.createLedger(ensemble, qSize, BookKeeper.DigestType.CRC32, 
+                lh[i] = bk.createLedger(ensemble, qSize, BookKeeper.DigestType.CRC32,
                                         passwd);
                 LOG.info("Ledger Handle: " + lh[i].getId());
             }
@@ -124,16 +120,9 @@ public class BenchThroughputLatency impl
          return rand.nextInt(numberOfLedgers);
     }
 
-    int sendLimit = 2000000;
-    long latencies[] = new long[sendLimit];
     int latencyIndex = -1;
     AtomicLong completedRequests = new AtomicLong(0);
 
-    public void setSendLimit(int sendLimit) {
-        this.sendLimit = sendLimit;
-        latencies = new long[sendLimit];
-    }
-
     long duration = -1;
     synchronized public long getDuration() {
         return duration;
@@ -149,7 +138,7 @@ public class BenchThroughputLatency impl
                 public void run() {
                     try {
                         while(true) {
-                            Thread.sleep(200);
+                            Thread.sleep(1000);
                             LOG.info("ms: {} req: {}", System.currentTimeMillis(), completedRequests.getAndSet(0));
                         }
                     } catch (InterruptedException ie) {
@@ -186,9 +175,12 @@ public class BenchThroughputLatency impl
         }
         LOG.info("Sent: "  + sent);
         try {
-            synchronized (this) {
-                while (this.counter.get() > 0) {
-                    waitFor(1000);
+            int i = 0;
+            while(this.counter.get() > 0) {
+                Thread.sleep(1000);
+                i++;
+                if (i > 30) {
+                    break;
                 }
             }
         } catch(InterruptedException e) {
@@ -208,10 +200,6 @@ public class BenchThroughputLatency impl
         LOG.info("Finished processing in ms: " + getDuration() + " tp = " + throughput);
     }
 
-    private void waitFor(int waitTime) throws InterruptedException {
-        Thread.sleep(waitTime);
-    }
-
     long throughput = -1;
     public long getThroughput() {
         return throughput;
@@ -232,9 +220,10 @@ public class BenchThroughputLatency impl
         sem.release();
         counter.decrementAndGet();
 
-        latencies[(int)entryId] = newTime;
-
-        completedRequests.incrementAndGet();
+        if (rc == 0) {
+            latencies[(int)entryId] = newTime;
+            completedRequests.incrementAndGet();
+        }
     }
 
     public static void main(String[] args)
@@ -248,7 +237,11 @@ public class BenchThroughputLatency impl
         options.addOption("ledgers", true, "Number of ledgers, default 1");
         options.addOption("zookeeper", true, "Zookeeper ensemble, default \"localhost:2181\"");
         options.addOption("password", true, "Password used to create ledgers (default 'benchPasswd')");
-        options.addOption("coord_node", true, "Coordination znode for multi client benchmarks (optional)");
+        options.addOption("coordnode", true, "Coordination znode for multi client benchmarks (optional)");
+        options.addOption("timeout", true, "Number of seconds after which to give up");
+        options.addOption("sockettimeout", true, "Socket timeout for bookkeeper client. In seconds. Default 5");
+        options.addOption("skipwarmup", false, "Skip warm up, default false");
+        options.addOption("sendlimit", true, "Max number of entries to send. Default 20000000");
         options.addOption("help", false, "This message");
 
         CommandLineParser parser = new PosixParser();
@@ -268,10 +261,25 @@ public class BenchThroughputLatency impl
         int ensemble = Integer.valueOf(cmd.getOptionValue("ensemble", "3"));
         int quorum = Integer.valueOf(cmd.getOptionValue("quorum", "2"));
         int throttle = Integer.valueOf(cmd.getOptionValue("throttle", "10000"));
+        int sendLimit = Integer.valueOf(cmd.getOptionValue("sendlimit", "20000000"));
+
+        final int sockTimeout = Integer.valueOf(cmd.getOptionValue("sockettimeout", "5"));
 
-        String coordinationZnode = cmd.getOptionValue("coord_node");
+        String coordinationZnode = cmd.getOptionValue("coordnode");
         final byte[] passwd = cmd.getOptionValue("password", "benchPasswd").getBytes();
 
+        Timer timeouter = new Timer();
+        if (cmd.hasOption("timeout")) {
+            final long timeout = Long.valueOf(cmd.getOptionValue("timeout", "360")) * 1000;
+
+            timeouter.schedule(new TimerTask() {
+                    public void run() {
+                        System.err.println("Timing out benchmark after " + timeout + "ms");
+                        System.exit(-1);
+                    }
+                }, timeout);
+        }
+
         LOG.warn("(Parameters received) running time: " + runningTime +
                 ", entry size: " + entrysize + ", ensemble size: " + ensemble +
                 ", quorum size: " + quorum +
@@ -284,19 +292,25 @@ public class BenchThroughputLatency impl
         // Do a warmup run
         Thread thread;
 
-        long lastWarmUpTP = -1;
-        long throughput;
-        LOG.info("Starting warmup");
         byte data[] = new byte[entrysize];
         Arrays.fill(data, (byte)'x');
 
-        if(lastWarmUpTP < (throughput = warmUp(servers, data, ledgers, ensemble, quorum, passwd, throttle))) {
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setThrottleValue(throttle).setReadTimeout(sockTimeout).setZkServers(servers);
+
+        if (!cmd.hasOption("skipwarmup")) {
+            long throughput;
+            LOG.info("Starting warmup");
+
+            throughput = warmUp(data, ledgers, ensemble, quorum, passwd, conf);
             LOG.info("Warmup tp: " + throughput);
+            LOG.info("Warmup phase finished");
         }
-        LOG.info("Warmup phase finished");
+
 
         // Now do the benchmark
-        BenchThroughputLatency bench = new BenchThroughputLatency(ensemble, quorum, passwd, throttle, ledgers, servers);
+        BenchThroughputLatency bench = new BenchThroughputLatency(ensemble, quorum, passwd,
+                                                                  ledgers, sendLimit, conf);
         bench.setEntryData(data);
         thread = new Thread(bench);
         ZooKeeper zk = null;
@@ -336,18 +350,31 @@ public class BenchThroughputLatency impl
         thread.join();
 
         LOG.info("Calculating percentiles");
-        ArrayList<Long> latency = new ArrayList<Long>();
+
+        int numlat = 0;
         for(int i = 0; i < bench.latencies.length; i++) {
             if (bench.latencies[i] > 0) {
-                latency.add(bench.latencies[i]);
+                numlat++;
             }
         }
-        double tp = (double)latency.size()*1000.0/(double)bench.getDuration();
-        LOG.info(latency.size() + " completions in " + bench.getDuration() + " seconds: " + tp + " ops/sec");
+        int numcompletions = numlat;
+        numlat = Math.min(bench.sendLimit, numlat);
+        long[] latency = new long[numlat];
+        int j =0;
+        for(int i = 0; i < bench.latencies.length && j < numlat; i++) {
+            if (bench.latencies[i] > 0) {
+                latency[j++] = bench.latencies[i];
+            }
+        }
+        Arrays.sort(latency);
+
+        long tp = (long)((double)(numcompletions*1000.0)/(double)bench.getDuration());
+
+        LOG.info(numcompletions + " completions in " + bench.getDuration() + " seconds: " + tp + " ops/sec");
 
         if (zk != null) {
-            zk.create(coordinationZnode + "/worker-", 
-                      ("tp " + tp + " duration " + bench.getDuration()).getBytes(), 
+            zk.create(coordinationZnode + "/worker-",
+                      ("tp " + tp + " duration " + bench.getDuration()).getBytes(),
                       ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
             zk.close();
         }
@@ -362,32 +389,33 @@ public class BenchThroughputLatency impl
         fos.close();
 
         // now get the latencies
-        Collections.sort(latency);
         LOG.info("99th percentile latency: {}", percentile(latency, 99));
         LOG.info("95th percentile latency: {}", percentile(latency, 95));
 
         bench.close();
+        timeouter.cancel();
     }
 
-    private static double percentile(ArrayList<Long> latency, int percentile) {
-        int size = latency.size();
+    private static double percentile(long[] latency, int percentile) {
+        int size = latency.length;
         int sampleSize = (size * percentile) / 100;
         long total = 0;
         int count = 0;
         for(int i = 0; i < sampleSize; i++) {
-            total += latency.get(i);
+            total += latency[i];
             count++;
         }
         return ((double)total/(double)count)/1000000.0;
     }
 
-    private static long warmUp(String servers, byte[] data,
-                               int ledgers, int ensemble, int qSize, byte[] passwd, int throttle)
+    private static long warmUp(byte[] data, int ledgers, int ensemble, int qSize,
+                               byte[] passwd, ClientConfiguration conf)
             throws KeeperException, IOException, InterruptedException, BKException {
         final CountDownLatch connectLatch = new CountDownLatch(1);
         final int bookies;
         ZooKeeper zk = null;
         try {
+            final String servers = conf.getZkServers();
             zk = new ZooKeeper(servers, 15000, new Watcher() {
                     @Override
                     public void process(WatchedEvent event) {
@@ -407,10 +435,7 @@ public class BenchThroughputLatency impl
         }
 
         BenchThroughputLatency warmup = new BenchThroughputLatency(bookies, bookies, passwd,
-                                                                   throttle, ledgers, servers);
-        int limit = 50000;
-
-        warmup.setSendLimit(limit);
+                ledgers, 50000, conf);
         warmup.setEntryData(data);
         Thread thread = new Thread(warmup);
         thread.start();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/TestClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/TestClient.java?rev=1336100&r1=1336099&r2=1336100&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/TestClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/TestClient.java Wed May  9 11:20:54 2012
@@ -23,12 +23,27 @@ package org.apache.bookkeeper.benchmark;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import java.util.concurrent.Future;;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -38,84 +53,29 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.zookeeper.KeeperException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+
+
 /**
  * This is a simple test program to compare the performance of writing to
  * BookKeeper and to the local file system.
  *
  */
 
-public class TestClient
-    implements AddCallback, ReadCallback {
+public class TestClient {
     private static final Logger LOG = LoggerFactory.getLogger(TestClient.class);
 
-    BookKeeper x;
-    LedgerHandle lh;
-    Integer entryId;
-    HashMap<Integer, Integer> map;
-
-    FileOutputStream fStream;
-    FileOutputStream fStreamLocal;
-    long start, lastId;
-
-    public TestClient() {
-        entryId = 0;
-        map = new HashMap<Integer, Integer>();
-    }
-
-    public TestClient(String servers) throws KeeperException, IOException, InterruptedException {
-        this();
-        x = new BookKeeper(servers);
-        try {
-            lh = x.createLedger(DigestType.MAC, new byte[] {'a', 'b'});
-        } catch (BKException e) {
-            LOG.error(e.toString());
-        }
-    }
-
-    public TestClient(String servers, int ensSize, int qSize)
-            throws KeeperException, IOException, InterruptedException {
-        this();
-        x = new BookKeeper(servers);
-        try {
-            lh = x.createLedger(ensSize, qSize, DigestType.MAC, new byte[] {'a', 'b'});
-        } catch (BKException e) {
-            LOG.error(e.toString());
-        }
-    }
-
-    public TestClient(FileOutputStream fStream)
-            throws FileNotFoundException {
-        this.fStream = fStream;
-        this.fStreamLocal = new FileOutputStream("./local.log");
-    }
-
-
-    public Integer getFreshEntryId(int val) {
-        ++this.entryId;
-        synchronized (map) {
-            map.put(this.entryId, val);
-        }
-        return this.entryId;
-    }
-
-    public boolean removeEntryId(Integer id) {
-        boolean retVal = false;
-        synchronized (map) {
-            map.remove(id);
-            retVal = true;
-
-            if(map.size() == 0) map.notifyAll();
-            else {
-                if(map.size() < 4)
-                    LOG.error(map.toString());
-            }
-        }
-        return retVal;
-    }
-
-    public void closeHandle() throws KeeperException, InterruptedException, BKException {
-        lh.close();
-    }
     /**
      * First says if entries should be written to BookKeeper (0) or to the local
      * disk (1). Second parameter is an integer defining the length of a ledger entry.
@@ -123,123 +83,286 @@ public class TestClient
      *
      * @param args
      */
-    public static void main(String[] args) {
+    public static void main(String[] args) throws ParseException {
+        Options options = new Options();
+        options.addOption("length", true, "Length of packets being written. Default 1024");
+        options.addOption("target", true, "Target medium to write to. Options are bk, fs & hdfs. Default fs");
+        options.addOption("runfor", true, "Number of seconds to run for. Default 60");
+        options.addOption("path", true, "Path to write to. fs & hdfs only. Default /foobar");
+        options.addOption("zkservers", true, "ZooKeeper servers, comma separated. bk only. Default localhost:2181.");
+        options.addOption("bkensemble", true, "BookKeeper ledger ensemble size. bk only. Default 3");
+        options.addOption("bkquorum", true, "BookKeeper ledger quorum size. bk only. Default 2");
+        options.addOption("bkthrottle", true, "BookKeeper throttle size. bk only. Default 10000");
+        options.addOption("sync", false, "Use synchronous writes with BookKeeper. bk only.");
+        options.addOption("numconcurrent", true, "Number of concurrently clients. Default 1");
+        options.addOption("timeout", true, "Number of seconds after which to give up");
+        options.addOption("help", false, "This message");
+
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmd = parser.parse(options, args);
+
+        if (cmd.hasOption("help")) {
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp("TestClient <options>", options);
+            System.exit(-1);
+        }
+
+        int length = Integer.valueOf(cmd.getOptionValue("length", "1024"));
+        String target = cmd.getOptionValue("target", "fs");
+        long runfor = Long.valueOf(cmd.getOptionValue("runfor", "60")) * 1000;
 
-        int lenght = Integer.parseInt(args[1]);
         StringBuilder sb = new StringBuilder();
-        while(lenght-- > 0) {
+        while(length-- > 0) {
             sb.append('a');
         }
 
-        Integer selection = Integer.parseInt(args[0]);
-        switch(selection) {
-        case 0:
-            StringBuilder servers_sb = new StringBuilder();
-            for (int i = 4; i < args.length; i++) {
-                servers_sb.append(args[i] + " ");
+        Timer timeouter = new Timer();
+        if (cmd.hasOption("timeout")) {
+            final long timeout = Long.valueOf(cmd.getOptionValue("timeout", "360")) * 1000;
+
+            timeouter.schedule(new TimerTask() {
+                    public void run() {
+                        System.err.println("Timing out benchmark after " + timeout + "ms");
+                        System.exit(-1);
+                    }
+                }, timeout);
+        }
+
+        BookKeeper bkc = null;
+        try {
+            int numFiles = Integer.valueOf(cmd.getOptionValue("numconcurrent", "1"));
+            int numThreads = Math.min(numFiles, 1000);
+            byte[] data = sb.toString().getBytes();
+            long runid = System.currentTimeMillis();
+            List<Callable<Long>> clients = new ArrayList<Callable<Long>>();
+
+            if (target.equals("bk")) {
+                String zkservers = cmd.getOptionValue("zkservers", "localhost:2181");
+                int bkensemble = Integer.valueOf(cmd.getOptionValue("bkensemble", "3"));
+                int bkquorum = Integer.valueOf(cmd.getOptionValue("bkquorum", "2"));
+                int bkthrottle = Integer.valueOf(cmd.getOptionValue("bkthrottle", "10000"));
+
+                ClientConfiguration conf = new ClientConfiguration();
+                conf.setThrottleValue(bkthrottle);
+                conf.setZkServers(zkservers);
+
+                bkc = new BookKeeper(conf);
+                List<LedgerHandle> handles = new ArrayList<LedgerHandle>();
+                for (int i = 0; i < numFiles; i++) {
+                    handles.add(bkc.createLedger(bkensemble, bkquorum, DigestType.CRC32, new byte[] {'a', 'b'}));
+                }
+                for (int i = 0; i < numFiles; i++) {
+                    clients.add(new BKClient(handles, data, runfor, cmd.hasOption("sync")));
+                }
+            } else if (target.equals("hdfs")) {
+                FileSystem fs = FileSystem.get(new Configuration());
+                LOG.info("Default replication for HDFS: {}", fs.getDefaultReplication());
+
+                List<FSDataOutputStream> streams = new ArrayList<FSDataOutputStream>();
+                for (int i = 0; i < numFiles; i++) {
+                    String path = cmd.getOptionValue("path", "/foobar");
+                    streams.add(fs.create(new Path(path + runid + "_" + i)));
+                }
+
+                for (int i = 0; i < numThreads; i++) {
+                    clients.add(new HDFSClient(streams, data, runfor));
+                }
+            } else if (target.equals("fs")) {
+                List<FileOutputStream> streams = new ArrayList<FileOutputStream>();
+                for (int i = 0; i < numFiles; i++) {
+                    String path = cmd.getOptionValue("path", "/foobar " + i);
+                    streams.add(new FileOutputStream(path + runid + "_" + i));
+                }
+
+                for (int i = 0; i < numThreads; i++) {
+                    clients.add(new FileClient(streams, data, runfor));
+                }
+            } else {
+                LOG.error("Unknown option: " + target);
+                throw new IllegalArgumentException("Unknown target " + target);
             }
 
-            String servers = servers_sb.toString().trim().replace(' ', ',');
-            try {
-                TestClient c = new TestClient(servers, Integer.parseInt(args[3]), Integer.parseInt(args[4]));
-                c.writeSameEntryBatch(sb.toString().getBytes(), Integer.parseInt(args[2]));
-                //c.writeConsecutiveEntriesBatch(Integer.parseInt(args[0]));
-                c.closeHandle();
-            } catch (Exception e) {
-                LOG.error("Exception occurred", e);
-            } 
-            break;
-        case 1:
+            ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+            long start = System.currentTimeMillis();
 
-            try {
-                TestClient c = new TestClient(new FileOutputStream(args[2]));
-                c.writeSameEntryBatchFS(sb.toString().getBytes(), Integer.parseInt(args[3]));
-            } catch(FileNotFoundException e) {
-                LOG.error("File not found", e);
-            }
-            break;
-        case 2:
-            break;
+            List<Future<Long>> results = executor.invokeAll(clients,
+                                                            10, TimeUnit.MINUTES);
+            long end = System.currentTimeMillis();
+            long count = 0;
+            for (Future<Long> r : results) {
+                if (!r.isDone()) {
+                    LOG.warn("Job didn't complete");
+                    System.exit(2);
+                }
+                long c = r.get();
+                if (c == 0) {
+                    LOG.warn("Task didn't complete");
+                }
+                count += c;
+            }
+            long time = end-start;
+            LOG.info("Finished processing writes (ms): {} TPT: {} op/s",
+                     time, count/((double)time/1000));
+            executor.shutdown();
+        } catch (ExecutionException ee) {
+            LOG.error("Exception in worker", ee);
+        }  catch (KeeperException ke) {
+            LOG.error("Error accessing zookeeper", ke);
+        } catch (BKException e) {
+            LOG.error("Error accessing bookkeeper", e);
+        } catch (IOException ioe) {
+            LOG.error("I/O exception during benchmark", ioe);
+        } catch (InterruptedException ie) {
+            LOG.error("Benchmark interrupted", ie);
+        } finally {
+            if (bkc != null) {
+                try {
+                    bkc.close();
+                } catch (BKException bke) {
+                    LOG.error("Error closing bookkeeper client", bke);
+                } catch (InterruptedException ie) {
+                    LOG.warn("Interrupted closing bookkeeper client", ie);
+                }
+            }
         }
+        timeouter.cancel();
     }
 
-    void writeSameEntryBatch(byte[] data, int times) throws InterruptedException {
-        start = System.currentTimeMillis();
-        int count = times;
-        LOG.debug("Data: " + new String(data) + ", " + data.length);
-        while(count-- > 0) {
-            lh.asyncAddEntry(data, this, this.getFreshEntryId(2));
-        }
-        LOG.debug("Finished " + times + " async writes in ms: " + (System.currentTimeMillis() - start));
-        synchronized (map) {
-            while (map.size() != 0) {
-                map.wait(100);
-            }
+    static class HDFSClient implements Callable<Long> {
+        final List<FSDataOutputStream> streams;
+        final byte[] data;
+        final long time;
+        final Random r;
+
+        HDFSClient(List<FSDataOutputStream> streams, byte[] data, long time) {
+            this.streams = streams;
+            this.data = data;
+            this.time = time;
+            this.r = new Random(System.identityHashCode(this));
         }
-        LOG.debug("Finished processing in ms: " + (System.currentTimeMillis() - start));
 
-        LOG.debug("Ended computation");
+        public Long call() {
+            try {
+                long count = 0;
+                long start = System.currentTimeMillis();
+                long stopat = start + time;
+                while(System.currentTimeMillis() < stopat) {
+                    FSDataOutputStream stream = streams.get(r.nextInt(streams.size()));
+                    synchronized(stream) {
+                        stream.write(data);
+                        stream.flush();
+                        stream.hflush();
+                    }
+                    count++;
+                }
+
+                long time = (System.currentTimeMillis() - start);
+                LOG.info("Worker finished processing writes (ms): {} TPT: {} op/s",
+                         time, count/((double)time/1000));
+                return count;
+            } catch(IOException ioe) {
+                LOG.error("Exception in worker thread", ioe);
+                return 0L;
+            }
+        }
     }
 
-    void writeConsecutiveEntriesBatch(int times) throws InterruptedException {
-        start = System.currentTimeMillis();
-        int count = times;
-        while(count-- > 0) {
-            byte[] write = new byte[2];
-            int j = count%100;
-            int k = (count+1)%100;
-            write[0] = (byte) j;
-            write[1] = (byte) k;
-            lh.asyncAddEntry(write, this, this.getFreshEntryId(2));
-        }
-        LOG.debug("Finished " + times + " async writes in ms: " + (System.currentTimeMillis() - start));
-        synchronized (map) {
-            while (map.size() != 0) {
-                map.wait(100);
-            }
-        }
-        LOG.debug("Finished processing writes (ms): " + (System.currentTimeMillis() - start));
-
-        Object syncObj = new Object();
-        synchronized(syncObj) {
-            lh.asyncReadEntries(1, times - 1, this, syncObj);
-            syncObj.wait();
+    static class FileClient implements Callable<Long> {
+        final List<FileOutputStream> streams;
+        final byte[] data;
+        final long time;
+        final Random r;
+
+        FileClient(List<FileOutputStream> streams, byte[] data, long time) {
+            this.streams = streams;
+            this.data = data;
+            this.time = time;
+            this.r = new Random(System.identityHashCode(this));
         }
-        LOG.error("Ended computation");
-    }
 
-    void writeSameEntryBatchFS(byte[] data, int times) {
-        int count = times;
-        LOG.debug("Data: " + data.length + ", " + times);
-        try {
-            start = System.currentTimeMillis();
-            while(count-- > 0) {
-                fStream.write(data);
-                fStreamLocal.write(data);
-                fStream.flush();
-            }
-            fStream.close();
-            System.out.println("Finished processing writes (ms): " + (System.currentTimeMillis() - start));
-        } catch(IOException e) {
-            LOG.error("IOException occurred", e);
+        public Long call() {
+            try {
+                long count = 0;
+                long start = System.currentTimeMillis();
+
+                long stopat = start + time;
+                while(System.currentTimeMillis() < stopat) {
+                    FileOutputStream stream = streams.get(r.nextInt(streams.size()));
+                    synchronized(stream) {
+                        stream.write(data);
+                        stream.flush();
+                        stream.getChannel().force(false);
+                    }
+                    count++;
+                }
+
+                long time = (System.currentTimeMillis() - start);
+                LOG.info("Worker finished processing writes (ms): {} TPT: {} op/s", time, count/((double)time/1000));
+                return count;
+            } catch(IOException ioe) {
+                LOG.error("Exception in worker thread", ioe);
+                return 0L;
+            }
         }
     }
 
+    static class BKClient implements Callable<Long>, AddCallback {
+        final List<LedgerHandle> handles;
+        final byte[] data;
+        final long time;
+        final Random r;
+        final boolean sync;
+        final AtomicLong success = new AtomicLong(0);
+        final AtomicLong outstanding = new AtomicLong(0);
+
+        BKClient(List<LedgerHandle> handles, byte[] data, long time, boolean sync) {
+            this.handles = handles;
+            this.data = data;
+            this.time = time;
+            this.r = new Random(System.identityHashCode(this));
+            this.sync = sync;
+        }
 
-    @Override
-    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
-        this.removeEntryId((Integer) ctx);
-    }
+        public Long call() {
+            try {
+                long start = System.currentTimeMillis();
 
-    @Override
-    public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
-        System.out.println("Read callback: " + rc);
-        while(seq.hasMoreElements()) {
-            LedgerEntry le = seq.nextElement();
-            LOG.debug(new String(le.getEntry()));
+                long stopat = start + time;
+                while(System.currentTimeMillis() < stopat) {
+                    LedgerHandle lh = handles.get(r.nextInt(handles.size()));
+                    if (sync) {
+                        lh.addEntry(data);
+                        success.incrementAndGet();
+                    } else {
+                        lh.asyncAddEntry(data, this, null);
+                        outstanding.incrementAndGet();
+                    }
+                }
+
+                int ticks = 10; // don't wait for more than 10 seconds
+                while (outstanding.get() > 0 && ticks-- > 0) {
+                    Thread.sleep(10);
+                }
+
+                long time = (System.currentTimeMillis() - start);
+                LOG.info("Worker finished processing writes (ms): {} TPT: {} op/s",
+                         time, success.get()/((double)time/1000));
+                return success.get();
+            } catch (BKException e) {
+                LOG.error("Exception in worker thread", e);
+                return 0L;
+            } catch (InterruptedException ie) {
+                LOG.error("Exception in worker thread", ie);
+                return 0L;
+            }
         }
-        synchronized(ctx) {
-            ctx.notify();
+
+        @Override
+        public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+            if (rc == BKException.Code.OK) {
+                success.incrementAndGet();
+            }
+            outstanding.decrementAndGet();
         }
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java?rev=1336100&r1=1336099&r2=1336100&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java Wed May  9 11:20:54 2012
@@ -115,7 +115,10 @@ public class TestBenchmark {
     @Test
     public void testThroughputLatency() throws Exception {
         BenchThroughputLatency.main(new String[] {
-                "--time", "10"
+                "--time", "10",
+                "--skipwarmup",
+                "--throttle", "1",
+                "--sendlimit", "10000"
             });
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java?rev=1336100&r1=1336099&r2=1336100&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java Wed May  9 11:20:54 2012
@@ -27,11 +27,20 @@ import org.jboss.netty.logging.InternalL
 import org.jboss.netty.logging.Log4JLoggerFactory;
 
 import com.google.protobuf.ByteString;
+import org.apache.hedwig.util.HedwigSocketAddress;
 import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.HedwigClient;
 import org.apache.hedwig.client.api.Publisher;
 import org.apache.hedwig.client.api.Subscriber;
 
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+
 public class HedwigBenchmark implements Callable<Void> {
     protected static final Logger logger = LoggerFactory.getLogger(HedwigBenchmark.class);
 
@@ -40,11 +49,13 @@ public class HedwigBenchmark implements 
     private final HedwigClient client;
     private final Publisher publisher;
     private final Subscriber subscriber;
+    private final CommandLine cmd;
 
-    public HedwigBenchmark(ClientConfiguration cfg) {
+    public HedwigBenchmark(ClientConfiguration cfg, CommandLine cmd) {
         client = new HedwigClient(cfg);
         publisher = client.getPublisher();
         subscriber = client.getSubscriber();
+        this.cmd = cmd;
     }
 
     static boolean amIResponsibleForTopic(int topicNum, int partitionIndex, int numPartitions) {
@@ -59,24 +70,24 @@ public class HedwigBenchmark implements 
         //
 
         // What program to run: pub, sub (subscription benchmark), recv.
-        final String mode = System.getProperty("mode","");
+        final String mode = cmd.getOptionValue("mode","");
 
         // Number of requests to make (publishes or subscribes).
-        int numTopics = Integer.getInteger("nTopics", 50);
-        int numMessages = Integer.getInteger("nMsgs", 1000);
-        int numRegions = Integer.getInteger("nRegions", 1);
-        int startTopicLabel = Integer.getInteger("startTopicLabel", 0);
-        int partitionIndex = Integer.getInteger("partitionIndex", 0);
-        int numPartitions = Integer.getInteger("nPartitions", 1);
-
-        int replicaIndex = Integer.getInteger("replicaIndex", 0);
-
-        int rate = Integer.getInteger("rate", 0);
-        int nParallel = Integer.getInteger("npar", 100);
-        int msgSize = Integer.getInteger("msgSize", 1024);
+        int numTopics = Integer.valueOf(cmd.getOptionValue("nTopics", "50"));
+        int numMessages = Integer.valueOf(cmd.getOptionValue("nMsgs", "1000"));
+        int numRegions = Integer.valueOf(cmd.getOptionValue("nRegions", "1"));
+        int startTopicLabel = Integer.valueOf(cmd.getOptionValue("startTopicLabel", "0"));
+        int partitionIndex = Integer.valueOf(cmd.getOptionValue("partitionIndex", "0"));
+        int numPartitions = Integer.valueOf(cmd.getOptionValue("nPartitions", "1"));
+
+        int replicaIndex = Integer.valueOf(cmd.getOptionValue("replicaIndex", "0"));
+
+        int rate = Integer.valueOf(cmd.getOptionValue("rate", "0"));
+        int nParallel = Integer.valueOf(cmd.getOptionValue("npar", "100"));
+        int msgSize = Integer.valueOf(cmd.getOptionValue("msgSize", "1024"));
 
         // Number of warmup subscriptions to make.
-        final int nWarmups = Integer.getInteger("nwarmups", 1000);
+        final int nWarmups = Integer.valueOf(cmd.getOptionValue("nwarmups", "1000"));
 
         if (mode.equals("sub")) {
             BenchmarkSubscriber benchmarkSub = new BenchmarkSubscriber(numTopics, 0, 1, startTopicLabel, 0, 1,
@@ -108,19 +119,45 @@ public class HedwigBenchmark implements 
     }
 
     public static void main(String[] args) throws Exception {
-        ClientConfiguration cfg = new ClientConfiguration();
-        if (args.length > 0) {
-            String confFile = args[0];
-            try {
-                cfg.loadConf(new File(confFile).toURI().toURL());
-            } catch (ConfigurationException e) {
-                throw new RuntimeException(e);
-            }
+        Options options = new Options();
+        options.addOption("mode", true, "sub, recv, or pub");
+        options.addOption("nTopics", true, "Number of topics, default 50");
+        options.addOption("nMsgs", true, "Number of messages, default 1000");
+        options.addOption("nRegions", true, "Number of regsions, default 1");
+        options.addOption("startTopicLabel", true,
+                          "Prefix of topic labels. Must be numeric. Default 0");
+        options.addOption("partitionIndex", true, "If partitioning, the partition index for this client");
+        options.addOption("nPartitions", true, "Number of partitions, default 1");
+        options.addOption("replicaIndex", true, "default 0");
+        options.addOption("rate", true, "default 0");
+        options.addOption("npar", true, "default 100");
+        options.addOption("msgSize", true, "Size of messages, default 1024");
+        options.addOption("nwarmups", true, "Number of warmup messages, default 1000");
+        options.addOption("defaultHub", true, "Default hedwig hub to connect to, default localhost:4080");
+
+        CommandLineParser parser = new PosixParser();
+        final CommandLine cmd = parser.parse(options, args);
+
+        if (cmd.hasOption("help")) {
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp("HedwigBenchmark <options>", options);
+            System.exit(-1);
         }
 
+        ClientConfiguration cfg = new ClientConfiguration() {
+                public HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
+                    return new HedwigSocketAddress(cmd.getOptionValue("defaultHub",
+                                                                      "localhost:4080"));
+                }
+
+                public boolean isSSLEnabled() {
+                    return false;
+                }
+            };
+
         InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
 
-        HedwigBenchmark app = new HedwigBenchmark(cfg);
+        HedwigBenchmark app = new HedwigBenchmark(cfg, cmd);
         app.call();
         System.exit(0);
     }