You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/09/05 19:39:03 UTC
svn commit: r1165369 [9/9] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/clie...
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java Mon Sep 5 17:38:57 2011
@@ -50,20 +50,20 @@ public class ZkUtils {
// create mode is persistent since ephemeral nodes can't be
// parents
ZkUtils.createFullPathOptimistic(zk, PathUtils.parent(originalPath), new byte[0], acl,
- CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
+ CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
- @Override
- public void safeProcessResult(int rc, String path, Object ctx, String name) {
- if (rc == Code.OK.intValue() || rc == Code.NODEEXISTS.intValue()) {
- // succeeded in creating the parent, now
- // create the original path
- ZkUtils.createFullPathOptimistic(zk, originalPath, data, acl, createMode, callback,
- ctx);
- } else {
- callback.processResult(rc, path, ctx, name);
- }
- }
- }, ctx);
+ @Override
+ public void safeProcessResult(int rc, String path, Object ctx, String name) {
+ if (rc == Code.OK.intValue() || rc == Code.NODEEXISTS.intValue()) {
+ // succeeded in creating the parent, now
+ // create the original path
+ ZkUtils.createFullPathOptimistic(zk, originalPath, data, acl, createMode, callback,
+ ctx);
+ } else {
+ callback.processResult(rc, path, ctx, name);
+ }
+ }
+ }, ctx);
}
}, ctx);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/HelperMethods.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/HelperMethods.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/HelperMethods.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/HelperMethods.java Mon Sep 5 17:38:57 2011
@@ -29,7 +29,8 @@ public class HelperMethods {
public static List<Message> getRandomPublishedMessages(int numMessages, int size) {
ByteString[] regions = { ByteString.copyFromUtf8("sp1"), ByteString.copyFromUtf8("re1"),
- ByteString.copyFromUtf8("sg") };
+ ByteString.copyFromUtf8("sg")
+ };
return getRandomPublishedMessages(numMessages, size, regions);
}
@@ -39,7 +40,7 @@ public class HelperMethods {
byte[] body = new byte[size];
rand.nextBytes(body);
msgs.add(Message.newBuilder().setBody(ByteString.copyFrom(body)).setSrcRegion(
- regions[rand.nextInt(regions.length)]).build());
+ regions[rand.nextInt(regions.length)]).build());
}
return msgs;
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/ServerControl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/ServerControl.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/ServerControl.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/ServerControl.java Mon Sep 5 17:38:57 2011
@@ -37,197 +37,201 @@ public class ServerControl {
static Logger LOG = Logger.getLogger(ServerControl.class);
public class TestException extends Exception {
- public TestException(String str) {
- super(str);
- }
+ public TestException(String str) {
+ super(str);
+ }
};
public interface TestServer {
- public String getAddress();
- public void kill();
+ public String getAddress();
+ public void kill();
}
private class BookKeeperServer extends BookieServer implements TestServer {
- private String address;
+ private String address;
- public BookKeeperServer(int port, TestServer zkserver, String journal, String ledger) throws IOException {
- super(port, zkserver.getAddress(), new File(journal), new File[] { new File(ledger) });
-
- address = "localhost:"+port;
- start();
- }
-
- public String getAddress() {
- return address;
- }
-
- public void kill() {
- try {
- shutdown();
- } catch (Exception e) {
- }
- }
+ public BookKeeperServer(int port, TestServer zkserver, String journal, String ledger) throws IOException {
+ super(port, zkserver.getAddress(), new File(journal), new File[] { new File(ledger) });
+
+ address = "localhost:"+port;
+ start();
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public void kill() {
+ try {
+ shutdown();
+ } catch (Exception e) {
+ }
+ }
}
private class ZookeeperServer extends ZooKeeperServerMain implements TestServer {
- public String address;
- public Thread serverThread;
- String path;
- public ZookeeperServer(int port, String path) throws TestException {
- super();
-
- this.path = path;
- final String[] args = { Integer.toString(port), path};
- address = "localhost:" + port;
- serverThread = new Thread() {
- public void run() {
- try {
- initializeAndRun(args);
- } catch (Exception e) {
- }
- };
- };
- serverThread.start();
- }
-
- public String getAddress() {
- return address;
- }
-
- public void kill() {
- shutdown();
- serverThread.interrupt();
- }
+ public String address;
+ public Thread serverThread;
+ String path;
+ public ZookeeperServer(int port, String path) throws TestException {
+ super();
+
+ this.path = path;
+ final String[] args = { Integer.toString(port), path};
+ address = "localhost:" + port;
+ serverThread = new Thread() {
+ public void run() {
+ try {
+ initializeAndRun(args);
+ } catch (Exception e) {
+ }
+ };
+ };
+ serverThread.start();
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public void kill() {
+ shutdown();
+ serverThread.interrupt();
+ }
}
private class HedwigServer implements TestServer {
- private PubSubServer server;
- private String address;
+ private PubSubServer server;
+ private String address;
- public HedwigServer(int port, String region, TestServer zk) throws TestException {
- class MyServerConfiguration extends ServerConfiguration {
- MyServerConfiguration(int port, TestServer zk, String region) {
- conf.setProperty(ServerConfiguration.SERVER_PORT, port);
- conf.setProperty(ServerConfiguration.ZK_HOST, zk.getAddress());
- conf.setProperty(ServerConfiguration.REGION, region);
- }
- };
-
- address = "localhost:" + port;
-
- try {
- server = new PubSubServer(new MyServerConfiguration(port, zk, region));
- } catch (Exception e) {
- throw new TestException("Couldn't create pub sub server : " + e);
- }
- }
-
- public String getAddress() {
- return address;
- }
-
- public void kill() {
- server.shutdown();
- }
+ public HedwigServer(int port, String region, TestServer zk) throws TestException {
+ class MyServerConfiguration extends ServerConfiguration {
+ MyServerConfiguration(int port, TestServer zk, String region) {
+ conf.setProperty(ServerConfiguration.SERVER_PORT, port);
+ conf.setProperty(ServerConfiguration.ZK_HOST, zk.getAddress());
+ conf.setProperty(ServerConfiguration.REGION, region);
+ }
+ };
+
+ address = "localhost:" + port;
+
+ try {
+ server = new PubSubServer(new MyServerConfiguration(port, zk, region));
+ } catch (Exception e) {
+ throw new TestException("Couldn't create pub sub server : " + e);
+ }
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public void kill() {
+ server.shutdown();
+ }
}
private String createTempDirectory(String suffix) throws IOException {
- String dir = System.getProperty("java.io.tmpdir") + File.separator + System.currentTimeMillis() + suffix;
- final File dirf = new File(dir);
- boolean good = dirf.mkdir();
- if (!good) {
- throw new IOException("Unable to create directory " + dir);
- }
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void delete(File f) {
- File[] subfiles = f.listFiles();
- if (subfiles != null) {
- for (File subf : subfiles) {
- delete(subf);
- }
- }
- f.delete();
- }
-
- public void run() {
- delete(dirf);
- }
- });
- return dir;
+ String dir = System.getProperty("java.io.tmpdir") + File.separator + System.currentTimeMillis() + suffix;
+ final File dirf = new File(dir);
+ boolean good = dirf.mkdir();
+ if (!good) {
+ throw new IOException("Unable to create directory " + dir);
+ }
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void delete(File f) {
+ File[] subfiles = f.listFiles();
+ if (subfiles != null) {
+ for (File subf : subfiles) {
+ delete(subf);
+ }
+ }
+ f.delete();
+ }
+
+ public void run() {
+ delete(dirf);
+ }
+ });
+ return dir;
}
public TestServer startZookeeperServer(int port) throws IOException, TestException {
- String dir = createTempDirectory("-zookeeper-" + port);
- ZookeeperServer server = new ZookeeperServer(port, dir);
-
- return server;
+ String dir = createTempDirectory("-zookeeper-" + port);
+ ZookeeperServer server = new ZookeeperServer(port, dir);
+
+ return server;
}
-
+
public TestServer startBookieServer(int port, TestServer zookeeperServer) throws IOException, TestException {
- int tries = 4;
- while (true) {
- try {
- tries--;
- ZooKeeper zk = new ZooKeeper(zookeeperServer.getAddress(), 1000, new Watcher() { public void process(WatchedEvent event) { /* do nothing */ } });
- if (zk.exists("/ledgers/available", false) == null) {
- byte[] data = new byte[1];
- data[0] = 0;
- zk.create("/ledgers", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zk.create("/ledgers/available", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- zk.close();
- break;
- } catch (KeeperException.ConnectionLossException ce) {
- if (tries > 0) {
- try {
- Thread.sleep(3);
- } catch (Exception e) {
- throw new TestException("Can't even sleep. Fix your machine: " + e);
- }
- continue;
- } else {
- throw new TestException("Error connecting to zookeeper: " + ce);
- }
- } catch (Exception e) {
- throw new TestException("Error initialising bookkeeper ledgers: " + e);
- }
- }
- String journal = createTempDirectory("-bookie-" + port + "-journal");
- String ledger = createTempDirectory("-bookie-" + port + "-ledger");
- BookKeeperServer bookie = new BookKeeperServer(port, zookeeperServer, journal, ledger);
- return bookie;
+ int tries = 4;
+ while (true) {
+ try {
+ tries--;
+ ZooKeeper zk = new ZooKeeper(zookeeperServer.getAddress(), 1000, new Watcher() {
+ public void process(WatchedEvent event) {
+ /* do nothing */
+ }
+ });
+ if (zk.exists("/ledgers/available", false) == null) {
+ byte[] data = new byte[1];
+ data[0] = 0;
+ zk.create("/ledgers", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/ledgers/available", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ zk.close();
+ break;
+ } catch (KeeperException.ConnectionLossException ce) {
+ if (tries > 0) {
+ try {
+ Thread.sleep(3);
+ } catch (Exception e) {
+ throw new TestException("Can't even sleep. Fix your machine: " + e);
+ }
+ continue;
+ } else {
+ throw new TestException("Error connecting to zookeeper: " + ce);
+ }
+ } catch (Exception e) {
+ throw new TestException("Error initialising bookkeeper ledgers: " + e);
+ }
+ }
+ String journal = createTempDirectory("-bookie-" + port + "-journal");
+ String ledger = createTempDirectory("-bookie-" + port + "-ledger");
+ BookKeeperServer bookie = new BookKeeperServer(port, zookeeperServer, journal, ledger);
+ return bookie;
}
-
+
public TestServer startPubSubServer(int port, String region, TestServer zookeeperServer) throws IOException, TestException {
- return new HedwigServer(port, region, zookeeperServer);
- }
+ return new HedwigServer(port, region, zookeeperServer);
+ }
public ServerControl() {
}
public static void main(String[] args) throws Exception {
- ServerControl control = new ServerControl();
+ ServerControl control = new ServerControl();
- TestServer zk = control.startZookeeperServer(12345);
- TestServer bk1 = control.startBookieServer(12346, zk);
- TestServer bk2 = control.startBookieServer(12347, zk);
- TestServer bk3 = control.startBookieServer(12348, zk);
-
- TestServer hw1 = control.startPubSubServer(12349, "foobar", zk);
- TestServer hw2 = control.startPubSubServer(12350, "foobar", zk);
- TestServer hw3 = control.startPubSubServer(12351, "foobar", zk);
- TestServer hw4 = control.startPubSubServer(12352, "barfoo", zk);
- LOG.info("Started " + zk.getAddress());
- LOG.info("Sleeping for 10 seconds");
- Thread.sleep(10000);
- bk3.kill();
- bk2.kill();
- bk1.kill();
- zk.kill();
- hw1.kill();
- hw2.kill();
- hw3.kill();
- hw4.kill();
+ TestServer zk = control.startZookeeperServer(12345);
+ TestServer bk1 = control.startBookieServer(12346, zk);
+ TestServer bk2 = control.startBookieServer(12347, zk);
+ TestServer bk3 = control.startBookieServer(12348, zk);
+
+ TestServer hw1 = control.startPubSubServer(12349, "foobar", zk);
+ TestServer hw2 = control.startPubSubServer(12350, "foobar", zk);
+ TestServer hw3 = control.startPubSubServer(12351, "foobar", zk);
+ TestServer hw4 = control.startPubSubServer(12352, "barfoo", zk);
+ LOG.info("Started " + zk.getAddress());
+ LOG.info("Sleeping for 10 seconds");
+ Thread.sleep(10000);
+ bk3.kill();
+ bk2.kill();
+ bk1.kill();
+ zk.kill();
+ hw1.kill();
+ hw2.kill();
+ hw3.kill();
+ hw4.kill();
}
-}
\ No newline at end of file
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/ServerControlDaemon.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/ServerControlDaemon.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/ServerControlDaemon.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/ServerControlDaemon.java Mon Sep 5 17:38:57 2011
@@ -28,7 +28,7 @@ import org.jboss.netty.channel.socket.ni
import org.apache.log4j.Logger;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.Channel;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -49,123 +49,123 @@ public class ServerControlDaemon {
Logger.getLogger(ServerControlDaemon.class);
@ChannelPipelineCoverage("all")
- public static class ServerControlDaemonHandler extends SimpleChannelUpstreamHandler{
- private ServerControl control;
- private HashMap<Channel, HashMap<String, ServerControl.TestServer>> serverMap;
-
- public ServerControlDaemonHandler() {
- serverMap = new HashMap<Channel, HashMap<String, ServerControl.TestServer>>();
- control = new ServerControl();
- }
-
- private void addServerForChannel(Channel c, ServerControl.TestServer t) {
- LOG.info("Created server " + t.getAddress());
- HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
- if (map == null) {
- map = new HashMap<String, ServerControl.TestServer>();
- serverMap.put(c, map);
- }
- map.put(t.getAddress(), t);
- }
-
- private void killServerForChannel(Channel c, String name) {
- LOG.info("Killing server " + name);
- HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
- ServerControl.TestServer t = map.get(name);
- map.remove(name);
- try {
- t.kill();
- } catch (Exception e) {
- LOG.error("Error killing server", e);
- // do nothing, should be killed, we won't use it again anyhow
- }
- }
-
- private ServerControl.TestServer lookupServer(Channel c, String name) {
- HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
- return map.get(name);
- }
-
- private void clearServersForChannel(Channel c) {
- HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
- serverMap.remove(map);
-
- if (map != null) {
- for (ServerControl.TestServer t : map.values()) {
- t.kill();
- }
- map.clear();
- }
- }
-
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- try {
- String command = (String)e.getMessage();
- LOG.info("Command: " + command);
- String[] args = command.split("\\s+");
-
- if (args[0].equals("START")) {
- ServerControl.TestServer t = null;
- if (args[1].equals("ZOOKEEPER")) {
- t = control.startZookeeperServer(Integer.valueOf(args[2]));
- addServerForChannel(ctx.getChannel(), t);
- } else if (args[1].equals("BOOKKEEPER")) {
- ServerControl.TestServer zk = lookupServer(ctx.getChannel(), args[3]);
- t = control.startBookieServer(Integer.valueOf(args[2]), zk);
- addServerForChannel(ctx.getChannel(), t);
- } else if (args[1].equals("HEDWIG")) {
- ServerControl.TestServer zk = lookupServer(ctx.getChannel(), args[4]);
- t = control.startPubSubServer(Integer.valueOf(args[2]), args[3], zk);
- addServerForChannel(ctx.getChannel(), t);
- }
-
- ctx.getChannel().write("OK " + t.getAddress() + "\n");
- } else if (args[0].equals("KILL")) {
- killServerForChannel(ctx.getChannel(), args[1]);
-
- ctx.getChannel().write("OK Killed " + args[1] + "\n");
- } else if (args[0].equals("TEST")) {
- LOG.info("\n******\n\n" + args[1] + "\n\n******");
- ctx.getChannel().write("OK Test Noted\n");
- } else {
- ctx.getChannel().write("ERR Bad Command\n");
- }
- } catch (Exception ex) {
- LOG.error("Error handling message", ex);
- ctx.getChannel().write("ERR " + ex.toString() + "\n");
- }
- }
-
- public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- clearServersForChannel(ctx.getChannel());
- }
+ public static class ServerControlDaemonHandler extends SimpleChannelUpstreamHandler {
+ private ServerControl control;
+ private HashMap<Channel, HashMap<String, ServerControl.TestServer>> serverMap;
+
+ public ServerControlDaemonHandler() {
+ serverMap = new HashMap<Channel, HashMap<String, ServerControl.TestServer>>();
+ control = new ServerControl();
+ }
+
+ private void addServerForChannel(Channel c, ServerControl.TestServer t) {
+ LOG.info("Created server " + t.getAddress());
+ HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
+ if (map == null) {
+ map = new HashMap<String, ServerControl.TestServer>();
+ serverMap.put(c, map);
+ }
+ map.put(t.getAddress(), t);
+ }
+
+ private void killServerForChannel(Channel c, String name) {
+ LOG.info("Killing server " + name);
+ HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
+ ServerControl.TestServer t = map.get(name);
+ map.remove(name);
+ try {
+ t.kill();
+ } catch (Exception e) {
+ LOG.error("Error killing server", e);
+ // do nothing, should be killed, we won't use it again anyhow
+ }
+ }
+
+ private ServerControl.TestServer lookupServer(Channel c, String name) {
+ HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
+ return map.get(name);
+ }
+
+ private void clearServersForChannel(Channel c) {
+ HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
+ serverMap.remove(map);
+
+ if (map != null) {
+ for (ServerControl.TestServer t : map.values()) {
+ t.kill();
+ }
+ map.clear();
+ }
+ }
+
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ try {
+ String command = (String)e.getMessage();
+ LOG.info("Command: " + command);
+ String[] args = command.split("\\s+");
+
+ if (args[0].equals("START")) {
+ ServerControl.TestServer t = null;
+ if (args[1].equals("ZOOKEEPER")) {
+ t = control.startZookeeperServer(Integer.valueOf(args[2]));
+ addServerForChannel(ctx.getChannel(), t);
+ } else if (args[1].equals("BOOKKEEPER")) {
+ ServerControl.TestServer zk = lookupServer(ctx.getChannel(), args[3]);
+ t = control.startBookieServer(Integer.valueOf(args[2]), zk);
+ addServerForChannel(ctx.getChannel(), t);
+ } else if (args[1].equals("HEDWIG")) {
+ ServerControl.TestServer zk = lookupServer(ctx.getChannel(), args[4]);
+ t = control.startPubSubServer(Integer.valueOf(args[2]), args[3], zk);
+ addServerForChannel(ctx.getChannel(), t);
+ }
+
+ ctx.getChannel().write("OK " + t.getAddress() + "\n");
+ } else if (args[0].equals("KILL")) {
+ killServerForChannel(ctx.getChannel(), args[1]);
+
+ ctx.getChannel().write("OK Killed " + args[1] + "\n");
+ } else if (args[0].equals("TEST")) {
+ LOG.info("\n******\n\n" + args[1] + "\n\n******");
+ ctx.getChannel().write("OK Test Noted\n");
+ } else {
+ ctx.getChannel().write("ERR Bad Command\n");
+ }
+ } catch (Exception ex) {
+ LOG.error("Error handling message", ex);
+ ctx.getChannel().write("ERR " + ex.toString() + "\n");
+ }
+ }
+
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ clearServersForChannel(ctx.getChannel());
+ }
}
public static void main(String[] args) throws Exception {
- // Configure the server.
- int port = 5672;
- if (args.length == 1) {
- port = Integer.valueOf(args[0]);
- }
- ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool()));
- // Set up the pipeline factory.
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline p = Channels.pipeline();
- p.addLast("frameDecoder", new DelimiterBasedFrameDecoder(80, Delimiters.lineDelimiter()));
- p.addLast("stringDecoder", new StringDecoder("UTF-8"));
-
- // Encoder
- p.addLast("stringEncoder", new StringEncoder("UTF-8"));
- p.addLast("handler", new ServerControlDaemonHandler());
-
- return p;
- }
- });
-
- LOG.info("Listening on localhost:"+port);
- // Bind and start to accept incoming connections.
- bootstrap.bind(new InetSocketAddress(port));
+ // Configure the server.
+ int port = 5672;
+ if (args.length == 1) {
+ port = Integer.valueOf(args[0]);
+ }
+ ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool()));
+ // Set up the pipeline factory.
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline p = Channels.pipeline();
+ p.addLast("frameDecoder", new DelimiterBasedFrameDecoder(80, Delimiters.lineDelimiter()));
+ p.addLast("stringDecoder", new StringDecoder("UTF-8"));
+
+ // Encoder
+ p.addLast("stringEncoder", new StringEncoder("UTF-8"));
+ p.addLast("handler", new ServerControlDaemonHandler());
+
+ return p;
+ }
+ });
+
+ LOG.info("Listening on localhost:"+port);
+ // Bind and start to accept incoming connections.
+ bootstrap.bind(new InetSocketAddress(port));
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java Mon Sep 5 17:38:57 2011
@@ -78,7 +78,7 @@ public class TestPubSubClient extends Pu
// Test implementation of subscriber's message handler.
class TestMessageHandler implements MessageHandler {
public void consume(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback,
- Object context) {
+ Object context) {
new Thread(new Runnable() {
@Override
public void run() {
@@ -112,7 +112,7 @@ public class TestPubSubClient extends Pu
boolean publishSuccess = true;
try {
publisher.publish(ByteString.copyFromUtf8("mySyncTopic"), Message.newBuilder().setBody(
- ByteString.copyFromUtf8("Hello Sync World!")).build());
+ ByteString.copyFromUtf8("Hello Sync World!")).build());
} catch (Exception e) {
publishSuccess = false;
}
@@ -122,7 +122,7 @@ public class TestPubSubClient extends Pu
@Test
public void testAsyncPublish() throws Exception {
publisher.asyncPublish(ByteString.copyFromUtf8("myAsyncTopic"), Message.newBuilder().setBody(
- ByteString.copyFromUtf8("Hello Async World!")).build(), new TestCallback(), null);
+ ByteString.copyFromUtf8("Hello Async World!")).build(), new TestCallback(), null);
assertTrue(queue.take());
}
@@ -132,13 +132,13 @@ public class TestPubSubClient extends Pu
ByteString topic2 = ByteString.copyFromUtf8("myNewTopic");
publisher.asyncPublish(topic1, Message.newBuilder().setBody(ByteString.copyFromUtf8("Hello World!")).build(),
- new TestCallback(), null);
+ new TestCallback(), null);
assertTrue(queue.take());
publisher.asyncPublish(topic2, Message.newBuilder().setBody(ByteString.copyFromUtf8("Hello on new topic!"))
- .build(), new TestCallback(), null);
+ .build(), new TestCallback(), null);
assertTrue(queue.take());
publisher.asyncPublish(topic1, Message.newBuilder().setBody(
- ByteString.copyFromUtf8("Hello Again on old topic!")).build(), new TestCallback(), null);
+ ByteString.copyFromUtf8("Hello Again on old topic!")).build(), new TestCallback(), null);
assertTrue(queue.take());
}
@@ -156,7 +156,7 @@ public class TestPubSubClient extends Pu
@Test
public void testAsyncSubscribe() throws Exception {
subscriber.asyncSubscribe(ByteString.copyFromUtf8("myAsyncSubscribeTopic"), ByteString.copyFromUtf8("1"),
- CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(), null);
+ CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(), null);
assertTrue(queue.take());
}
@@ -173,23 +173,23 @@ public class TestPubSubClient extends Pu
// Now publish some messages for the topic to be consumed by the
// subscriber.
publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #1")).build(),
- new TestCallback(), null);
+ new TestCallback(), null);
assertTrue(queue.take());
assertTrue(consumeQueue.take());
publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #2")).build(),
- new TestCallback(), null);
+ new TestCallback(), null);
assertTrue(queue.take());
assertTrue(consumeQueue.take());
publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #3")).build(),
- new TestCallback(), null);
+ new TestCallback(), null);
assertTrue(queue.take());
assertTrue(consumeQueue.take());
publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #4")).build(),
- new TestCallback(), null);
+ new TestCallback(), null);
assertTrue(queue.take());
assertTrue(consumeQueue.take());
publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #5")).build(),
- new TestCallback(), null);
+ new TestCallback(), null);
assertTrue(queue.take());
assertTrue(consumeQueue.take());
}
@@ -227,4 +227,4 @@ public class TestPubSubClient extends Pu
assertTrue(true);
}
-}
\ No newline at end of file
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java Mon Sep 5 17:38:57 2011
@@ -33,7 +33,7 @@ import org.apache.hedwig.server.persiste
/**
* This is a base class for any tests that need a Hedwig Hub(s) setup with an
* associated BookKeeper and ZooKeeper instance.
- *
+ *
*/
public abstract class HedwigHubTestBase extends TestCase {
@@ -79,7 +79,7 @@ public abstract class HedwigHubTestBase
public String getZkHost() {
return bktb.getZkHostPort();
}
-
+
@Override
public boolean isSSLEnabled() {
return true;
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java Mon Sep 5 17:38:57 2011
@@ -39,7 +39,7 @@ import org.apache.hedwig.util.HedwigSock
* This is a base class for any tests that need a Hedwig Region(s) setup with a
* number of Hedwig hubs per region, an associated HedwigClient per region and
* the required BookKeeper and ZooKeeper instances.
- *
+ *
*/
public abstract class HedwigRegionTestBase extends TestCase {
@@ -173,7 +173,7 @@ public abstract class HedwigRegionTestBa
// Create the Hedwig PubSubServer Hubs for all of the regions
regionServersMap = new HashMap<String, List<PubSubServer>>(numRegions, 1.0f);
- regionClientsMap = new HashMap<String, HedwigClient>(numRegions, 1.0f);
+ regionClientsMap = new HashMap<String, HedwigClient>(numRegions, 1.0f);
for (int i = 0; i < numRegions; i++) {
List<PubSubServer> serversList = new LinkedList<PubSubServer>();
// For the current region, create the necessary amount of hub
@@ -181,8 +181,8 @@ public abstract class HedwigRegionTestBa
// starting from the initial ones defined.
for (int j = 0; j < numServersPerRegion; j++) {
serversList.add(new PubSubServer(getServerConfiguration(initialServerPort
- + (j + i * numServersPerRegion), initialSSLServerPort + (j + i * numServersPerRegion),
- REGION_PREFIX + i)));
+ + (j + i * numServersPerRegion), initialSSLServerPort + (j + i * numServersPerRegion),
+ REGION_PREFIX + i)));
}
// Store this list of servers created for the current region
regionServersMap.put(REGION_PREFIX + i, serversList);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java Mon Sep 5 17:38:57 2011
@@ -60,9 +60,9 @@ public class TestPubSubServerStartup {
}
private void instantiateAndDestroyPubSubServer() throws IOException, InterruptedException, ConfigurationException,
- MalformedURLException, Exception {
+ MalformedURLException, Exception {
String hedwigParams = "default_server_host=localhost:4080\n" + "zookeeper_connection_string=localhost:2181\n"
- + "zk_timeout=2000\n";
+ + "zk_timeout=2000\n";
File hedwigConfigFile = new File(System.getProperty("java.io.tmpdir") + "/hedwig.cfg");
writeStringToFile(hedwigParams, hedwigConfigFile);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java Mon Sep 5 17:38:57 2011
@@ -36,7 +36,7 @@ public class StubDeliveryManager impleme
public boolean isHubSubscriber;
public StartServingRequest(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
- DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber) {
+ DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber) {
this.topic = topic;
this.subscriberId = subscriberId;
this.seqIdToStartFrom = seqIdToStartFrom;
@@ -51,10 +51,10 @@ public class StubDeliveryManager impleme
@Override
public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
- DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber) {
+ DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber) {
lastRequest.add(new StartServingRequest(topic, subscriberId, seqIdToStartFrom, endPoint, filter,
- isHubSubscriber));
+ isHubSubscriber));
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java Mon Sep 5 17:38:57 2011
@@ -80,7 +80,7 @@ public class TestSubUnsubHandler extends
subRequestPrototype = SubscribeRequest.newBuilder().setSubscriberId(subscriberId).build();
pubSubRequestPrototype = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE).setType(
- OperationType.SUBSCRIBE).setTxnId(0).setTopic(topic).setSubscribeRequest(subRequestPrototype).build();
+ OperationType.SUBSCRIBE).setTxnId(0).setTopic(topic).setSubscribeRequest(subRequestPrototype).build();
ush = new UnsubscribeHandler(tm, conf, sm, dm);
}
@@ -88,9 +88,9 @@ public class TestSubUnsubHandler extends
@Test
public void testNoSubscribeRequest() {
sh.handleRequestAtOwner(PubSubRequest.newBuilder(pubSubRequestPrototype).clearSubscribeRequest().build(),
- channel);
+ channel);
assertEquals(StatusCode.MALFORMED_REQUEST, ((PubSubResponse) channel.getMessagesWritten().get(0))
- .getStatusCode());
+ .getStatusCode());
}
@Test
@@ -118,11 +118,11 @@ public class TestSubUnsubHandler extends
// make sure subscription was registered
StubCallback<MessageSeqId> callback1 = new StubCallback<MessageSeqId>();
sm.serveSubscribeRequest(topic, SubscribeRequest.newBuilder(subRequestPrototype).setCreateOrAttach(
- CreateOrAttach.CREATE).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback1,
- null);
+ CreateOrAttach.CREATE).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback1,
+ null);
assertEquals(PubSubException.ClientAlreadySubscribedException.class, ConcurrencyUtils.take(callback1.queue)
- .right().getClass());
+ .right().getClass());
// trying to subscribe again should throw an error
WriteRecordingChannel dupChannel = new WriteRecordingChannel();
@@ -140,10 +140,10 @@ public class TestSubUnsubHandler extends
channel = new WriteRecordingChannel();
ush.handleRequestAtOwner(pubSubRequestPrototype, channel);
assertEquals(StatusCode.MALFORMED_REQUEST, ((PubSubResponse) channel.getMessagesWritten().get(0))
- .getStatusCode());
+ .getStatusCode());
PubSubRequest unsubRequest = PubSubRequest.newBuilder(pubSubRequestPrototype).setUnsubscribeRequest(
- UnsubscribeRequest.newBuilder().setSubscriberId(subscriberId)).build();
+ UnsubscribeRequest.newBuilder().setSubscriberId(subscriberId)).build();
channel = new WriteRecordingChannel();
dm.lastRequest.clear();
@@ -156,10 +156,10 @@ public class TestSubUnsubHandler extends
// make sure the info is gone from the sm
StubCallback<MessageSeqId> callback2 = new StubCallback<MessageSeqId>();
sm.serveSubscribeRequest(topic, SubscribeRequest.newBuilder(subRequestPrototype).setCreateOrAttach(
- CreateOrAttach.ATTACH).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback2,
- null);
+ CreateOrAttach.ATTACH).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback2,
+ null);
assertEquals(PubSubException.ClientNotSubscribedException.class, ConcurrencyUtils.take(callback2.queue).right()
- .getClass());
+ .getClass());
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java Mon Sep 5 17:38:57 2011
@@ -140,7 +140,7 @@ public class TestHedwigHub extends Hedwi
}
public void consume(ByteString topic, ByteString subscriberId, final Message msg, Callback<Void> callback,
- Object context) {
+ Object context) {
if (!consumedMessages.contains(msg.getMsgId())) {
// New message to consume. Add it to the Set of consumed
// messages.
@@ -154,7 +154,7 @@ public class TestHedwigHub extends Hedwi
public void run() {
if (logger.isDebugEnabled())
logger.debug("Consuming message that is out of order for msgId: "
- + msg.getMsgId().getLocalComponent());
+ + msg.getMsgId().getLocalComponent());
ConcurrencyUtils.put(consumeQueue, false);
}
}).start();
@@ -241,13 +241,13 @@ public class TestHedwigHub extends Hedwi
}
protected void startDelivery(Subscriber subscriber, ByteString topic, ByteString subscriberId,
- MessageHandler handler) throws Exception {
+ MessageHandler handler) throws Exception {
subscriber.startDelivery(topic, subscriberId, handler);
if (mode == Mode.PROXY) {
WriteRecordingChannel channel = new WriteRecordingChannel();
PubSubRequest request = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE)
- .setTopic(topic).setTxnId(0).setType(OperationType.START_DELIVERY).setStartDeliveryRequest(
- StartDeliveryRequest.newBuilder().setSubscriberId(subscriberId)).build();
+ .setTopic(topic).setTxnId(0).setType(OperationType.START_DELIVERY).setStartDeliveryRequest(
+ StartDeliveryRequest.newBuilder().setSubscriberId(subscriberId)).build();
proxy.getStartDeliveryHandler().handleRequest(request, channel);
assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode());
}
@@ -280,7 +280,7 @@ public class TestHedwigHub extends Hedwi
logger.debug("Subscribing to topics and starting delivery.");
for (int i = 0; i < batchSize; i++) {
subscriber.asyncSubscribe(getTopic(i), localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH,
- new TestCallback(queue), null);
+ new TestCallback(queue), null);
assertTrue(queue.take());
}
@@ -327,7 +327,7 @@ public class TestHedwigHub extends Hedwi
ByteString myTopic = getTopic(0);
// Subscribe to a topic and start delivery on it
mySubscriber.asyncSubscribe(myTopic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH,
- new TestCallback(queue), null);
+ new TestCallback(queue), null);
assertTrue(queue.take());
startDelivery(mySubscriber, myTopic, localSubscriberId, new TestMessageHandler(consumeQueue));
// Publish some messages
@@ -342,7 +342,7 @@ public class TestHedwigHub extends Hedwi
boolean success = true;
try {
mySubscriber.consume(myTopic, localSubscriberId, MessageSeqId.newBuilder().setLocalComponent(i + 1)
- .build());
+ .build());
} catch (ClientNotSubscribedException e) {
success = false;
}
@@ -364,7 +364,7 @@ public class TestHedwigHub extends Hedwi
public void testAttachToSubscriptionSuccess() throws Exception {
ByteString topic = getTopic(0);
subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
- null);
+ null);
assertTrue(queue.take());
// Close the subscription asynchronously
subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null);
@@ -417,7 +417,7 @@ public class TestHedwigHub extends Hedwi
public void testUnsubscribe() throws Exception {
ByteString topic = getTopic(0);
subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
- null);
+ null);
assertTrue(queue.take());
startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
@@ -468,7 +468,7 @@ public class TestHedwigHub extends Hedwi
public void testCloseSubscription() throws Exception {
ByteString topic = getTopic(0);
subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
- null);
+ null);
assertTrue(queue.take());
startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
@@ -500,7 +500,7 @@ public class TestHedwigHub extends Hedwi
public void testStopDelivery() throws Exception {
ByteString topic = getTopic(0);
subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
- null);
+ null);
assertTrue(queue.take());
startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
@@ -541,7 +541,7 @@ public class TestHedwigHub extends Hedwi
public void testConsumedMessagesInOrder() throws Exception {
ByteString topic = getTopic(0);
subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
- null);
+ null);
assertTrue(queue.take());
startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
// Now publish some messages and verify that they are delivered in order
@@ -562,7 +562,7 @@ public class TestHedwigHub extends Hedwi
public void testCreateSubscriptionFailure() throws Exception {
ByteString topic = getTopic(0);
subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
- null);
+ null);
assertTrue(queue.take());
// Close the subscription asynchronously
subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null);
@@ -612,7 +612,7 @@ public class TestHedwigHub extends Hedwi
@Test
public void testAsyncSubscribeWithInvalidSubscriberId() throws Exception {
subscriber.asyncSubscribe(getTopic(0), hubSubscriberId, CreateOrAttach.CREATE_OR_ATTACH,
- new TestCallback(queue), null);
+ new TestCallback(queue), null);
assertFalse(queue.take());
}
@@ -659,7 +659,7 @@ public class TestHedwigHub extends Hedwi
HedwigClient hubClient = new HedwigHubClient(new ClientConfiguration());
HedwigSubscriber hubSubscriber = hubClient.getSubscriber();
hubSubscriber.asyncSubscribe(getTopic(0), localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(
- queue), null);
+ queue), null);
assertFalse(queue.take());
hubClient.stop();
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java Mon Sep 5 17:38:57 2011
@@ -59,8 +59,8 @@ public class TestHedwigRegion extends He
for (HedwigClient client : regionClientsMap.values()) {
for (int i = 0; i < batchSize; i++) {
client.getSubscriber().asyncSubscribe(ByteString.copyFromUtf8("Topic" + i),
- ByteString.copyFromUtf8("LocalSubscriber"), CreateOrAttach.CREATE_OR_ATTACH,
- new TestCallback(queue), null);
+ ByteString.copyFromUtf8("LocalSubscriber"), CreateOrAttach.CREATE_OR_ATTACH,
+ new TestCallback(queue), null);
assertTrue(queue.take());
}
}
@@ -69,7 +69,7 @@ public class TestHedwigRegion extends He
for (HedwigClient client : regionClientsMap.values()) {
for (int i = 0; i < batchSize; i++) {
client.getSubscriber().startDelivery(ByteString.copyFromUtf8("Topic" + i),
- ByteString.copyFromUtf8("LocalSubscriber"), new TestMessageHandler(consumeQueue));
+ ByteString.copyFromUtf8("LocalSubscriber"), new TestMessageHandler(consumeQueue));
}
}
@@ -79,7 +79,7 @@ public class TestHedwigRegion extends He
HedwigPublisher publisher = regionClientsMap.values().iterator().next().getPublisher();
for (int i = 0; i < batchSize; i++) {
publisher.asyncPublish(ByteString.copyFromUtf8("Topic" + i), Message.newBuilder().setBody(
- ByteString.copyFromUtf8("Message" + i)).build(), new TestCallback(queue), null);
+ ByteString.copyFromUtf8("Message" + i)).build(), new TestCallback(queue), null);
assertTrue(queue.take());
}
// Make sure each region consumes the same set of published messages.
@@ -87,7 +87,7 @@ public class TestHedwigRegion extends He
for (int j = 0; j < batchSize; j++) {
assertTrue(consumeQueue.take());
}
- }
+ }
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java Mon Sep 5 17:38:57 2011
@@ -77,7 +77,7 @@ public class TestPubSubServer extends Pu
}
PubSubServer startServer(final UncaughtExceptionHandler uncaughtExceptionHandler, final int port,
- final TopicManagerInstantiator instantiator) throws Exception {
+ final TopicManagerInstantiator instantiator) throws Exception {
PubSubServer server = new PubSubServer(new StandAloneServerConfiguration() {
@Override
public int getServerPort() {
@@ -105,7 +105,7 @@ public class TestPubSubServer extends Pu
}).getPublisher();
publisher.asyncPublish(ByteString.copyFromUtf8("blah"), Message.newBuilder().setBody(
- ByteString.copyFromUtf8("blah")).build(), new Callback<Void>() {
+ ByteString.copyFromUtf8("blah")).build(), new Callback<Void>() {
@Override
public void operationFailed(Object ctx, PubSubException exception) {
assertTrue(false);
@@ -132,8 +132,8 @@ public class TestPubSubServer extends Pu
public TopicManager instantiateTopicManager() throws IOException {
return new AbstractTopicManager(new ServerConfiguration(), Executors.newSingleThreadScheduledExecutor()) {
@Override
- protected void realGetOwner(ByteString topic, boolean shouldClaim,
- Callback<HedwigSocketAddress> cb, Object ctx) {
+ protected void realGetOwner(ByteString topic, boolean shouldClaim,
+ Callback<HedwigSocketAddress> cb, Object ctx) {
throw new RuntimeException("this exception should be uncaught");
}
@@ -164,8 +164,8 @@ public class TestPubSubServer extends Pu
return new AbstractTopicManager(new ServerConfiguration(), Executors.newSingleThreadScheduledExecutor()) {
@Override
- protected void realGetOwner(ByteString topic, boolean shouldClaim,
- Callback<HedwigSocketAddress> cb, Object ctx) {
+ protected void realGetOwner(ByteString topic, boolean shouldClaim,
+ Callback<HedwigSocketAddress> cb, Object ctx) {
ZooKeeper zookeeper;
try {
zookeeper = new ZooKeeper(hostPort, 60000, new Watcher() {
@@ -182,7 +182,7 @@ public class TestPubSubServer extends Pu
zookeeper.getData("/fake", false, new SafeAsyncZKCallback.DataCallback() {
@Override
public void safeProcessResult(int rc, String path, Object ctx, byte[] data,
- org.apache.zookeeper.data.Stat stat) {
+ org.apache.zookeeper.data.Stat stat) {
throw new RuntimeException("This should go to the uncaught exception handler");
}
@@ -213,8 +213,8 @@ public class TestPubSubServer extends Pu
@Override
public List<String> getRegions() {
List<String> regionsList = new LinkedList<String>();
- regionsList.add("regionHost1:4080:9876");
- regionsList.add("regionHost2:4080");
+ regionsList.add("regionHost1:4080:9876");
+ regionsList.add("regionHost2:4080");
regionsList.add("regionHost3:4080:9876");
return regionsList;
}
@@ -227,7 +227,7 @@ public class TestPubSubServer extends Pu
success = true;
}
assertTrue(success);
- }
+ }
@Test
public void testValidServerConfiguration() throws Exception {
@@ -241,8 +241,8 @@ public class TestPubSubServer extends Pu
@Override
public List<String> getRegions() {
List<String> regionsList = new LinkedList<String>();
- regionsList.add("regionHost1:4080:9876");
- regionsList.add("regionHost2:4080:2938");
+ regionsList.add("regionHost1:4080:9876");
+ regionsList.add("regionHost2:4080:2938");
regionsList.add("regionHost3:4080:9876");
return regionsList;
}
@@ -255,6 +255,6 @@ public class TestPubSubServer extends Pu
success = false;
}
assertTrue(success);
- }
+ }
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java Mon Sep 5 17:38:57 2011
@@ -38,7 +38,7 @@ import org.apache.log4j.Logger;
/**
* This is a base class for any tests that require a BookKeeper client/server
* setup.
- *
+ *
*/
public class BookKeeperTestBase extends ZooKeeperTestBase {
private static Logger logger = Logger.getLogger(BookKeeperTestBase.class);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java Mon Sep 5 17:38:57 2011
@@ -76,7 +76,7 @@ public class StubPersistenceManager impl
}
request.getCallback().messageScanned(request.getCtx(),
- messages.get(request.getTopic()).get((int) request.getStartSeqId()));
+ messages.get(request.getTopic()).get((int) request.getStartSeqId()));
}
@@ -90,7 +90,7 @@ public class StubPersistenceManager impl
long startSeqId = request.getStartSeqId();
for (int i = 0; i < request.getMessageLimit(); i++) {
List<Message> messageList = MapMethods.getAfterInsertingIfAbsent(messages, request.getTopic(),
- ArrayListMessageFactory.instance);
+ ArrayListMessageFactory.instance);
if (startSeqId + i > messageList.size()) {
request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.NO_MORE_MESSAGES);
return;
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java Mon Sep 5 17:38:57 2011
@@ -24,25 +24,25 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.util.ConcurrencyUtils;
import org.apache.hedwig.util.Either;
-public class StubScanCallback implements ScanCallback{
+public class StubScanCallback implements ScanCallback {
public static Message END_MESSAGE = Message.newBuilder().setBody(ByteString.EMPTY).build();
-
+
LinkedBlockingQueue<Either<Message, Exception>> queue = new LinkedBlockingQueue<Either<Message,Exception>>();
-
+
@Override
public void messageScanned(Object ctx, Message message) {
- ConcurrencyUtils.put(queue, Either.of(message, (Exception) null));
+ ConcurrencyUtils.put(queue, Either.of(message, (Exception) null));
}
-
+
@Override
public void scanFailed(Object ctx, Exception exception) {
ConcurrencyUtils.put(queue, Either.of((Message) null, exception));
}
-
+
@Override
public void scanFinished(Object ctx, ReasonForFinish reason) {
ConcurrencyUtils.put(queue, Either.of(END_MESSAGE, (Exception) null));
-
+
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java Mon Sep 5 17:38:57 2011
@@ -31,8 +31,8 @@ import org.apache.hedwig.server.topics.T
public class TestBookKeeperPersistenceManagerBlackBox extends TestPersistenceManagerBlackBox {
BookKeeperTestBase bktb;
- private final int numBookies = 3;
-
+ private final int numBookies = 3;
+
@Override
@Before
protected void setUp() throws Exception {
@@ -61,7 +61,7 @@ public class TestBookKeeperPersistenceMa
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
return new BookkeeperPersistenceManager(bktb.bk, bktb.getZooKeeperClient(), new TrivialOwnAllTopicManager(conf,
- scheduler), conf, scheduler);
+ scheduler), conf, scheduler);
}
@Override
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java Mon Sep 5 17:38:57 2011
@@ -78,7 +78,7 @@ public class TestBookkeeperPersistenceMa
// now abandon, and try another time, the prev ledger should be dirty
bkpm = new BookkeeperPersistenceManager(new BookKeeper(bktb.getZkHostPort()), bktb.getZooKeeperClient(), tm,
- conf, scheduler);
+ conf, scheduler);
bkpm.acquiredTopic(topic, stubCallback, null);
assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size());
@@ -92,7 +92,7 @@ public class TestBookkeeperPersistenceMa
int index = 0;
int numPrevLedgers = 0;
List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST,
- SIZE_OF_MESSAGES_TO_TEST);
+ SIZE_OF_MESSAGES_TO_TEST);
while (index < messages.size()) {
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java Mon Sep 5 17:38:57 2011
@@ -163,13 +163,13 @@ public abstract class TestPersistenceMan
PersistenceManagerWithRangeScan rangePersistenceManager = (PersistenceManagerWithRangeScan) persistenceManager;
rangePersistenceManager.scanMessages(new RangeScanRequest(topic, getLowestSeqId(),
- NUM_MESSAGES_TO_TEST + 1, Long.MAX_VALUE, listener, statusQueue));
+ NUM_MESSAGES_TO_TEST + 1, Long.MAX_VALUE, listener, statusQueue));
} else {
ScanCallback listener = new PointScanVerifierListener(pubMsgs, topic);
persistenceManager
- .scanSingleMessage(new ScanRequest(topic, getLowestSeqId(), listener, statusQueue));
+ .scanSingleMessage(new ScanRequest(topic, getLowestSeqId(), listener, statusQueue));
}
// now listen for it to finish
@@ -295,7 +295,7 @@ public abstract class TestPersistenceMan
assertEquals(null, failureException);
for (int i = 0; i < NUM_TOPICS_TO_TEST; i++) {
assertEquals(persistenceManager.getCurrentSeqIdForTopic(getTopicName(i)).getLocalComponent(),
- getExpectedSeqId(NUM_MESSAGES_TO_TEST));
+ getExpectedSeqId(NUM_MESSAGES_TO_TEST));
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java Mon Sep 5 17:38:57 2011
@@ -90,7 +90,7 @@ public class TestReadAheadCacheWhiteBox
}
@Test
- public void testPersistMessage() throws Exception{
+ public void testPersistMessage() throws Exception {
StubCallback<Long> callback = new StubCallback<Long>();
PersistRequest request = new PersistRequest(topic, messages.get(0), callback, null);
@@ -99,20 +99,20 @@ public class TestReadAheadCacheWhiteBox
assertNotNull(ConcurrencyUtils.take(callback.queue).right());
CacheKey key = new CacheKey(topic, cacheBasedPersistenceManager.getCurrentSeqIdForTopic(topic)
- .getLocalComponent());
+ .getLocalComponent());
assertFalse(cacheBasedPersistenceManager.cache.containsKey(key));
stubPersistenceManager.failure = false;
persistMessage(messages.get(0));
}
- private void persistMessage(Message msg) throws Exception{
+ private void persistMessage(Message msg) throws Exception {
StubCallback<Long> callback = new StubCallback<Long>();
PersistRequest request = new PersistRequest(topic, msg, callback, null);
cacheBasedPersistenceManager.persistMessage(request);
assertNotNull(ConcurrencyUtils.take(callback.queue).left());
CacheKey key = new CacheKey(topic, cacheBasedPersistenceManager.getCurrentSeqIdForTopic(topic)
- .getLocalComponent());
+ .getLocalComponent());
CacheValue cacheValue = cacheBasedPersistenceManager.cache.get(key);
assertNotNull(cacheValue);
assertFalse(cacheValue.isStub());
@@ -140,7 +140,7 @@ public class TestReadAheadCacheWhiteBox
}
@Test
- public void testDeliveredUntil() throws Exception{
+ public void testDeliveredUntil() throws Exception {
for (Message m : messages) {
persistMessage(m);
}
@@ -184,7 +184,7 @@ public class TestReadAheadCacheWhiteBox
}
@Test
- public void testReadAheadSizeLimit() throws Exception{
+ public void testReadAheadSizeLimit() throws Exception {
for (Message m : messages) {
persistMessage(m);
}
@@ -195,17 +195,17 @@ public class TestReadAheadCacheWhiteBox
assertTrue(callback.isSuccess());
assertEquals((int) Math.ceil(myConf.readAheadSize / (MSG_SIZE + 0.0)), cacheBasedPersistenceManager.cache
- .size());
+ .size());
}
@Test
- public void testDoReadAheadStartingFrom() throws Exception{
+ public void testDoReadAheadStartingFrom() throws Exception {
persistMessage(messages.get(0));
int readAheadCount = 5;
int start = 1;
RangeScanRequest readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, start,
- readAheadCount);
+ readAheadCount);
assertNull(readAheadRequest);
StubScanCallback callback = new StubScanCallback();
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java Mon Sep 5 17:38:57 2011
@@ -41,7 +41,7 @@ public class StubSubscriptionManager ext
@Override
public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
- Callback<MessageSeqId> callback, Object ctx) {
+ Callback<MessageSeqId> callback, Object ctx) {
if (fail) {
callback.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
return;
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java Mon Sep 5 17:38:57 2011
@@ -108,12 +108,12 @@ public class TestZkSubscriptionManager e
sm.serveSubscribeRequest(topic1, subRequest, msgId, msgIdCallback, null);
Assert.assertEquals(ConcurrencyUtils.take(msgIdCallbackQueue).right().getClass(),
- PubSubException.ServerNotResponsibleForTopicException.class);
+ PubSubException.ServerNotResponsibleForTopicException.class);
sm.unsubscribe(topic1, sub1, voidCallback, null);
Assert.assertEquals(ConcurrencyUtils.take(BooleanCallbackQueue).right().getClass(),
- PubSubException.ServerNotResponsibleForTopicException.class);
+ PubSubException.ServerNotResponsibleForTopicException.class);
//
// Acquire topic.
@@ -127,31 +127,31 @@ public class TestZkSubscriptionManager e
sm.unsubscribe(topic1, sub1, voidCallback, null);
Assert.assertEquals(ConcurrencyUtils.take(BooleanCallbackQueue).right().getClass(),
- PubSubException.ClientNotSubscribedException.class);
+ PubSubException.ClientNotSubscribedException.class);
//
// Try to attach to a subscription.
subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH).setSubscriberId(sub1)
- .build();
+ .build();
sm.serveSubscribeRequest(topic1, subRequest, msgId, msgIdCallback, null);
Assert.assertEquals(ConcurrencyUtils.take(msgIdCallbackQueue).right().getClass(),
- PubSubException.ClientNotSubscribedException.class);
+ PubSubException.ClientNotSubscribedException.class);
// now create
subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.CREATE).setSubscriberId(sub1)
- .build();
+ .build();
sm.serveSubscribeRequest(topic1, subRequest, msgId, msgIdCallback, null);
Assert.assertEquals(msgId.getLocalComponent(), ConcurrencyUtils.take(msgIdCallbackQueue).left().getLocalComponent());
Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
- .getLocalComponent());
+ .getLocalComponent());
// try to create again
sm.serveSubscribeRequest(topic1, subRequest, msgId, msgIdCallback, null);
Assert.assertEquals(ConcurrencyUtils.take(msgIdCallbackQueue).right().getClass(),
- PubSubException.ClientAlreadySubscribedException.class);
+ PubSubException.ClientAlreadySubscribedException.class);
Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
- .getLocalComponent());
+ .getLocalComponent());
sm.lostTopic(topic1);
sm.acquiredTopic(topic1, voidCallback, null);
@@ -159,27 +159,27 @@ public class TestZkSubscriptionManager e
// try to attach
subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH).setSubscriberId(sub1)
- .build();
+ .build();
MessageSeqId msgId1 = MessageSeqId.newBuilder().setLocalComponent(msgId.getLocalComponent() + 10).build();
sm.serveSubscribeRequest(topic1, subRequest, msgId1, msgIdCallback, null);
Assert.assertEquals(msgId.getLocalComponent(), msgIdCallbackQueue.take().left().getLocalComponent());
Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
- .getLocalComponent());
+ .getLocalComponent());
// now manipulate the consume ptrs
// dont give it enough to have it persist to ZK
MessageSeqId msgId2 = MessageSeqId.newBuilder().setLocalComponent(
- msgId.getLocalComponent() + cfg.getConsumeInterval() - 1).build();
+ msgId.getLocalComponent() + cfg.getConsumeInterval() - 1).build();
sm.setConsumeSeqIdForSubscriber(topic1, sub1, msgId2, voidCallback, null);
Assert.assertTrue(BooleanCallbackQueue.take().left());
Assert.assertEquals(msgId2.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
- .getLocalComponent());
+ .getLocalComponent());
Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getSubscriptionState().getMsgId()
- .getLocalComponent());
+ .getLocalComponent());
// give it more so that it will write to ZK
MessageSeqId msgId3 = MessageSeqId.newBuilder().setLocalComponent(
- msgId.getLocalComponent() + cfg.getConsumeInterval() + 1).build();
+ msgId.getLocalComponent() + cfg.getConsumeInterval() + 1).build();
sm.setConsumeSeqIdForSubscriber(topic1, sub1, msgId3, voidCallback, null);
Assert.assertTrue(BooleanCallbackQueue.take().left());
@@ -188,9 +188,9 @@ public class TestZkSubscriptionManager e
Assert.assertTrue(BooleanCallbackQueue.take().left());
Assert.assertEquals(msgId3.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
- .getLocalComponent());
+ .getLocalComponent());
Assert.assertEquals(msgId3.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getSubscriptionState().getMsgId()
- .getLocalComponent());
+ .getLocalComponent());
// finally unsubscribe
sm.unsubscribe(topic1, sub1, voidCallback, null);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java Mon Sep 5 17:38:57 2011
@@ -44,8 +44,8 @@ public class StubTopicManager extends Tr
}
@Override
- protected void realGetOwner(ByteString topic, boolean shouldClaim,
- Callback<HedwigSocketAddress> cb, Object ctx) {
+ protected void realGetOwner(ByteString topic, boolean shouldClaim,
+ Callback<HedwigSocketAddress> cb, Object ctx) {
if (shouldError) {
cb.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java Mon Sep 5 17:38:57 2011
@@ -126,7 +126,7 @@ public class TestZkTopicManager extends
@Test
public void testGetOwnerMulti() throws Exception {
ServerConfiguration cfg1 = new CustomServerConfiguration(cfg.getServerPort() + 1), cfg2 = new CustomServerConfiguration(
- cfg.getServerPort() + 2);
+ cfg.getServerPort() + 2);
// TODO change cfg1 cfg2 params
ZkTopicManager tm1 = new ZkTopicManager(zk, cfg1, scheduler), tm2 = new ZkTopicManager(zk, cfg2, scheduler);
@@ -265,7 +265,7 @@ public class TestZkTopicManager extends
Assert.assertEquals(topic, pair.first());
Assert.assertTrue(pair.second());
Assert.assertEquals(PubSubException.ServiceDownException.class, ((CompositeException) addrCbq.take().right())
- .getExceptions().iterator().next().getClass());
+ .getExceptions().iterator().next().getClass());
Assert.assertFalse(tm.topics.contains(topic));
Thread.sleep(100);
assertOwnershipNodeDoesntExist();
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java Mon Sep 5 17:38:57 2011
@@ -32,7 +32,7 @@ import org.apache.hedwig.util.Callback;
/**
* This is a base class for any tests that need a ZooKeeper client/server setup.
- *
+ *
*/
public abstract class ZooKeeperTestBase extends ClientBase {