You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2012/05/09 13:20:54 UTC
svn commit: r1336100 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-benchmark/
bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/
bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/
hedwig-client/src/main/java/org/apac...
Author: fpj
Date: Wed May 9 11:20:54 2012
New Revision: 1336100
URL: http://svn.apache.org/viewvc?rev=1336100&view=rev
Log:
BOOKKEEPER-236: Benchmarking improvements from latest round of benchmarking (ivank via fpj)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/TestClient.java
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1336100&r1=1336099&r2=1336100&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed May 9 11:20:54 2012
@@ -173,6 +173,8 @@ Trunk (unreleased changes)
bookkeeper-benchmark/
BOOKKEEPER-158: Move latest benchmarking code into trunk (ivank via fpj)
+ BOOKKEEPER-236: Benchmarking improvements from latest round of benchmarking (ivank via fpj)
+
Release 4.0.0 - 2011-11-30
Non-backward compatible changes:
Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml?rev=1336100&r1=1336099&r2=1336100&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/pom.xml Wed May 9 11:20:54 2012
@@ -50,6 +50,18 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>0.23.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>0.23.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.0</version>
Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java?rev=1336100&r1=1336099&r2=1336100&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java Wed May 9 11:20:54 2012
@@ -122,7 +122,8 @@ public class BenchBookie {
Options options = new Options();
options.addOption("host", true, "Hostname or IP of bookie to benchmark");
options.addOption("port", true, "Port of bookie to benchmark (default 3181)");
- options.addOption("zookeeper", true, "Zookeeper ensemble, default \"localhost:2181\"");
+ options.addOption("zookeeper", true, "Zookeeper ensemble, (default \"localhost:2181\")");
+ options.addOption("size", true, "Size of message to send, in bytes (default 1024)");
options.addOption("help", false, "This message");
CommandLineParser parser = new PosixParser();
@@ -136,6 +137,7 @@ public class BenchBookie {
String addr = cmd.getOptionValue("host");
int port = Integer.valueOf(cmd.getOptionValue("port", "3181"));
+ int size = Integer.valueOf(cmd.getOptionValue("size", "1024"));
String servers = cmd.getOptionValue("zookeeper", "localhost:2181");
@@ -154,7 +156,7 @@ public class BenchBookie {
long ledger = getValidLedgerId(servers);
for(long entry = 0; entry < warmUpCount; entry++) {
- ChannelBuffer toSend = ChannelBuffers.buffer(128);
+ ChannelBuffer toSend = ChannelBuffers.buffer(size);
toSend.resetReaderIndex();
toSend.resetWriterIndex();
toSend.writeLong(ledger);
@@ -171,7 +173,7 @@ public class BenchBookie {
int entryCount = 5000;
long startTime = System.nanoTime();
for(long entry = 0; entry < entryCount; entry++) {
- ChannelBuffer toSend = ChannelBuffers.buffer(128);
+ ChannelBuffer toSend = ChannelBuffers.buffer(size);
toSend.resetReaderIndex();
toSend.resetWriterIndex();
toSend.writeLong(ledger);
@@ -192,7 +194,7 @@ public class BenchBookie {
startTime = System.currentTimeMillis();
tc = new ThroughputCallback();
for(long entry = 0; entry < entryCount; entry++) {
- ChannelBuffer toSend = ChannelBuffers.buffer(128);
+ ChannelBuffer toSend = ChannelBuffers.buffer(size);
toSend.resetReaderIndex();
toSend.resetWriterIndex();
toSend.writeLong(ledger);
@@ -204,6 +206,10 @@ public class BenchBookie {
tc.waitFor(entryCount);
endTime = System.currentTimeMillis();
LOG.info("Throughput: " + ((long)entryCount)*1000/(endTime-startTime));
+
+ bc.close();
+ channelFactory.releaseExternalResources();
+ executor.shutdown();
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java?rev=1336100&r1=1336099&r2=1336100&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java Wed May 9 11:20:54 2012
@@ -75,7 +75,7 @@ public class BenchReadThroughputLatency
}
};
- private static void readLedger(String zkservers, long ledgerId, byte[] passwd) {
+ private static void readLedger(ClientConfiguration conf, long ledgerId, byte[] passwd) {
LOG.info("Reading ledger {}", ledgerId);
BookKeeper bk = null;
long time = 0;
@@ -83,13 +83,14 @@ public class BenchReadThroughputLatency
long lastRead = 0;
int nochange = 0;
+ long absoluteLimit = 5000000;
LedgerHandle lh = null;
try {
- bk = new BookKeeper(zkservers);
+ bk = new BookKeeper(conf);
while (true) {
lh = bk.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32,
passwd);
- long lastConfirmed = lh.getLastAddConfirmed();
+ long lastConfirmed = Math.min(lh.getLastAddConfirmed(), absoluteLimit);
if (lastConfirmed == lastRead) {
nochange++;
if (nochange == 10) {
@@ -103,13 +104,17 @@ public class BenchReadThroughputLatency
}
long starttime = System.nanoTime();
- Enumeration<LedgerEntry> entries = lh.readEntries(lastRead+1, lastConfirmed);
- lastRead = lastConfirmed;
- while (entries.hasMoreElements()) {
- LedgerEntry e = entries.nextElement();
- entriesRead++;
- if ((entriesRead % 10000) == 0) {
- LOG.info("{} entries read", entriesRead);
+ while (lastRead < lastConfirmed) {
+ long nextLimit = lastRead + 100000;
+ long readTo = Math.min(nextLimit, lastConfirmed);
+ Enumeration<LedgerEntry> entries = lh.readEntries(lastRead+1, readTo);
+ lastRead = readTo;
+ while (entries.hasMoreElements()) {
+ LedgerEntry e = entries.nextElement();
+ entriesRead++;
+ if ((entriesRead % 10000) == 0) {
+ LOG.info("{} entries read", entriesRead);
+ }
}
}
long endtime = System.nanoTime();
@@ -151,6 +156,7 @@ public class BenchReadThroughputLatency
options.addOption("listen", true, "Listen for creation of <arg> ledgers, and read each one fully");
options.addOption("password", true, "Password used to access ledgers (default 'benchPasswd')");
options.addOption("zookeeper", true, "Zookeeper ensemble, default \"localhost:2181\"");
+ options.addOption("sockettimeout", true, "Socket timeout for bookkeeper client. In seconds. Default 5");
options.addOption("help", false, "This message");
CommandLineParser parser = new PosixParser();
@@ -163,6 +169,7 @@ public class BenchReadThroughputLatency
final String servers = cmd.getOptionValue("zookeeper", "localhost:2181");
final byte[] passwd = cmd.getOptionValue("password", "benchPasswd").getBytes();
+ final int sockTimeout = Integer.valueOf(cmd.getOptionValue("sockettimeout", "5"));
if (cmd.hasOption("ledger") && cmd.hasOption("listen")) {
LOG.error("Cannot used -ledger and -listen together");
usage(options);
@@ -185,6 +192,10 @@ public class BenchReadThroughputLatency
final CountDownLatch connectedLatch = new CountDownLatch(1);
final String nodepath = String.format("/ledgers/L%010d", ledger.get());
+ final ClientConfiguration conf = new ClientConfiguration();
+ conf.setReadTimeout(sockTimeout).setZkServers(servers);
+
+
final ZooKeeper zk = new ZooKeeper(servers, 3000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected
@@ -202,7 +213,7 @@ public class BenchReadThroughputLatency
connectedLatch.countDown();
} else if (event.getType() == Event.EventType.NodeCreated
&& event.getPath().equals(nodepath)) {
- readLedger(servers, ledger.get(), passwd);
+ readLedger(conf, ledger.get(), passwd);
shutdownLatch.countDown();
} else if (event.getType() == Event.EventType.NodeChildrenChanged) {
if (numLedgers.get() < 0) {
@@ -222,7 +233,7 @@ public class BenchReadThroughputLatency
int ledgersLeft = numLedgers.decrementAndGet();
Thread t = new Thread() {
public void run() {
- readLedger(servers, Long.valueOf(m.group(1)), passwd);
+ readLedger(conf, Long.valueOf(m.group(1)), passwd);
}
};
t.start();
@@ -243,7 +254,7 @@ public class BenchReadThroughputLatency
connectedLatch.await();
if (ledger.get() != 0) {
if (zk.exists(nodepath, true) != null) {
- readLedger(servers, ledger.get(), passwd);
+ readLedger(conf, ledger.get(), passwd);
shutdownLatch.countDown();
} else {
LOG.info("Watching for creation of" + nodepath);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java?rev=1336100&r1=1336099&r2=1336100&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java Wed May 9 11:20:54 2012
@@ -31,6 +31,8 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.Timer;
+import java.util.TimerTask;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -62,10 +64,9 @@ public class BenchThroughputLatency impl
AtomicLong counter;
Semaphore sem;
- int pace;
- int throttle;
int numberOfLedgers = 1;
- final String servers;
+ final int sendLimit;
+ final long latencies[];
static class Context {
long localStartTime;
@@ -78,24 +79,19 @@ public class BenchThroughputLatency impl
}
public BenchThroughputLatency(int ensemble, int qSize, byte[] passwd,
- int throttle, int numberOfLedgers, String servers)
+ int numberOfLedgers, int sendLimit, ClientConfiguration conf)
throws KeeperException, IOException, InterruptedException {
- this.sem = new Semaphore(throttle);
- this.throttle = throttle;
-
- ClientConfiguration conf = new ClientConfiguration();
- conf.setThrottleValue(100000);
- conf.setZkServers(servers);
- this.servers = servers;
-
+ this.sem = new Semaphore(conf.getThrottleValue());
bk = new BookKeeper(conf);
this.counter = new AtomicLong(0);
this.numberOfLedgers = numberOfLedgers;
+ this.sendLimit = sendLimit;
+ this.latencies = new long[sendLimit];
try{
lh = new LedgerHandle[this.numberOfLedgers];
for(int i = 0; i < this.numberOfLedgers; i++) {
- lh[i] = bk.createLedger(ensemble, qSize, BookKeeper.DigestType.CRC32,
+ lh[i] = bk.createLedger(ensemble, qSize, BookKeeper.DigestType.CRC32,
passwd);
LOG.info("Ledger Handle: " + lh[i].getId());
}
@@ -124,16 +120,9 @@ public class BenchThroughputLatency impl
return rand.nextInt(numberOfLedgers);
}
- int sendLimit = 2000000;
- long latencies[] = new long[sendLimit];
int latencyIndex = -1;
AtomicLong completedRequests = new AtomicLong(0);
- public void setSendLimit(int sendLimit) {
- this.sendLimit = sendLimit;
- latencies = new long[sendLimit];
- }
-
long duration = -1;
synchronized public long getDuration() {
return duration;
@@ -149,7 +138,7 @@ public class BenchThroughputLatency impl
public void run() {
try {
while(true) {
- Thread.sleep(200);
+ Thread.sleep(1000);
LOG.info("ms: {} req: {}", System.currentTimeMillis(), completedRequests.getAndSet(0));
}
} catch (InterruptedException ie) {
@@ -186,9 +175,12 @@ public class BenchThroughputLatency impl
}
LOG.info("Sent: " + sent);
try {
- synchronized (this) {
- while (this.counter.get() > 0) {
- waitFor(1000);
+ int i = 0;
+ while(this.counter.get() > 0) {
+ Thread.sleep(1000);
+ i++;
+ if (i > 30) {
+ break;
}
}
} catch(InterruptedException e) {
@@ -208,10 +200,6 @@ public class BenchThroughputLatency impl
LOG.info("Finished processing in ms: " + getDuration() + " tp = " + throughput);
}
- private void waitFor(int waitTime) throws InterruptedException {
- Thread.sleep(waitTime);
- }
-
long throughput = -1;
public long getThroughput() {
return throughput;
@@ -232,9 +220,10 @@ public class BenchThroughputLatency impl
sem.release();
counter.decrementAndGet();
- latencies[(int)entryId] = newTime;
-
- completedRequests.incrementAndGet();
+ if (rc == 0) {
+ latencies[(int)entryId] = newTime;
+ completedRequests.incrementAndGet();
+ }
}
public static void main(String[] args)
@@ -248,7 +237,11 @@ public class BenchThroughputLatency impl
options.addOption("ledgers", true, "Number of ledgers, default 1");
options.addOption("zookeeper", true, "Zookeeper ensemble, default \"localhost:2181\"");
options.addOption("password", true, "Password used to create ledgers (default 'benchPasswd')");
- options.addOption("coord_node", true, "Coordination znode for multi client benchmarks (optional)");
+ options.addOption("coordnode", true, "Coordination znode for multi client benchmarks (optional)");
+ options.addOption("timeout", true, "Number of seconds after which to give up");
+ options.addOption("sockettimeout", true, "Socket timeout for bookkeeper client. In seconds. Default 5");
+ options.addOption("skipwarmup", false, "Skip warm up, default false");
+ options.addOption("sendlimit", true, "Max number of entries to send. Default 20000000");
options.addOption("help", false, "This message");
CommandLineParser parser = new PosixParser();
@@ -268,10 +261,25 @@ public class BenchThroughputLatency impl
int ensemble = Integer.valueOf(cmd.getOptionValue("ensemble", "3"));
int quorum = Integer.valueOf(cmd.getOptionValue("quorum", "2"));
int throttle = Integer.valueOf(cmd.getOptionValue("throttle", "10000"));
+ int sendLimit = Integer.valueOf(cmd.getOptionValue("sendlimit", "20000000"));
+
+ final int sockTimeout = Integer.valueOf(cmd.getOptionValue("sockettimeout", "5"));
- String coordinationZnode = cmd.getOptionValue("coord_node");
+ String coordinationZnode = cmd.getOptionValue("coordnode");
final byte[] passwd = cmd.getOptionValue("password", "benchPasswd").getBytes();
+ Timer timeouter = new Timer();
+ if (cmd.hasOption("timeout")) {
+ final long timeout = Long.valueOf(cmd.getOptionValue("timeout", "360")) * 1000;
+
+ timeouter.schedule(new TimerTask() {
+ public void run() {
+ System.err.println("Timing out benchmark after " + timeout + "ms");
+ System.exit(-1);
+ }
+ }, timeout);
+ }
+
LOG.warn("(Parameters received) running time: " + runningTime +
", entry size: " + entrysize + ", ensemble size: " + ensemble +
", quorum size: " + quorum +
@@ -284,19 +292,25 @@ public class BenchThroughputLatency impl
// Do a warmup run
Thread thread;
- long lastWarmUpTP = -1;
- long throughput;
- LOG.info("Starting warmup");
byte data[] = new byte[entrysize];
Arrays.fill(data, (byte)'x');
- if(lastWarmUpTP < (throughput = warmUp(servers, data, ledgers, ensemble, quorum, passwd, throttle))) {
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setThrottleValue(throttle).setReadTimeout(sockTimeout).setZkServers(servers);
+
+ if (!cmd.hasOption("skipwarmup")) {
+ long throughput;
+ LOG.info("Starting warmup");
+
+ throughput = warmUp(data, ledgers, ensemble, quorum, passwd, conf);
LOG.info("Warmup tp: " + throughput);
+ LOG.info("Warmup phase finished");
}
- LOG.info("Warmup phase finished");
+
// Now do the benchmark
- BenchThroughputLatency bench = new BenchThroughputLatency(ensemble, quorum, passwd, throttle, ledgers, servers);
+ BenchThroughputLatency bench = new BenchThroughputLatency(ensemble, quorum, passwd,
+ ledgers, sendLimit, conf);
bench.setEntryData(data);
thread = new Thread(bench);
ZooKeeper zk = null;
@@ -336,18 +350,31 @@ public class BenchThroughputLatency impl
thread.join();
LOG.info("Calculating percentiles");
- ArrayList<Long> latency = new ArrayList<Long>();
+
+ int numlat = 0;
for(int i = 0; i < bench.latencies.length; i++) {
if (bench.latencies[i] > 0) {
- latency.add(bench.latencies[i]);
+ numlat++;
}
}
- double tp = (double)latency.size()*1000.0/(double)bench.getDuration();
- LOG.info(latency.size() + " completions in " + bench.getDuration() + " seconds: " + tp + " ops/sec");
+ int numcompletions = numlat;
+ numlat = Math.min(bench.sendLimit, numlat);
+ long[] latency = new long[numlat];
+ int j =0;
+ for(int i = 0; i < bench.latencies.length && j < numlat; i++) {
+ if (bench.latencies[i] > 0) {
+ latency[j++] = bench.latencies[i];
+ }
+ }
+ Arrays.sort(latency);
+
+ long tp = (long)((double)(numcompletions*1000.0)/(double)bench.getDuration());
+
+ LOG.info(numcompletions + " completions in " + bench.getDuration() + " seconds: " + tp + " ops/sec");
if (zk != null) {
- zk.create(coordinationZnode + "/worker-",
- ("tp " + tp + " duration " + bench.getDuration()).getBytes(),
+ zk.create(coordinationZnode + "/worker-",
+ ("tp " + tp + " duration " + bench.getDuration()).getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
zk.close();
}
@@ -362,32 +389,33 @@ public class BenchThroughputLatency impl
fos.close();
// now get the latencies
- Collections.sort(latency);
LOG.info("99th percentile latency: {}", percentile(latency, 99));
LOG.info("95th percentile latency: {}", percentile(latency, 95));
bench.close();
+ timeouter.cancel();
}
- private static double percentile(ArrayList<Long> latency, int percentile) {
- int size = latency.size();
+ private static double percentile(long[] latency, int percentile) {
+ int size = latency.length;
int sampleSize = (size * percentile) / 100;
long total = 0;
int count = 0;
for(int i = 0; i < sampleSize; i++) {
- total += latency.get(i);
+ total += latency[i];
count++;
}
return ((double)total/(double)count)/1000000.0;
}
- private static long warmUp(String servers, byte[] data,
- int ledgers, int ensemble, int qSize, byte[] passwd, int throttle)
+ private static long warmUp(byte[] data, int ledgers, int ensemble, int qSize,
+ byte[] passwd, ClientConfiguration conf)
throws KeeperException, IOException, InterruptedException, BKException {
final CountDownLatch connectLatch = new CountDownLatch(1);
final int bookies;
ZooKeeper zk = null;
try {
+ final String servers = conf.getZkServers();
zk = new ZooKeeper(servers, 15000, new Watcher() {
@Override
public void process(WatchedEvent event) {
@@ -407,10 +435,7 @@ public class BenchThroughputLatency impl
}
BenchThroughputLatency warmup = new BenchThroughputLatency(bookies, bookies, passwd,
- throttle, ledgers, servers);
- int limit = 50000;
-
- warmup.setSendLimit(limit);
+ ledgers, 50000, conf);
warmup.setEntryData(data);
Thread thread = new Thread(warmup);
thread.start();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/TestClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/TestClient.java?rev=1336100&r1=1336099&r2=1336100&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/TestClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/TestClient.java Wed May 9 11:20:54 2012
@@ -23,12 +23,27 @@ package org.apache.bookkeeper.benchmark;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import java.util.concurrent.Future;;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -38,84 +53,29 @@ import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+
+
/**
* This is a simple test program to compare the performance of writing to
* BookKeeper and to the local file system.
*
*/
-public class TestClient
- implements AddCallback, ReadCallback {
+public class TestClient {
private static final Logger LOG = LoggerFactory.getLogger(TestClient.class);
- BookKeeper x;
- LedgerHandle lh;
- Integer entryId;
- HashMap<Integer, Integer> map;
-
- FileOutputStream fStream;
- FileOutputStream fStreamLocal;
- long start, lastId;
-
- public TestClient() {
- entryId = 0;
- map = new HashMap<Integer, Integer>();
- }
-
- public TestClient(String servers) throws KeeperException, IOException, InterruptedException {
- this();
- x = new BookKeeper(servers);
- try {
- lh = x.createLedger(DigestType.MAC, new byte[] {'a', 'b'});
- } catch (BKException e) {
- LOG.error(e.toString());
- }
- }
-
- public TestClient(String servers, int ensSize, int qSize)
- throws KeeperException, IOException, InterruptedException {
- this();
- x = new BookKeeper(servers);
- try {
- lh = x.createLedger(ensSize, qSize, DigestType.MAC, new byte[] {'a', 'b'});
- } catch (BKException e) {
- LOG.error(e.toString());
- }
- }
-
- public TestClient(FileOutputStream fStream)
- throws FileNotFoundException {
- this.fStream = fStream;
- this.fStreamLocal = new FileOutputStream("./local.log");
- }
-
-
- public Integer getFreshEntryId(int val) {
- ++this.entryId;
- synchronized (map) {
- map.put(this.entryId, val);
- }
- return this.entryId;
- }
-
- public boolean removeEntryId(Integer id) {
- boolean retVal = false;
- synchronized (map) {
- map.remove(id);
- retVal = true;
-
- if(map.size() == 0) map.notifyAll();
- else {
- if(map.size() < 4)
- LOG.error(map.toString());
- }
- }
- return retVal;
- }
-
- public void closeHandle() throws KeeperException, InterruptedException, BKException {
- lh.close();
- }
/**
* First says if entries should be written to BookKeeper (0) or to the local
* disk (1). Second parameter is an integer defining the length of a ledger entry.
@@ -123,123 +83,286 @@ public class TestClient
*
* @param args
*/
- public static void main(String[] args) {
+ public static void main(String[] args) throws ParseException {
+ Options options = new Options();
+ options.addOption("length", true, "Length of packets being written. Default 1024");
+ options.addOption("target", true, "Target medium to write to. Options are bk, fs & hdfs. Default fs");
+ options.addOption("runfor", true, "Number of seconds to run for. Default 60");
+ options.addOption("path", true, "Path to write to. fs & hdfs only. Default /foobar");
+ options.addOption("zkservers", true, "ZooKeeper servers, comma separated. bk only. Default localhost:2181.");
+ options.addOption("bkensemble", true, "BookKeeper ledger ensemble size. bk only. Default 3");
+ options.addOption("bkquorum", true, "BookKeeper ledger quorum size. bk only. Default 2");
+ options.addOption("bkthrottle", true, "BookKeeper throttle size. bk only. Default 10000");
+ options.addOption("sync", false, "Use synchronous writes with BookKeeper. bk only.");
+ options.addOption("numconcurrent", true, "Number of concurrently clients. Default 1");
+ options.addOption("timeout", true, "Number of seconds after which to give up");
+ options.addOption("help", false, "This message");
+
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args);
+
+ if (cmd.hasOption("help")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("TestClient <options>", options);
+ System.exit(-1);
+ }
+
+ int length = Integer.valueOf(cmd.getOptionValue("length", "1024"));
+ String target = cmd.getOptionValue("target", "fs");
+ long runfor = Long.valueOf(cmd.getOptionValue("runfor", "60")) * 1000;
- int lenght = Integer.parseInt(args[1]);
StringBuilder sb = new StringBuilder();
- while(lenght-- > 0) {
+ while(length-- > 0) {
sb.append('a');
}
- Integer selection = Integer.parseInt(args[0]);
- switch(selection) {
- case 0:
- StringBuilder servers_sb = new StringBuilder();
- for (int i = 4; i < args.length; i++) {
- servers_sb.append(args[i] + " ");
+ Timer timeouter = new Timer();
+ if (cmd.hasOption("timeout")) {
+ final long timeout = Long.valueOf(cmd.getOptionValue("timeout", "360")) * 1000;
+
+ timeouter.schedule(new TimerTask() {
+ public void run() {
+ System.err.println("Timing out benchmark after " + timeout + "ms");
+ System.exit(-1);
+ }
+ }, timeout);
+ }
+
+ BookKeeper bkc = null;
+ try {
+ int numFiles = Integer.valueOf(cmd.getOptionValue("numconcurrent", "1"));
+ int numThreads = Math.min(numFiles, 1000);
+ byte[] data = sb.toString().getBytes();
+ long runid = System.currentTimeMillis();
+ List<Callable<Long>> clients = new ArrayList<Callable<Long>>();
+
+ if (target.equals("bk")) {
+ String zkservers = cmd.getOptionValue("zkservers", "localhost:2181");
+ int bkensemble = Integer.valueOf(cmd.getOptionValue("bkensemble", "3"));
+ int bkquorum = Integer.valueOf(cmd.getOptionValue("bkquorum", "2"));
+ int bkthrottle = Integer.valueOf(cmd.getOptionValue("bkthrottle", "10000"));
+
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setThrottleValue(bkthrottle);
+ conf.setZkServers(zkservers);
+
+ bkc = new BookKeeper(conf);
+ List<LedgerHandle> handles = new ArrayList<LedgerHandle>();
+ for (int i = 0; i < numFiles; i++) {
+ handles.add(bkc.createLedger(bkensemble, bkquorum, DigestType.CRC32, new byte[] {'a', 'b'}));
+ }
+ for (int i = 0; i < numFiles; i++) {
+ clients.add(new BKClient(handles, data, runfor, cmd.hasOption("sync")));
+ }
+ } else if (target.equals("hdfs")) {
+ FileSystem fs = FileSystem.get(new Configuration());
+ LOG.info("Default replication for HDFS: {}", fs.getDefaultReplication());
+
+ List<FSDataOutputStream> streams = new ArrayList<FSDataOutputStream>();
+ for (int i = 0; i < numFiles; i++) {
+ String path = cmd.getOptionValue("path", "/foobar");
+ streams.add(fs.create(new Path(path + runid + "_" + i)));
+ }
+
+ for (int i = 0; i < numThreads; i++) {
+ clients.add(new HDFSClient(streams, data, runfor));
+ }
+ } else if (target.equals("fs")) {
+ List<FileOutputStream> streams = new ArrayList<FileOutputStream>();
+ for (int i = 0; i < numFiles; i++) {
+ String path = cmd.getOptionValue("path", "/foobar " + i);
+ streams.add(new FileOutputStream(path + runid + "_" + i));
+ }
+
+ for (int i = 0; i < numThreads; i++) {
+ clients.add(new FileClient(streams, data, runfor));
+ }
+ } else {
+ LOG.error("Unknown option: " + target);
+ throw new IllegalArgumentException("Unknown target " + target);
}
- String servers = servers_sb.toString().trim().replace(' ', ',');
- try {
- TestClient c = new TestClient(servers, Integer.parseInt(args[3]), Integer.parseInt(args[4]));
- c.writeSameEntryBatch(sb.toString().getBytes(), Integer.parseInt(args[2]));
- //c.writeConsecutiveEntriesBatch(Integer.parseInt(args[0]));
- c.closeHandle();
- } catch (Exception e) {
- LOG.error("Exception occurred", e);
- }
- break;
- case 1:
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ long start = System.currentTimeMillis();
- try {
- TestClient c = new TestClient(new FileOutputStream(args[2]));
- c.writeSameEntryBatchFS(sb.toString().getBytes(), Integer.parseInt(args[3]));
- } catch(FileNotFoundException e) {
- LOG.error("File not found", e);
- }
- break;
- case 2:
- break;
+ List<Future<Long>> results = executor.invokeAll(clients,
+ 10, TimeUnit.MINUTES);
+ long end = System.currentTimeMillis();
+ long count = 0;
+ for (Future<Long> r : results) {
+ if (!r.isDone()) {
+ LOG.warn("Job didn't complete");
+ System.exit(2);
+ }
+ long c = r.get();
+ if (c == 0) {
+ LOG.warn("Task didn't complete");
+ }
+ count += c;
+ }
+ long time = end-start;
+ LOG.info("Finished processing writes (ms): {} TPT: {} op/s",
+ time, count/((double)time/1000));
+ executor.shutdown();
+ } catch (ExecutionException ee) {
+ LOG.error("Exception in worker", ee);
+ } catch (KeeperException ke) {
+ LOG.error("Error accessing zookeeper", ke);
+ } catch (BKException e) {
+ LOG.error("Error accessing bookkeeper", e);
+ } catch (IOException ioe) {
+ LOG.error("I/O exception during benchmark", ioe);
+ } catch (InterruptedException ie) {
+ LOG.error("Benchmark interrupted", ie);
+ } finally {
+ if (bkc != null) {
+ try {
+ bkc.close();
+ } catch (BKException bke) {
+ LOG.error("Error closing bookkeeper client", bke);
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted closing bookkeeper client", ie);
+ }
+ }
}
+ timeouter.cancel();
}
- void writeSameEntryBatch(byte[] data, int times) throws InterruptedException {
- start = System.currentTimeMillis();
- int count = times;
- LOG.debug("Data: " + new String(data) + ", " + data.length);
- while(count-- > 0) {
- lh.asyncAddEntry(data, this, this.getFreshEntryId(2));
- }
- LOG.debug("Finished " + times + " async writes in ms: " + (System.currentTimeMillis() - start));
- synchronized (map) {
- while (map.size() != 0) {
- map.wait(100);
- }
+ static class HDFSClient implements Callable<Long> {
+ final List<FSDataOutputStream> streams;
+ final byte[] data;
+ final long time;
+ final Random r;
+
+ HDFSClient(List<FSDataOutputStream> streams, byte[] data, long time) {
+ this.streams = streams;
+ this.data = data;
+ this.time = time;
+ this.r = new Random(System.identityHashCode(this));
}
- LOG.debug("Finished processing in ms: " + (System.currentTimeMillis() - start));
- LOG.debug("Ended computation");
+ public Long call() {
+ try {
+ long count = 0;
+ long start = System.currentTimeMillis();
+ long stopat = start + time;
+ while(System.currentTimeMillis() < stopat) {
+ FSDataOutputStream stream = streams.get(r.nextInt(streams.size()));
+ synchronized(stream) {
+ stream.write(data);
+ stream.flush();
+ stream.hflush();
+ }
+ count++;
+ }
+
+ long time = (System.currentTimeMillis() - start);
+ LOG.info("Worker finished processing writes (ms): {} TPT: {} op/s",
+ time, count/((double)time/1000));
+ return count;
+ } catch(IOException ioe) {
+ LOG.error("Exception in worker thread", ioe);
+ return 0L;
+ }
+ }
}
- void writeConsecutiveEntriesBatch(int times) throws InterruptedException {
- start = System.currentTimeMillis();
- int count = times;
- while(count-- > 0) {
- byte[] write = new byte[2];
- int j = count%100;
- int k = (count+1)%100;
- write[0] = (byte) j;
- write[1] = (byte) k;
- lh.asyncAddEntry(write, this, this.getFreshEntryId(2));
- }
- LOG.debug("Finished " + times + " async writes in ms: " + (System.currentTimeMillis() - start));
- synchronized (map) {
- while (map.size() != 0) {
- map.wait(100);
- }
- }
- LOG.debug("Finished processing writes (ms): " + (System.currentTimeMillis() - start));
-
- Object syncObj = new Object();
- synchronized(syncObj) {
- lh.asyncReadEntries(1, times - 1, this, syncObj);
- syncObj.wait();
+ static class FileClient implements Callable<Long> {
+ final List<FileOutputStream> streams;
+ final byte[] data;
+ final long time;
+ final Random r;
+
+ FileClient(List<FileOutputStream> streams, byte[] data, long time) {
+ this.streams = streams;
+ this.data = data;
+ this.time = time;
+ this.r = new Random(System.identityHashCode(this));
}
- LOG.error("Ended computation");
- }
- void writeSameEntryBatchFS(byte[] data, int times) {
- int count = times;
- LOG.debug("Data: " + data.length + ", " + times);
- try {
- start = System.currentTimeMillis();
- while(count-- > 0) {
- fStream.write(data);
- fStreamLocal.write(data);
- fStream.flush();
- }
- fStream.close();
- System.out.println("Finished processing writes (ms): " + (System.currentTimeMillis() - start));
- } catch(IOException e) {
- LOG.error("IOException occurred", e);
+ public Long call() {
+ try {
+ long count = 0;
+ long start = System.currentTimeMillis();
+
+ long stopat = start + time;
+ while(System.currentTimeMillis() < stopat) {
+ FileOutputStream stream = streams.get(r.nextInt(streams.size()));
+ synchronized(stream) {
+ stream.write(data);
+ stream.flush();
+ stream.getChannel().force(false);
+ }
+ count++;
+ }
+
+ long time = (System.currentTimeMillis() - start);
+ LOG.info("Worker finished processing writes (ms): {} TPT: {} op/s", time, count/((double)time/1000));
+ return count;
+ } catch(IOException ioe) {
+ LOG.error("Exception in worker thread", ioe);
+ return 0L;
+ }
}
}
+ static class BKClient implements Callable<Long>, AddCallback {
+ final List<LedgerHandle> handles;
+ final byte[] data;
+ final long time;
+ final Random r;
+ final boolean sync;
+ final AtomicLong success = new AtomicLong(0);
+ final AtomicLong outstanding = new AtomicLong(0);
+
+ BKClient(List<LedgerHandle> handles, byte[] data, long time, boolean sync) {
+ this.handles = handles;
+ this.data = data;
+ this.time = time;
+ this.r = new Random(System.identityHashCode(this));
+ this.sync = sync;
+ }
- @Override
- public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
- this.removeEntryId((Integer) ctx);
- }
+ public Long call() {
+ try {
+ long start = System.currentTimeMillis();
- @Override
- public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
- System.out.println("Read callback: " + rc);
- while(seq.hasMoreElements()) {
- LedgerEntry le = seq.nextElement();
- LOG.debug(new String(le.getEntry()));
+ long stopat = start + time;
+ while(System.currentTimeMillis() < stopat) {
+ LedgerHandle lh = handles.get(r.nextInt(handles.size()));
+ if (sync) {
+ lh.addEntry(data);
+ success.incrementAndGet();
+ } else {
+ lh.asyncAddEntry(data, this, null);
+ outstanding.incrementAndGet();
+ }
+ }
+
+ int ticks = 10; // don't wait for more than 10 seconds
+ while (outstanding.get() > 0 && ticks-- > 0) {
+ Thread.sleep(10);
+ }
+
+ long time = (System.currentTimeMillis() - start);
+ LOG.info("Worker finished processing writes (ms): {} TPT: {} op/s",
+ time, success.get()/((double)time/1000));
+ return success.get();
+ } catch (BKException e) {
+ LOG.error("Exception in worker thread", e);
+ return 0L;
+ } catch (InterruptedException ie) {
+ LOG.error("Exception in worker thread", ie);
+ return 0L;
+ }
}
- synchronized(ctx) {
- ctx.notify();
+
+ @Override
+ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+ if (rc == BKException.Code.OK) {
+ success.incrementAndGet();
+ }
+ outstanding.decrementAndGet();
}
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java?rev=1336100&r1=1336099&r2=1336100&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java Wed May 9 11:20:54 2012
@@ -115,7 +115,10 @@ public class TestBenchmark {
@Test
public void testThroughputLatency() throws Exception {
BenchThroughputLatency.main(new String[] {
- "--time", "10"
+ "--time", "10",
+ "--skipwarmup",
+ "--throttle", "1",
+ "--sendlimit", "10000"
});
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java?rev=1336100&r1=1336099&r2=1336100&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java Wed May 9 11:20:54 2012
@@ -27,11 +27,20 @@ import org.jboss.netty.logging.InternalL
import org.jboss.netty.logging.Log4JLoggerFactory;
import com.google.protobuf.ByteString;
+import org.apache.hedwig.util.HedwigSocketAddress;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.HedwigClient;
import org.apache.hedwig.client.api.Publisher;
import org.apache.hedwig.client.api.Subscriber;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+
public class HedwigBenchmark implements Callable<Void> {
protected static final Logger logger = LoggerFactory.getLogger(HedwigBenchmark.class);
@@ -40,11 +49,13 @@ public class HedwigBenchmark implements
private final HedwigClient client;
private final Publisher publisher;
private final Subscriber subscriber;
+ private final CommandLine cmd;
- public HedwigBenchmark(ClientConfiguration cfg) {
+ public HedwigBenchmark(ClientConfiguration cfg, CommandLine cmd) {
client = new HedwigClient(cfg);
publisher = client.getPublisher();
subscriber = client.getSubscriber();
+ this.cmd = cmd;
}
static boolean amIResponsibleForTopic(int topicNum, int partitionIndex, int numPartitions) {
@@ -59,24 +70,24 @@ public class HedwigBenchmark implements
//
// What program to run: pub, sub (subscription benchmark), recv.
- final String mode = System.getProperty("mode","");
+ final String mode = cmd.getOptionValue("mode","");
// Number of requests to make (publishes or subscribes).
- int numTopics = Integer.getInteger("nTopics", 50);
- int numMessages = Integer.getInteger("nMsgs", 1000);
- int numRegions = Integer.getInteger("nRegions", 1);
- int startTopicLabel = Integer.getInteger("startTopicLabel", 0);
- int partitionIndex = Integer.getInteger("partitionIndex", 0);
- int numPartitions = Integer.getInteger("nPartitions", 1);
-
- int replicaIndex = Integer.getInteger("replicaIndex", 0);
-
- int rate = Integer.getInteger("rate", 0);
- int nParallel = Integer.getInteger("npar", 100);
- int msgSize = Integer.getInteger("msgSize", 1024);
+ int numTopics = Integer.valueOf(cmd.getOptionValue("nTopics", "50"));
+ int numMessages = Integer.valueOf(cmd.getOptionValue("nMsgs", "1000"));
+ int numRegions = Integer.valueOf(cmd.getOptionValue("nRegions", "1"));
+ int startTopicLabel = Integer.valueOf(cmd.getOptionValue("startTopicLabel", "0"));
+ int partitionIndex = Integer.valueOf(cmd.getOptionValue("partitionIndex", "0"));
+ int numPartitions = Integer.valueOf(cmd.getOptionValue("nPartitions", "1"));
+
+ int replicaIndex = Integer.valueOf(cmd.getOptionValue("replicaIndex", "0"));
+
+ int rate = Integer.valueOf(cmd.getOptionValue("rate", "0"));
+ int nParallel = Integer.valueOf(cmd.getOptionValue("npar", "100"));
+ int msgSize = Integer.valueOf(cmd.getOptionValue("msgSize", "1024"));
// Number of warmup subscriptions to make.
- final int nWarmups = Integer.getInteger("nwarmups", 1000);
+ final int nWarmups = Integer.valueOf(cmd.getOptionValue("nwarmups", "1000"));
if (mode.equals("sub")) {
BenchmarkSubscriber benchmarkSub = new BenchmarkSubscriber(numTopics, 0, 1, startTopicLabel, 0, 1,
@@ -108,19 +119,45 @@ public class HedwigBenchmark implements
}
public static void main(String[] args) throws Exception {
- ClientConfiguration cfg = new ClientConfiguration();
- if (args.length > 0) {
- String confFile = args[0];
- try {
- cfg.loadConf(new File(confFile).toURI().toURL());
- } catch (ConfigurationException e) {
- throw new RuntimeException(e);
- }
+ Options options = new Options();
+ options.addOption("mode", true, "sub, recv, or pub");
+ options.addOption("nTopics", true, "Number of topics, default 50");
+ options.addOption("nMsgs", true, "Number of messages, default 1000");
+ options.addOption("nRegions", true, "Number of regsions, default 1");
+ options.addOption("startTopicLabel", true,
+ "Prefix of topic labels. Must be numeric. Default 0");
+ options.addOption("partitionIndex", true, "If partitioning, the partition index for this client");
+ options.addOption("nPartitions", true, "Number of partitions, default 1");
+ options.addOption("replicaIndex", true, "default 0");
+ options.addOption("rate", true, "default 0");
+ options.addOption("npar", true, "default 100");
+ options.addOption("msgSize", true, "Size of messages, default 1024");
+ options.addOption("nwarmups", true, "Number of warmup messages, default 1000");
+ options.addOption("defaultHub", true, "Default hedwig hub to connect to, default localhost:4080");
+
+ CommandLineParser parser = new PosixParser();
+ final CommandLine cmd = parser.parse(options, args);
+
+ if (cmd.hasOption("help")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("HedwigBenchmark <options>", options);
+ System.exit(-1);
}
+ ClientConfiguration cfg = new ClientConfiguration() {
+ public HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
+ return new HedwigSocketAddress(cmd.getOptionValue("defaultHub",
+ "localhost:4080"));
+ }
+
+ public boolean isSSLEnabled() {
+ return false;
+ }
+ };
+
InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
- HedwigBenchmark app = new HedwigBenchmark(cfg);
+ HedwigBenchmark app = new HedwigBenchmark(cfg, cmd);
app.call();
System.exit(0);
}