You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/09/05 19:39:03 UTC

svn commit: r1165369 [9/9] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/clie...

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java Mon Sep  5 17:38:57 2011
@@ -50,20 +50,20 @@ public class ZkUtils {
                 // create mode is persistent since ephemeral nodes can't be
                 // parents
                 ZkUtils.createFullPathOptimistic(zk, PathUtils.parent(originalPath), new byte[0], acl,
-                        CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
+                CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
 
-                            @Override
-                            public void safeProcessResult(int rc, String path, Object ctx, String name) {
-                                if (rc == Code.OK.intValue() || rc == Code.NODEEXISTS.intValue()) {
-                                    // succeeded in creating the parent, now
-                                    // create the original path
-                                    ZkUtils.createFullPathOptimistic(zk, originalPath, data, acl, createMode, callback,
-                                            ctx);
-                                } else {
-                                    callback.processResult(rc, path, ctx, name);
-                                }
-                            }
-                        }, ctx);
+                    @Override
+                    public void safeProcessResult(int rc, String path, Object ctx, String name) {
+                        if (rc == Code.OK.intValue() || rc == Code.NODEEXISTS.intValue()) {
+                            // succeeded in creating the parent, now
+                            // create the original path
+                            ZkUtils.createFullPathOptimistic(zk, originalPath, data, acl, createMode, callback,
+                                                             ctx);
+                        } else {
+                            callback.processResult(rc, path, ctx, name);
+                        }
+                    }
+                }, ctx);
             }
         }, ctx);
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/HelperMethods.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/HelperMethods.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/HelperMethods.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/HelperMethods.java Mon Sep  5 17:38:57 2011
@@ -29,7 +29,8 @@ public class HelperMethods {
 
     public static List<Message> getRandomPublishedMessages(int numMessages, int size) {
         ByteString[] regions = { ByteString.copyFromUtf8("sp1"), ByteString.copyFromUtf8("re1"),
-                ByteString.copyFromUtf8("sg") };
+                                 ByteString.copyFromUtf8("sg")
+                               };
         return getRandomPublishedMessages(numMessages, size, regions);
     }
 
@@ -39,7 +40,7 @@ public class HelperMethods {
             byte[] body = new byte[size];
             rand.nextBytes(body);
             msgs.add(Message.newBuilder().setBody(ByteString.copyFrom(body)).setSrcRegion(
-                    regions[rand.nextInt(regions.length)]).build());
+                         regions[rand.nextInt(regions.length)]).build());
         }
         return msgs;
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/ServerControl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/ServerControl.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/ServerControl.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/ServerControl.java Mon Sep  5 17:38:57 2011
@@ -37,197 +37,201 @@ public class ServerControl {
     static Logger LOG = Logger.getLogger(ServerControl.class);
 
     public class TestException extends Exception {
-	public TestException(String str) {
-	    super(str);
-	}
+        public TestException(String str) {
+            super(str);
+        }
     };
 
     public interface TestServer {
-	public String getAddress();
-	public void kill();
+        public String getAddress();
+        public void kill();
     }
 
     private class BookKeeperServer extends BookieServer implements TestServer {
-	private String address;
+        private String address;
 
-	public BookKeeperServer(int port, TestServer zkserver, String journal, String ledger) throws IOException {
-	    super(port, zkserver.getAddress(), new File(journal), new File[] { new File(ledger) });
-	    
-	    address = "localhost:"+port;
-	    start();
-	} 
-
-	public String getAddress() {
-	    return address;
-	}
-	
-	public void kill() {
-	    try {
-		shutdown();
-	    } catch (Exception e) {
-	    }
-	}
+        public BookKeeperServer(int port, TestServer zkserver, String journal, String ledger) throws IOException {
+            super(port, zkserver.getAddress(), new File(journal), new File[] { new File(ledger) });
+
+            address = "localhost:"+port;
+            start();
+        }
+
+        public String getAddress() {
+            return address;
+        }
+
+        public void kill() {
+            try {
+                shutdown();
+            } catch (Exception e) {
+            }
+        }
     }
 
     private class ZookeeperServer extends ZooKeeperServerMain implements TestServer {
-	public String address;
-	public Thread serverThread;
-	String path;
-	public ZookeeperServer(int port, String path) throws TestException {
-	    super(); 
-
-	    this.path = path;
-	    final String[] args = { Integer.toString(port), path};
-	    address = "localhost:" + port;
-	    serverThread = new Thread() {
-		    public void run() {
-			try {
-			    initializeAndRun(args);
-			} catch (Exception e) {
-			}
-		    };
-		};
-	    serverThread.start();
-	}
-
-	public String getAddress() {
-	    return address;
-	}
-
-	public void kill() {
-	    shutdown();
-	    serverThread.interrupt();
-	}
+        public String address;
+        public Thread serverThread;
+        String path;
+        public ZookeeperServer(int port, String path) throws TestException {
+            super();
+
+            this.path = path;
+            final String[] args = { Integer.toString(port), path};
+            address = "localhost:" + port;
+            serverThread = new Thread() {
+                public void run() {
+                    try {
+                        initializeAndRun(args);
+                    } catch (Exception e) {
+                    }
+                };
+            };
+            serverThread.start();
+        }
+
+        public String getAddress() {
+            return address;
+        }
+
+        public void kill() {
+            shutdown();
+            serverThread.interrupt();
+        }
     }
 
     private class HedwigServer implements TestServer {
-	private PubSubServer server;
-	private String address;
+        private PubSubServer server;
+        private String address;
 
-	public HedwigServer(int port, String region, TestServer zk) throws TestException {
-	    class MyServerConfiguration extends ServerConfiguration {
-		MyServerConfiguration(int port, TestServer zk, String region) {
-		    conf.setProperty(ServerConfiguration.SERVER_PORT, port);
-		    conf.setProperty(ServerConfiguration.ZK_HOST, zk.getAddress());
-		    conf.setProperty(ServerConfiguration.REGION, region);
-		}
-	    };
-	    
-	    address = "localhost:" + port;
-	    
-	    try {
-		server = new PubSubServer(new MyServerConfiguration(port, zk, region));
-	    } catch (Exception e) {
-		throw new TestException("Couldn't create pub sub server : " + e);
-	    }
-	}
-
-	public String getAddress() {
-	    return address;
-	}
-
-	public void kill() {
-	    server.shutdown();
-	}
+        public HedwigServer(int port, String region, TestServer zk) throws TestException {
+            class MyServerConfiguration extends ServerConfiguration {
+                MyServerConfiguration(int port, TestServer zk, String region) {
+                    conf.setProperty(ServerConfiguration.SERVER_PORT, port);
+                    conf.setProperty(ServerConfiguration.ZK_HOST, zk.getAddress());
+                    conf.setProperty(ServerConfiguration.REGION, region);
+                }
+            };
+
+            address = "localhost:" + port;
+
+            try {
+                server = new PubSubServer(new MyServerConfiguration(port, zk, region));
+            } catch (Exception e) {
+                throw new TestException("Couldn't create pub sub server : " + e);
+            }
+        }
+
+        public String getAddress() {
+            return address;
+        }
+
+        public void kill() {
+            server.shutdown();
+        }
     }
 
     private String createTempDirectory(String suffix) throws IOException {
-	String dir = System.getProperty("java.io.tmpdir") + File.separator + System.currentTimeMillis() + suffix;
-	final File dirf = new File(dir);
-	boolean good = dirf.mkdir();
-	if (!good) {
-	    throw new IOException("Unable to create directory " + dir);
-	}
-	
-	Runtime.getRuntime().addShutdownHook(new Thread() {
-		public void delete(File f) {
-		    File[] subfiles = f.listFiles();
-		    if (subfiles != null) {
-			for (File subf : subfiles) {
-			    delete(subf);
-			}
-		    }
-		    f.delete();
-		}
-
-		public void run() {
-		    delete(dirf);
-		}
-	    });
-	return dir;
+        String dir = System.getProperty("java.io.tmpdir") + File.separator + System.currentTimeMillis() + suffix;
+        final File dirf = new File(dir);
+        boolean good = dirf.mkdir();
+        if (!good) {
+            throw new IOException("Unable to create directory " + dir);
+        }
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void delete(File f) {
+                File[] subfiles = f.listFiles();
+                if (subfiles != null) {
+                    for (File subf : subfiles) {
+                        delete(subf);
+                    }
+                }
+                f.delete();
+            }
+
+            public void run() {
+                delete(dirf);
+            }
+        });
+        return dir;
     }
 
     public TestServer startZookeeperServer(int port) throws IOException, TestException {
-	String dir = createTempDirectory("-zookeeper-" + port);
-	ZookeeperServer server =  new ZookeeperServer(port, dir);
-	
-	return server;
+        String dir = createTempDirectory("-zookeeper-" + port);
+        ZookeeperServer server =  new ZookeeperServer(port, dir);
+
+        return server;
     }
-    
+
     public TestServer startBookieServer(int port, TestServer zookeeperServer) throws IOException, TestException {
-	int tries = 4;
-	while (true) {
-	    try {
-		tries--;
-		ZooKeeper zk = new ZooKeeper(zookeeperServer.getAddress(), 1000, new Watcher() { public void process(WatchedEvent event) { /* do nothing */ } });
-		if (zk.exists("/ledgers/available", false) == null) {
-		    byte[] data = new byte[1];
-		    data[0] = 0;
-		    zk.create("/ledgers", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-		    zk.create("/ledgers/available", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-		}
-		zk.close();
-		break;
-	    } catch (KeeperException.ConnectionLossException ce) {
-		if (tries > 0) {
-		    try { 
-			Thread.sleep(3);
-		    } catch (Exception e) {
-			throw new TestException("Can't even sleep. Fix your machine: " + e);
-		    }
-		    continue;
-		} else {
-		    throw new TestException("Error connecting to zookeeper: " + ce);
-		}
-	    } catch (Exception e) {
-		throw new TestException("Error initialising bookkeeper ledgers: " +  e);
-	    } 
-	}
-	String journal = createTempDirectory("-bookie-" + port + "-journal");
-	String ledger = createTempDirectory("-bookie-" + port + "-ledger");
-	BookKeeperServer bookie = new BookKeeperServer(port, zookeeperServer, journal, ledger);
-	return bookie;
+        int tries = 4;
+        while (true) {
+            try {
+                tries--;
+                ZooKeeper zk = new ZooKeeper(zookeeperServer.getAddress(), 1000, new Watcher() {
+                    public void process(WatchedEvent event) {
+                        /* do nothing */
+                    }
+                });
+                if (zk.exists("/ledgers/available", false) == null) {
+                    byte[] data = new byte[1];
+                    data[0] = 0;
+                    zk.create("/ledgers", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                    zk.create("/ledgers/available", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                }
+                zk.close();
+                break;
+            } catch (KeeperException.ConnectionLossException ce) {
+                if (tries > 0) {
+                    try {
+                        Thread.sleep(3);
+                    } catch (Exception e) {
+                        throw new TestException("Can't even sleep. Fix your machine: " + e);
+                    }
+                    continue;
+                } else {
+                    throw new TestException("Error connecting to zookeeper: " + ce);
+                }
+            } catch (Exception e) {
+                throw new TestException("Error initialising bookkeeper ledgers: " +  e);
+            }
+        }
+        String journal = createTempDirectory("-bookie-" + port + "-journal");
+        String ledger = createTempDirectory("-bookie-" + port + "-ledger");
+        BookKeeperServer bookie = new BookKeeperServer(port, zookeeperServer, journal, ledger);
+        return bookie;
     }
-    
+
     public TestServer startPubSubServer(int port, String region, TestServer zookeeperServer) throws IOException, TestException {
-	return new HedwigServer(port, region, zookeeperServer);
-    }    
+        return new HedwigServer(port, region, zookeeperServer);
+    }
 
     public ServerControl() {
     }
 
     public static void main(String[] args) throws Exception {
-	ServerControl control = new ServerControl();
+        ServerControl control = new ServerControl();
 
-	TestServer zk = control.startZookeeperServer(12345);
-	TestServer bk1 = control.startBookieServer(12346, zk);
-	TestServer bk2 = control.startBookieServer(12347, zk);
-	TestServer bk3 = control.startBookieServer(12348, zk);
-
-	TestServer hw1 = control.startPubSubServer(12349, "foobar", zk);
-	TestServer hw2 = control.startPubSubServer(12350, "foobar", zk);
-	TestServer hw3 = control.startPubSubServer(12351, "foobar", zk);
-	TestServer hw4 = control.startPubSubServer(12352, "barfoo", zk);
-	LOG.info("Started " + zk.getAddress());
-	LOG.info("Sleeping for 10 seconds");
-	Thread.sleep(10000);
-	bk3.kill();
-	bk2.kill();
-	bk1.kill();
-	zk.kill();
-	hw1.kill();
-	hw2.kill();
-	hw3.kill();
-	hw4.kill();
+        TestServer zk = control.startZookeeperServer(12345);
+        TestServer bk1 = control.startBookieServer(12346, zk);
+        TestServer bk2 = control.startBookieServer(12347, zk);
+        TestServer bk3 = control.startBookieServer(12348, zk);
+
+        TestServer hw1 = control.startPubSubServer(12349, "foobar", zk);
+        TestServer hw2 = control.startPubSubServer(12350, "foobar", zk);
+        TestServer hw3 = control.startPubSubServer(12351, "foobar", zk);
+        TestServer hw4 = control.startPubSubServer(12352, "barfoo", zk);
+        LOG.info("Started " + zk.getAddress());
+        LOG.info("Sleeping for 10 seconds");
+        Thread.sleep(10000);
+        bk3.kill();
+        bk2.kill();
+        bk1.kill();
+        zk.kill();
+        hw1.kill();
+        hw2.kill();
+        hw3.kill();
+        hw4.kill();
     }
-}
\ No newline at end of file
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/ServerControlDaemon.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/ServerControlDaemon.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/ServerControlDaemon.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/ServerControlDaemon.java Mon Sep  5 17:38:57 2011
@@ -28,7 +28,7 @@ import org.jboss.netty.channel.socket.ni
 
 import org.apache.log4j.Logger;
 
-import org.jboss.netty.channel.Channel;  
+import org.jboss.netty.channel.Channel;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.ChannelEvent;
 import org.jboss.netty.channel.ChannelHandlerContext;
@@ -49,123 +49,123 @@ public class ServerControlDaemon {
         Logger.getLogger(ServerControlDaemon.class);
 
     @ChannelPipelineCoverage("all")
-    public static class ServerControlDaemonHandler extends SimpleChannelUpstreamHandler{
-	private ServerControl control;
-	private HashMap<Channel, HashMap<String, ServerControl.TestServer>> serverMap;
-	
-	public ServerControlDaemonHandler() {
-	    serverMap = new HashMap<Channel, HashMap<String, ServerControl.TestServer>>();
-	    control = new ServerControl();
-	}
-
-	private void addServerForChannel(Channel c, ServerControl.TestServer t) {
-	    LOG.info("Created server " + t.getAddress());
-	    HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
-	    if (map == null) {
-		map = new HashMap<String, ServerControl.TestServer>();
-		serverMap.put(c, map);
-	    }
-	    map.put(t.getAddress(), t);	    
-	}
-	
-	private void killServerForChannel(Channel c, String name) {
-	    LOG.info("Killing server " + name);
-	    HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
-	    ServerControl.TestServer t = map.get(name);
-	    map.remove(name);
-	    try {
-		t.kill();
-	    } catch (Exception e) {
-		LOG.error("Error killing server", e);
-		// do nothing, should be killed, we won't use it again anyhow
-	    }
-	}
-
-	private ServerControl.TestServer lookupServer(Channel c, String name) {
-	    HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
-	    return map.get(name);
-	}
-	
-	private void clearServersForChannel(Channel c) {
-	    HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
-	    serverMap.remove(map);
-	    
-	    if (map != null) {
-		for (ServerControl.TestServer t : map.values()) {
-		    t.kill();
-		}
-		map.clear();
-	    }
-	}
-
-	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-	    try {
-		String command = (String)e.getMessage();
-		LOG.info("Command: " + command);
-		String[] args = command.split("\\s+");
-
-		if (args[0].equals("START")) {
-		    ServerControl.TestServer t = null;
-		    if (args[1].equals("ZOOKEEPER")) {
-			t = control.startZookeeperServer(Integer.valueOf(args[2]));
-			addServerForChannel(ctx.getChannel(), t);
-		    } else if (args[1].equals("BOOKKEEPER")) {
-			ServerControl.TestServer zk = lookupServer(ctx.getChannel(), args[3]);
-			t = control.startBookieServer(Integer.valueOf(args[2]), zk);
-			addServerForChannel(ctx.getChannel(), t);
-		    } else if (args[1].equals("HEDWIG")) {
-			ServerControl.TestServer zk = lookupServer(ctx.getChannel(), args[4]);
-			t = control.startPubSubServer(Integer.valueOf(args[2]), args[3], zk);
-			addServerForChannel(ctx.getChannel(), t);
-		    }
-
-		    ctx.getChannel().write("OK " + t.getAddress() + "\n");
-		} else if (args[0].equals("KILL")) {
-		    killServerForChannel(ctx.getChannel(), args[1]);
-		    
-		    ctx.getChannel().write("OK Killed " + args[1] + "\n");
-		} else if (args[0].equals("TEST")) {
-		    LOG.info("\n******\n\n" + args[1] + "\n\n******");
-		    ctx.getChannel().write("OK Test Noted\n");
-		} else {
-		    ctx.getChannel().write("ERR Bad Command\n");
-		}
-	    } catch (Exception ex) {
-		LOG.error("Error handling message", ex);
-		ctx.getChannel().write("ERR " + ex.toString() + "\n");
-	    }
-	}
-	
-	public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-	    clearServersForChannel(ctx.getChannel());
-	}
+    public static class ServerControlDaemonHandler extends SimpleChannelUpstreamHandler {
+        private ServerControl control;
+        private HashMap<Channel, HashMap<String, ServerControl.TestServer>> serverMap;
+
+        public ServerControlDaemonHandler() {
+            serverMap = new HashMap<Channel, HashMap<String, ServerControl.TestServer>>();
+            control = new ServerControl();
+        }
+
+        private void addServerForChannel(Channel c, ServerControl.TestServer t) {
+            LOG.info("Created server " + t.getAddress());
+            HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
+            if (map == null) {
+                map = new HashMap<String, ServerControl.TestServer>();
+                serverMap.put(c, map);
+            }
+            map.put(t.getAddress(), t);
+        }
+
+        private void killServerForChannel(Channel c, String name) {
+            LOG.info("Killing server " + name);
+            HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
+            ServerControl.TestServer t = map.get(name);
+            map.remove(name);
+            try {
+                t.kill();
+            } catch (Exception e) {
+                LOG.error("Error killing server", e);
+                // do nothing, should be killed, we won't use it again anyhow
+            }
+        }
+
+        private ServerControl.TestServer lookupServer(Channel c, String name) {
+            HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
+            return map.get(name);
+        }
+
+        private void clearServersForChannel(Channel c) {
+            HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
+            serverMap.remove(map);
+
+            if (map != null) {
+                for (ServerControl.TestServer t : map.values()) {
+                    t.kill();
+                }
+                map.clear();
+            }
+        }
+
+        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+            try {
+                String command = (String)e.getMessage();
+                LOG.info("Command: " + command);
+                String[] args = command.split("\\s+");
+
+                if (args[0].equals("START")) {
+                    ServerControl.TestServer t = null;
+                    if (args[1].equals("ZOOKEEPER")) {
+                        t = control.startZookeeperServer(Integer.valueOf(args[2]));
+                        addServerForChannel(ctx.getChannel(), t);
+                    } else if (args[1].equals("BOOKKEEPER")) {
+                        ServerControl.TestServer zk = lookupServer(ctx.getChannel(), args[3]);
+                        t = control.startBookieServer(Integer.valueOf(args[2]), zk);
+                        addServerForChannel(ctx.getChannel(), t);
+                    } else if (args[1].equals("HEDWIG")) {
+                        ServerControl.TestServer zk = lookupServer(ctx.getChannel(), args[4]);
+                        t = control.startPubSubServer(Integer.valueOf(args[2]), args[3], zk);
+                        addServerForChannel(ctx.getChannel(), t);
+                    }
+
+                    ctx.getChannel().write("OK " + t.getAddress() + "\n");
+                } else if (args[0].equals("KILL")) {
+                    killServerForChannel(ctx.getChannel(), args[1]);
+
+                    ctx.getChannel().write("OK Killed " + args[1] + "\n");
+                } else if (args[0].equals("TEST")) {
+                    LOG.info("\n******\n\n" + args[1] + "\n\n******");
+                    ctx.getChannel().write("OK Test Noted\n");
+                } else {
+                    ctx.getChannel().write("ERR Bad Command\n");
+                }
+            } catch (Exception ex) {
+                LOG.error("Error handling message", ex);
+                ctx.getChannel().write("ERR " + ex.toString() + "\n");
+            }
+        }
+
+        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+            clearServersForChannel(ctx.getChannel());
+        }
     }
 
     public static void main(String[] args) throws Exception {
-	// Configure the server.
-	int port = 5672;
-	if (args.length == 1) {
-	    port = Integer.valueOf(args[0]); 
-	}
-	ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
-											  Executors.newCachedThreadPool()));
-	// Set up the pipeline factory.
-	bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-		public ChannelPipeline getPipeline() throws Exception {
-		    ChannelPipeline p = Channels.pipeline();
-		    p.addLast("frameDecoder", new DelimiterBasedFrameDecoder(80, Delimiters.lineDelimiter()));
-		    p.addLast("stringDecoder", new StringDecoder("UTF-8"));
-		    
-		    // Encoder
-		    p.addLast("stringEncoder", new StringEncoder("UTF-8"));
-		    p.addLast("handler", new ServerControlDaemonHandler());
-		    
-		    return p;
-		}
-	    });
-	
-	LOG.info("Listening on localhost:"+port);
-	// Bind and start to accept incoming connections.
-	bootstrap.bind(new InetSocketAddress(port));
+        // Configure the server.
+        int port = 5672;
+        if (args.length == 1) {
+            port = Integer.valueOf(args[0]);
+        }
+        ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+                Executors.newCachedThreadPool()));
+        // Set up the pipeline factory.
+        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+            public ChannelPipeline getPipeline() throws Exception {
+                ChannelPipeline p = Channels.pipeline();
+                p.addLast("frameDecoder", new DelimiterBasedFrameDecoder(80, Delimiters.lineDelimiter()));
+                p.addLast("stringDecoder", new StringDecoder("UTF-8"));
+
+                // Encoder
+                p.addLast("stringEncoder", new StringEncoder("UTF-8"));
+                p.addLast("handler", new ServerControlDaemonHandler());
+
+                return p;
+            }
+        });
+
+        LOG.info("Listening on localhost:"+port);
+        // Bind and start to accept incoming connections.
+        bootstrap.bind(new InetSocketAddress(port));
     }
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java Mon Sep  5 17:38:57 2011
@@ -78,7 +78,7 @@ public class TestPubSubClient extends Pu
     // Test implementation of subscriber's message handler.
     class TestMessageHandler implements MessageHandler {
         public void consume(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback,
-                Object context) {
+                            Object context) {
             new Thread(new Runnable() {
                 @Override
                 public void run() {
@@ -112,7 +112,7 @@ public class TestPubSubClient extends Pu
         boolean publishSuccess = true;
         try {
             publisher.publish(ByteString.copyFromUtf8("mySyncTopic"), Message.newBuilder().setBody(
-                    ByteString.copyFromUtf8("Hello Sync World!")).build());
+                                  ByteString.copyFromUtf8("Hello Sync World!")).build());
         } catch (Exception e) {
             publishSuccess = false;
         }
@@ -122,7 +122,7 @@ public class TestPubSubClient extends Pu
     @Test
     public void testAsyncPublish() throws Exception {
         publisher.asyncPublish(ByteString.copyFromUtf8("myAsyncTopic"), Message.newBuilder().setBody(
-                ByteString.copyFromUtf8("Hello Async World!")).build(), new TestCallback(), null);
+                                   ByteString.copyFromUtf8("Hello Async World!")).build(), new TestCallback(), null);
         assertTrue(queue.take());
     }
 
@@ -132,13 +132,13 @@ public class TestPubSubClient extends Pu
         ByteString topic2 = ByteString.copyFromUtf8("myNewTopic");
 
         publisher.asyncPublish(topic1, Message.newBuilder().setBody(ByteString.copyFromUtf8("Hello World!")).build(),
-                new TestCallback(), null);
+                               new TestCallback(), null);
         assertTrue(queue.take());
         publisher.asyncPublish(topic2, Message.newBuilder().setBody(ByteString.copyFromUtf8("Hello on new topic!"))
-                .build(), new TestCallback(), null);
+                               .build(), new TestCallback(), null);
         assertTrue(queue.take());
         publisher.asyncPublish(topic1, Message.newBuilder().setBody(
-                ByteString.copyFromUtf8("Hello Again on old topic!")).build(), new TestCallback(), null);
+                                   ByteString.copyFromUtf8("Hello Again on old topic!")).build(), new TestCallback(), null);
         assertTrue(queue.take());
     }
 
@@ -156,7 +156,7 @@ public class TestPubSubClient extends Pu
     @Test
     public void testAsyncSubscribe() throws Exception {
         subscriber.asyncSubscribe(ByteString.copyFromUtf8("myAsyncSubscribeTopic"), ByteString.copyFromUtf8("1"),
-                CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(), null);
+                                  CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(), null);
         assertTrue(queue.take());
     }
 
@@ -173,23 +173,23 @@ public class TestPubSubClient extends Pu
         // Now publish some messages for the topic to be consumed by the
         // subscriber.
         publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #1")).build(),
-                new TestCallback(), null);
+                               new TestCallback(), null);
         assertTrue(queue.take());
         assertTrue(consumeQueue.take());
         publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #2")).build(),
-                new TestCallback(), null);
+                               new TestCallback(), null);
         assertTrue(queue.take());
         assertTrue(consumeQueue.take());
         publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #3")).build(),
-                new TestCallback(), null);
+                               new TestCallback(), null);
         assertTrue(queue.take());
         assertTrue(consumeQueue.take());
         publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #4")).build(),
-                new TestCallback(), null);
+                               new TestCallback(), null);
         assertTrue(queue.take());
         assertTrue(consumeQueue.take());
         publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #5")).build(),
-                new TestCallback(), null);
+                               new TestCallback(), null);
         assertTrue(queue.take());
         assertTrue(consumeQueue.take());
     }
@@ -227,4 +227,4 @@ public class TestPubSubClient extends Pu
         assertTrue(true);
     }
 
-}
\ No newline at end of file
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java Mon Sep  5 17:38:57 2011
@@ -33,7 +33,7 @@ import org.apache.hedwig.server.persiste
 /**
  * This is a base class for any tests that need a Hedwig Hub(s) setup with an
  * associated BookKeeper and ZooKeeper instance.
- * 
+ *
  */
 public abstract class HedwigHubTestBase extends TestCase {
 
@@ -79,7 +79,7 @@ public abstract class HedwigHubTestBase 
         public String getZkHost() {
             return bktb.getZkHostPort();
         }
-        
+
         @Override
         public boolean isSSLEnabled() {
             return true;

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java Mon Sep  5 17:38:57 2011
@@ -39,7 +39,7 @@ import org.apache.hedwig.util.HedwigSock
  * This is a base class for any tests that need a Hedwig Region(s) setup with a
  * number of Hedwig hubs per region, an associated HedwigClient per region and
  * the required BookKeeper and ZooKeeper instances.
- * 
+ *
  */
 public abstract class HedwigRegionTestBase extends TestCase {
 
@@ -173,7 +173,7 @@ public abstract class HedwigRegionTestBa
 
         // Create the Hedwig PubSubServer Hubs for all of the regions
         regionServersMap = new HashMap<String, List<PubSubServer>>(numRegions, 1.0f);
-        regionClientsMap = new HashMap<String, HedwigClient>(numRegions, 1.0f);        
+        regionClientsMap = new HashMap<String, HedwigClient>(numRegions, 1.0f);
         for (int i = 0; i < numRegions; i++) {
             List<PubSubServer> serversList = new LinkedList<PubSubServer>();
             // For the current region, create the necessary amount of hub
@@ -181,8 +181,8 @@ public abstract class HedwigRegionTestBa
             // starting from the initial ones defined.
             for (int j = 0; j < numServersPerRegion; j++) {
                 serversList.add(new PubSubServer(getServerConfiguration(initialServerPort
-                        + (j + i * numServersPerRegion), initialSSLServerPort + (j + i * numServersPerRegion),
-                        REGION_PREFIX + i)));
+                                                 + (j + i * numServersPerRegion), initialSSLServerPort + (j + i * numServersPerRegion),
+                                                 REGION_PREFIX + i)));
             }
             // Store this list of servers created for the current region
             regionServersMap.put(REGION_PREFIX + i, serversList);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java Mon Sep  5 17:38:57 2011
@@ -60,9 +60,9 @@ public class TestPubSubServerStartup {
     }
 
     private void instantiateAndDestroyPubSubServer() throws IOException, InterruptedException, ConfigurationException,
-            MalformedURLException, Exception {
+        MalformedURLException, Exception {
         String hedwigParams = "default_server_host=localhost:4080\n" + "zookeeper_connection_string=localhost:2181\n"
-                + "zk_timeout=2000\n";
+                              + "zk_timeout=2000\n";
 
         File hedwigConfigFile = new File(System.getProperty("java.io.tmpdir") + "/hedwig.cfg");
         writeStringToFile(hedwigParams, hedwigConfigFile);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java Mon Sep  5 17:38:57 2011
@@ -36,7 +36,7 @@ public class StubDeliveryManager impleme
         public boolean isHubSubscriber;
 
         public StartServingRequest(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
-                DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber) {
+                                   DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber) {
             this.topic = topic;
             this.subscriberId = subscriberId;
             this.seqIdToStartFrom = seqIdToStartFrom;
@@ -51,10 +51,10 @@ public class StubDeliveryManager impleme
 
     @Override
     public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
-            DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber) {
+                                         DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber) {
 
         lastRequest.add(new StartServingRequest(topic, subscriberId, seqIdToStartFrom, endPoint, filter,
-                isHubSubscriber));
+                                                isHubSubscriber));
 
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java Mon Sep  5 17:38:57 2011
@@ -80,7 +80,7 @@ public class TestSubUnsubHandler extends
 
         subRequestPrototype = SubscribeRequest.newBuilder().setSubscriberId(subscriberId).build();
         pubSubRequestPrototype = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE).setType(
-                OperationType.SUBSCRIBE).setTxnId(0).setTopic(topic).setSubscribeRequest(subRequestPrototype).build();
+                                     OperationType.SUBSCRIBE).setTxnId(0).setTopic(topic).setSubscribeRequest(subRequestPrototype).build();
 
         ush = new UnsubscribeHandler(tm, conf, sm, dm);
     }
@@ -88,9 +88,9 @@ public class TestSubUnsubHandler extends
     @Test
     public void testNoSubscribeRequest() {
         sh.handleRequestAtOwner(PubSubRequest.newBuilder(pubSubRequestPrototype).clearSubscribeRequest().build(),
-                channel);
+                                channel);
         assertEquals(StatusCode.MALFORMED_REQUEST, ((PubSubResponse) channel.getMessagesWritten().get(0))
-                .getStatusCode());
+                     .getStatusCode());
     }
 
     @Test
@@ -118,11 +118,11 @@ public class TestSubUnsubHandler extends
         // make sure subscription was registered
         StubCallback<MessageSeqId> callback1 = new StubCallback<MessageSeqId>();
         sm.serveSubscribeRequest(topic, SubscribeRequest.newBuilder(subRequestPrototype).setCreateOrAttach(
-                CreateOrAttach.CREATE).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback1,
-                null);
+                                     CreateOrAttach.CREATE).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback1,
+                                 null);
 
         assertEquals(PubSubException.ClientAlreadySubscribedException.class, ConcurrencyUtils.take(callback1.queue)
-                .right().getClass());
+                     .right().getClass());
 
         // trying to subscribe again should throw an error
         WriteRecordingChannel dupChannel = new WriteRecordingChannel();
@@ -140,10 +140,10 @@ public class TestSubUnsubHandler extends
         channel = new WriteRecordingChannel();
         ush.handleRequestAtOwner(pubSubRequestPrototype, channel);
         assertEquals(StatusCode.MALFORMED_REQUEST, ((PubSubResponse) channel.getMessagesWritten().get(0))
-                .getStatusCode());
+                     .getStatusCode());
 
         PubSubRequest unsubRequest = PubSubRequest.newBuilder(pubSubRequestPrototype).setUnsubscribeRequest(
-                UnsubscribeRequest.newBuilder().setSubscriberId(subscriberId)).build();
+                                         UnsubscribeRequest.newBuilder().setSubscriberId(subscriberId)).build();
         channel = new WriteRecordingChannel();
         dm.lastRequest.clear();
 
@@ -156,10 +156,10 @@ public class TestSubUnsubHandler extends
         // make sure the info is gone from the sm
         StubCallback<MessageSeqId> callback2 = new StubCallback<MessageSeqId>();
         sm.serveSubscribeRequest(topic, SubscribeRequest.newBuilder(subRequestPrototype).setCreateOrAttach(
-                CreateOrAttach.ATTACH).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback2,
-                null);
+                                     CreateOrAttach.ATTACH).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback2,
+                                 null);
         assertEquals(PubSubException.ClientNotSubscribedException.class, ConcurrencyUtils.take(callback2.queue).right()
-                .getClass());
+                     .getClass());
 
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java Mon Sep  5 17:38:57 2011
@@ -140,7 +140,7 @@ public class TestHedwigHub extends Hedwi
         }
 
         public void consume(ByteString topic, ByteString subscriberId, final Message msg, Callback<Void> callback,
-                Object context) {
+                            Object context) {
             if (!consumedMessages.contains(msg.getMsgId())) {
                 // New message to consume. Add it to the Set of consumed
                 // messages.
@@ -154,7 +154,7 @@ public class TestHedwigHub extends Hedwi
                         public void run() {
                             if (logger.isDebugEnabled())
                                 logger.debug("Consuming message that is out of order for msgId: "
-                                        + msg.getMsgId().getLocalComponent());
+                                             + msg.getMsgId().getLocalComponent());
                             ConcurrencyUtils.put(consumeQueue, false);
                         }
                     }).start();
@@ -241,13 +241,13 @@ public class TestHedwigHub extends Hedwi
     }
 
     protected void startDelivery(Subscriber subscriber, ByteString topic, ByteString subscriberId,
-            MessageHandler handler) throws Exception {
+                                 MessageHandler handler) throws Exception {
         subscriber.startDelivery(topic, subscriberId, handler);
         if (mode == Mode.PROXY) {
             WriteRecordingChannel channel = new WriteRecordingChannel();
             PubSubRequest request = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE)
-                    .setTopic(topic).setTxnId(0).setType(OperationType.START_DELIVERY).setStartDeliveryRequest(
-                            StartDeliveryRequest.newBuilder().setSubscriberId(subscriberId)).build();
+                                    .setTopic(topic).setTxnId(0).setType(OperationType.START_DELIVERY).setStartDeliveryRequest(
+                                        StartDeliveryRequest.newBuilder().setSubscriberId(subscriberId)).build();
             proxy.getStartDeliveryHandler().handleRequest(request, channel);
             assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode());
         }
@@ -280,7 +280,7 @@ public class TestHedwigHub extends Hedwi
             logger.debug("Subscribing to topics and starting delivery.");
         for (int i = 0; i < batchSize; i++) {
             subscriber.asyncSubscribe(getTopic(i), localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH,
-                    new TestCallback(queue), null);
+                                      new TestCallback(queue), null);
             assertTrue(queue.take());
         }
 
@@ -327,7 +327,7 @@ public class TestHedwigHub extends Hedwi
         ByteString myTopic = getTopic(0);
         // Subscribe to a topic and start delivery on it
         mySubscriber.asyncSubscribe(myTopic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH,
-                new TestCallback(queue), null);
+                                    new TestCallback(queue), null);
         assertTrue(queue.take());
         startDelivery(mySubscriber, myTopic, localSubscriberId, new TestMessageHandler(consumeQueue));
         // Publish some messages
@@ -342,7 +342,7 @@ public class TestHedwigHub extends Hedwi
             boolean success = true;
             try {
                 mySubscriber.consume(myTopic, localSubscriberId, MessageSeqId.newBuilder().setLocalComponent(i + 1)
-                        .build());
+                                     .build());
             } catch (ClientNotSubscribedException e) {
                 success = false;
             }
@@ -364,7 +364,7 @@ public class TestHedwigHub extends Hedwi
     public void testAttachToSubscriptionSuccess() throws Exception {
         ByteString topic = getTopic(0);
         subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
-                null);
+                                  null);
         assertTrue(queue.take());
         // Close the subscription asynchronously
         subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null);
@@ -417,7 +417,7 @@ public class TestHedwigHub extends Hedwi
     public void testUnsubscribe() throws Exception {
         ByteString topic = getTopic(0);
         subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
-                null);
+                                  null);
         assertTrue(queue.take());
         startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
         publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
@@ -468,7 +468,7 @@ public class TestHedwigHub extends Hedwi
     public void testCloseSubscription() throws Exception {
         ByteString topic = getTopic(0);
         subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
-                null);
+                                  null);
         assertTrue(queue.take());
         startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
         publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
@@ -500,7 +500,7 @@ public class TestHedwigHub extends Hedwi
     public void testStopDelivery() throws Exception {
         ByteString topic = getTopic(0);
         subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
-                null);
+                                  null);
         assertTrue(queue.take());
         startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
         publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
@@ -541,7 +541,7 @@ public class TestHedwigHub extends Hedwi
     public void testConsumedMessagesInOrder() throws Exception {
         ByteString topic = getTopic(0);
         subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
-                null);
+                                  null);
         assertTrue(queue.take());
         startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
         // Now publish some messages and verify that they are delivered in order
@@ -562,7 +562,7 @@ public class TestHedwigHub extends Hedwi
     public void testCreateSubscriptionFailure() throws Exception {
         ByteString topic = getTopic(0);
         subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
-                null);
+                                  null);
         assertTrue(queue.take());
         // Close the subscription asynchronously
         subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null);
@@ -612,7 +612,7 @@ public class TestHedwigHub extends Hedwi
     @Test
     public void testAsyncSubscribeWithInvalidSubscriberId() throws Exception {
         subscriber.asyncSubscribe(getTopic(0), hubSubscriberId, CreateOrAttach.CREATE_OR_ATTACH,
-                new TestCallback(queue), null);
+                                  new TestCallback(queue), null);
         assertFalse(queue.take());
     }
 
@@ -659,7 +659,7 @@ public class TestHedwigHub extends Hedwi
         HedwigClient hubClient = new HedwigHubClient(new ClientConfiguration());
         HedwigSubscriber hubSubscriber = hubClient.getSubscriber();
         hubSubscriber.asyncSubscribe(getTopic(0), localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(
-                queue), null);
+                                         queue), null);
         assertFalse(queue.take());
         hubClient.stop();
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java Mon Sep  5 17:38:57 2011
@@ -59,8 +59,8 @@ public class TestHedwigRegion extends He
         for (HedwigClient client : regionClientsMap.values()) {
             for (int i = 0; i < batchSize; i++) {
                 client.getSubscriber().asyncSubscribe(ByteString.copyFromUtf8("Topic" + i),
-                        ByteString.copyFromUtf8("LocalSubscriber"), CreateOrAttach.CREATE_OR_ATTACH,
-                        new TestCallback(queue), null);
+                                                      ByteString.copyFromUtf8("LocalSubscriber"), CreateOrAttach.CREATE_OR_ATTACH,
+                                                      new TestCallback(queue), null);
                 assertTrue(queue.take());
             }
         }
@@ -69,7 +69,7 @@ public class TestHedwigRegion extends He
         for (HedwigClient client : regionClientsMap.values()) {
             for (int i = 0; i < batchSize; i++) {
                 client.getSubscriber().startDelivery(ByteString.copyFromUtf8("Topic" + i),
-                        ByteString.copyFromUtf8("LocalSubscriber"), new TestMessageHandler(consumeQueue));
+                                                     ByteString.copyFromUtf8("LocalSubscriber"), new TestMessageHandler(consumeQueue));
             }
         }
 
@@ -79,7 +79,7 @@ public class TestHedwigRegion extends He
         HedwigPublisher publisher = regionClientsMap.values().iterator().next().getPublisher();
         for (int i = 0; i < batchSize; i++) {
             publisher.asyncPublish(ByteString.copyFromUtf8("Topic" + i), Message.newBuilder().setBody(
-                    ByteString.copyFromUtf8("Message" + i)).build(), new TestCallback(queue), null);
+                                       ByteString.copyFromUtf8("Message" + i)).build(), new TestCallback(queue), null);
             assertTrue(queue.take());
         }
         // Make sure each region consumes the same set of published messages.
@@ -87,7 +87,7 @@ public class TestHedwigRegion extends He
             for (int j = 0; j < batchSize; j++) {
                 assertTrue(consumeQueue.take());
             }
-        }       
+        }
     }
 
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java Mon Sep  5 17:38:57 2011
@@ -77,7 +77,7 @@ public class TestPubSubServer extends Pu
     }
 
     PubSubServer startServer(final UncaughtExceptionHandler uncaughtExceptionHandler, final int port,
-            final TopicManagerInstantiator instantiator) throws Exception {
+                             final TopicManagerInstantiator instantiator) throws Exception {
         PubSubServer server = new PubSubServer(new StandAloneServerConfiguration() {
             @Override
             public int getServerPort() {
@@ -105,7 +105,7 @@ public class TestPubSubServer extends Pu
         }).getPublisher();
 
         publisher.asyncPublish(ByteString.copyFromUtf8("blah"), Message.newBuilder().setBody(
-                ByteString.copyFromUtf8("blah")).build(), new Callback<Void>() {
+        ByteString.copyFromUtf8("blah")).build(), new Callback<Void>() {
             @Override
             public void operationFailed(Object ctx, PubSubException exception) {
                 assertTrue(false);
@@ -132,8 +132,8 @@ public class TestPubSubServer extends Pu
             public TopicManager instantiateTopicManager() throws IOException {
                 return new AbstractTopicManager(new ServerConfiguration(), Executors.newSingleThreadScheduledExecutor()) {
                     @Override
-                    protected void realGetOwner(ByteString topic, boolean shouldClaim, 
-                            Callback<HedwigSocketAddress> cb, Object ctx) {
+                    protected void realGetOwner(ByteString topic, boolean shouldClaim,
+                    Callback<HedwigSocketAddress> cb, Object ctx) {
                         throw new RuntimeException("this exception should be uncaught");
                     }
 
@@ -164,8 +164,8 @@ public class TestPubSubServer extends Pu
                 return new AbstractTopicManager(new ServerConfiguration(), Executors.newSingleThreadScheduledExecutor()) {
 
                     @Override
-                    protected void realGetOwner(ByteString topic, boolean shouldClaim, 
-                            Callback<HedwigSocketAddress> cb, Object ctx) {
+                    protected void realGetOwner(ByteString topic, boolean shouldClaim,
+                    Callback<HedwigSocketAddress> cb, Object ctx) {
                         ZooKeeper zookeeper;
                         try {
                             zookeeper = new ZooKeeper(hostPort, 60000, new Watcher() {
@@ -182,7 +182,7 @@ public class TestPubSubServer extends Pu
                         zookeeper.getData("/fake", false, new SafeAsyncZKCallback.DataCallback() {
                             @Override
                             public void safeProcessResult(int rc, String path, Object ctx, byte[] data,
-                                    org.apache.zookeeper.data.Stat stat) {
+                            org.apache.zookeeper.data.Stat stat) {
                                 throw new RuntimeException("This should go to the uncaught exception handler");
                             }
 
@@ -213,8 +213,8 @@ public class TestPubSubServer extends Pu
             @Override
             public List<String> getRegions() {
                 List<String> regionsList = new LinkedList<String>();
-                regionsList.add("regionHost1:4080:9876"); 
-                regionsList.add("regionHost2:4080"); 
+                regionsList.add("regionHost1:4080:9876");
+                regionsList.add("regionHost2:4080");
                 regionsList.add("regionHost3:4080:9876");
                 return regionsList;
             }
@@ -227,7 +227,7 @@ public class TestPubSubServer extends Pu
             success = true;
         }
         assertTrue(success);
-    }    
+    }
 
     @Test
     public void testValidServerConfiguration() throws Exception {
@@ -241,8 +241,8 @@ public class TestPubSubServer extends Pu
             @Override
             public List<String> getRegions() {
                 List<String> regionsList = new LinkedList<String>();
-                regionsList.add("regionHost1:4080:9876"); 
-                regionsList.add("regionHost2:4080:2938"); 
+                regionsList.add("regionHost1:4080:9876");
+                regionsList.add("regionHost2:4080:2938");
                 regionsList.add("regionHost3:4080:9876");
                 return regionsList;
             }
@@ -255,6 +255,6 @@ public class TestPubSubServer extends Pu
             success = false;
         }
         assertTrue(success);
-    }    
+    }
 
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java Mon Sep  5 17:38:57 2011
@@ -38,7 +38,7 @@ import org.apache.log4j.Logger;
 /**
  * This is a base class for any tests that require a BookKeeper client/server
  * setup.
- * 
+ *
  */
 public class BookKeeperTestBase extends ZooKeeperTestBase {
     private static Logger logger = Logger.getLogger(BookKeeperTestBase.class);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java Mon Sep  5 17:38:57 2011
@@ -76,7 +76,7 @@ public class StubPersistenceManager impl
         }
 
         request.getCallback().messageScanned(request.getCtx(),
-                messages.get(request.getTopic()).get((int) request.getStartSeqId()));
+                                             messages.get(request.getTopic()).get((int) request.getStartSeqId()));
 
     }
 
@@ -90,7 +90,7 @@ public class StubPersistenceManager impl
         long startSeqId = request.getStartSeqId();
         for (int i = 0; i < request.getMessageLimit(); i++) {
             List<Message> messageList = MapMethods.getAfterInsertingIfAbsent(messages, request.getTopic(),
-                    ArrayListMessageFactory.instance);
+                                        ArrayListMessageFactory.instance);
             if (startSeqId + i > messageList.size()) {
                 request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.NO_MORE_MESSAGES);
                 return;

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java Mon Sep  5 17:38:57 2011
@@ -24,25 +24,25 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.util.ConcurrencyUtils;
 import org.apache.hedwig.util.Either;
 
-public class StubScanCallback implements ScanCallback{
+public class StubScanCallback implements ScanCallback {
 
     public static Message END_MESSAGE = Message.newBuilder().setBody(ByteString.EMPTY).build();
-    
+
     LinkedBlockingQueue<Either<Message, Exception>> queue = new LinkedBlockingQueue<Either<Message,Exception>>();
-    
+
     @Override
     public void messageScanned(Object ctx, Message message) {
-       ConcurrencyUtils.put(queue, Either.of(message, (Exception) null));
+        ConcurrencyUtils.put(queue, Either.of(message, (Exception) null));
     }
-    
+
     @Override
     public void scanFailed(Object ctx, Exception exception) {
         ConcurrencyUtils.put(queue, Either.of((Message) null, exception));
     }
-    
+
     @Override
     public void scanFinished(Object ctx, ReasonForFinish reason) {
         ConcurrencyUtils.put(queue, Either.of(END_MESSAGE, (Exception) null));
-        
+
     }
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java Mon Sep  5 17:38:57 2011
@@ -31,8 +31,8 @@ import org.apache.hedwig.server.topics.T
 
 public class TestBookKeeperPersistenceManagerBlackBox extends TestPersistenceManagerBlackBox {
     BookKeeperTestBase bktb;
-    private final int numBookies = 3;   
-    
+    private final int numBookies = 3;
+
     @Override
     @Before
     protected void setUp() throws Exception {
@@ -61,7 +61,7 @@ public class TestBookKeeperPersistenceMa
         ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
 
         return new BookkeeperPersistenceManager(bktb.bk, bktb.getZooKeeperClient(), new TrivialOwnAllTopicManager(conf,
-                scheduler), conf, scheduler);
+                                                scheduler), conf, scheduler);
     }
 
     @Override

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java Mon Sep  5 17:38:57 2011
@@ -78,7 +78,7 @@ public class TestBookkeeperPersistenceMa
         // now abandon, and try another time, the prev ledger should be dirty
 
         bkpm = new BookkeeperPersistenceManager(new BookKeeper(bktb.getZkHostPort()), bktb.getZooKeeperClient(), tm,
-                conf, scheduler);
+                                                conf, scheduler);
         bkpm.acquiredTopic(topic, stubCallback, null);
         assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
         assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size());
@@ -92,7 +92,7 @@ public class TestBookkeeperPersistenceMa
         int index = 0;
         int numPrevLedgers = 0;
         List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST,
-                SIZE_OF_MESSAGES_TO_TEST);
+                                 SIZE_OF_MESSAGES_TO_TEST);
 
         while (index < messages.size()) {
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java Mon Sep  5 17:38:57 2011
@@ -163,13 +163,13 @@ public abstract class TestPersistenceMan
                     PersistenceManagerWithRangeScan rangePersistenceManager = (PersistenceManagerWithRangeScan) persistenceManager;
 
                     rangePersistenceManager.scanMessages(new RangeScanRequest(topic, getLowestSeqId(),
-                            NUM_MESSAGES_TO_TEST + 1, Long.MAX_VALUE, listener, statusQueue));
+                                                         NUM_MESSAGES_TO_TEST + 1, Long.MAX_VALUE, listener, statusQueue));
 
                 } else {
 
                     ScanCallback listener = new PointScanVerifierListener(pubMsgs, topic);
                     persistenceManager
-                            .scanSingleMessage(new ScanRequest(topic, getLowestSeqId(), listener, statusQueue));
+                    .scanSingleMessage(new ScanRequest(topic, getLowestSeqId(), listener, statusQueue));
 
                 }
                 // now listen for it to finish
@@ -295,7 +295,7 @@ public abstract class TestPersistenceMan
         assertEquals(null, failureException);
         for (int i = 0; i < NUM_TOPICS_TO_TEST; i++) {
             assertEquals(persistenceManager.getCurrentSeqIdForTopic(getTopicName(i)).getLocalComponent(),
-                    getExpectedSeqId(NUM_MESSAGES_TO_TEST));
+                         getExpectedSeqId(NUM_MESSAGES_TO_TEST));
         }
 
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java Mon Sep  5 17:38:57 2011
@@ -90,7 +90,7 @@ public class TestReadAheadCacheWhiteBox 
     }
 
     @Test
-    public void testPersistMessage() throws Exception{
+    public void testPersistMessage() throws Exception {
         StubCallback<Long> callback = new StubCallback<Long>();
         PersistRequest request = new PersistRequest(topic, messages.get(0), callback, null);
 
@@ -99,20 +99,20 @@ public class TestReadAheadCacheWhiteBox 
         assertNotNull(ConcurrencyUtils.take(callback.queue).right());
 
         CacheKey key = new CacheKey(topic, cacheBasedPersistenceManager.getCurrentSeqIdForTopic(topic)
-                .getLocalComponent());
+                                    .getLocalComponent());
         assertFalse(cacheBasedPersistenceManager.cache.containsKey(key));
 
         stubPersistenceManager.failure = false;
         persistMessage(messages.get(0));
     }
 
-    private void persistMessage(Message msg) throws Exception{
+    private void persistMessage(Message msg) throws Exception {
         StubCallback<Long> callback = new StubCallback<Long>();
         PersistRequest request = new PersistRequest(topic, msg, callback, null);
         cacheBasedPersistenceManager.persistMessage(request);
         assertNotNull(ConcurrencyUtils.take(callback.queue).left());
         CacheKey key = new CacheKey(topic, cacheBasedPersistenceManager.getCurrentSeqIdForTopic(topic)
-                .getLocalComponent());
+                                    .getLocalComponent());
         CacheValue cacheValue = cacheBasedPersistenceManager.cache.get(key);
         assertNotNull(cacheValue);
         assertFalse(cacheValue.isStub());
@@ -140,7 +140,7 @@ public class TestReadAheadCacheWhiteBox 
     }
 
     @Test
-    public void testDeliveredUntil() throws Exception{
+    public void testDeliveredUntil() throws Exception {
         for (Message m : messages) {
             persistMessage(m);
         }
@@ -184,7 +184,7 @@ public class TestReadAheadCacheWhiteBox 
     }
 
     @Test
-    public void testReadAheadSizeLimit() throws Exception{
+    public void testReadAheadSizeLimit() throws Exception {
         for (Message m : messages) {
             persistMessage(m);
         }
@@ -195,17 +195,17 @@ public class TestReadAheadCacheWhiteBox 
 
         assertTrue(callback.isSuccess());
         assertEquals((int) Math.ceil(myConf.readAheadSize / (MSG_SIZE + 0.0)), cacheBasedPersistenceManager.cache
-                .size());
+                     .size());
 
     }
 
     @Test
-    public void testDoReadAheadStartingFrom() throws Exception{
+    public void testDoReadAheadStartingFrom() throws Exception {
         persistMessage(messages.get(0));
         int readAheadCount = 5;
         int start = 1;
         RangeScanRequest readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, start,
-                readAheadCount);
+                                            readAheadCount);
         assertNull(readAheadRequest);
 
         StubScanCallback callback = new StubScanCallback();

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java Mon Sep  5 17:38:57 2011
@@ -41,7 +41,7 @@ public class StubSubscriptionManager ext
 
     @Override
     public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
-            Callback<MessageSeqId> callback, Object ctx) {
+                                      Callback<MessageSeqId> callback, Object ctx) {
         if (fail) {
             callback.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
             return;

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java Mon Sep  5 17:38:57 2011
@@ -108,12 +108,12 @@ public class TestZkSubscriptionManager e
         sm.serveSubscribeRequest(topic1, subRequest, msgId, msgIdCallback, null);
 
         Assert.assertEquals(ConcurrencyUtils.take(msgIdCallbackQueue).right().getClass(),
-                PubSubException.ServerNotResponsibleForTopicException.class);
+                            PubSubException.ServerNotResponsibleForTopicException.class);
 
         sm.unsubscribe(topic1, sub1, voidCallback, null);
 
         Assert.assertEquals(ConcurrencyUtils.take(BooleanCallbackQueue).right().getClass(),
-                PubSubException.ServerNotResponsibleForTopicException.class);
+                            PubSubException.ServerNotResponsibleForTopicException.class);
 
         //
         // Acquire topic.
@@ -127,31 +127,31 @@ public class TestZkSubscriptionManager e
 
         sm.unsubscribe(topic1, sub1, voidCallback, null);
         Assert.assertEquals(ConcurrencyUtils.take(BooleanCallbackQueue).right().getClass(),
-                PubSubException.ClientNotSubscribedException.class);
+                            PubSubException.ClientNotSubscribedException.class);
 
         //
         // Try to attach to a subscription.
         subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH).setSubscriberId(sub1)
-                .build();
+                     .build();
 
         sm.serveSubscribeRequest(topic1, subRequest, msgId, msgIdCallback, null);
         Assert.assertEquals(ConcurrencyUtils.take(msgIdCallbackQueue).right().getClass(),
-                PubSubException.ClientNotSubscribedException.class);
+                            PubSubException.ClientNotSubscribedException.class);
 
         // now create
         subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.CREATE).setSubscriberId(sub1)
-                .build();
+                     .build();
         sm.serveSubscribeRequest(topic1, subRequest, msgId, msgIdCallback, null);
         Assert.assertEquals(msgId.getLocalComponent(), ConcurrencyUtils.take(msgIdCallbackQueue).left().getLocalComponent());
         Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
-                .getLocalComponent());
+                            .getLocalComponent());
 
         // try to create again
         sm.serveSubscribeRequest(topic1, subRequest, msgId, msgIdCallback, null);
         Assert.assertEquals(ConcurrencyUtils.take(msgIdCallbackQueue).right().getClass(),
-                PubSubException.ClientAlreadySubscribedException.class);
+                            PubSubException.ClientAlreadySubscribedException.class);
         Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
-                .getLocalComponent());
+                            .getLocalComponent());
 
         sm.lostTopic(topic1);
         sm.acquiredTopic(topic1, voidCallback, null);
@@ -159,27 +159,27 @@ public class TestZkSubscriptionManager e
 
         // try to attach
         subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH).setSubscriberId(sub1)
-                .build();
+                     .build();
         MessageSeqId msgId1 = MessageSeqId.newBuilder().setLocalComponent(msgId.getLocalComponent() + 10).build();
         sm.serveSubscribeRequest(topic1, subRequest, msgId1, msgIdCallback, null);
         Assert.assertEquals(msgId.getLocalComponent(), msgIdCallbackQueue.take().left().getLocalComponent());
         Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
-                .getLocalComponent());
+                            .getLocalComponent());
 
         // now manipulate the consume ptrs
         // dont give it enough to have it persist to ZK
         MessageSeqId msgId2 = MessageSeqId.newBuilder().setLocalComponent(
-                msgId.getLocalComponent() + cfg.getConsumeInterval() - 1).build();
+                                  msgId.getLocalComponent() + cfg.getConsumeInterval() - 1).build();
         sm.setConsumeSeqIdForSubscriber(topic1, sub1, msgId2, voidCallback, null);
         Assert.assertTrue(BooleanCallbackQueue.take().left());
         Assert.assertEquals(msgId2.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
-                .getLocalComponent());
+                            .getLocalComponent());
         Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getSubscriptionState().getMsgId()
-                .getLocalComponent());
+                            .getLocalComponent());
 
         // give it more so that it will write to ZK
         MessageSeqId msgId3 = MessageSeqId.newBuilder().setLocalComponent(
-                msgId.getLocalComponent() + cfg.getConsumeInterval() + 1).build();
+                                  msgId.getLocalComponent() + cfg.getConsumeInterval() + 1).build();
         sm.setConsumeSeqIdForSubscriber(topic1, sub1, msgId3, voidCallback, null);
         Assert.assertTrue(BooleanCallbackQueue.take().left());
 
@@ -188,9 +188,9 @@ public class TestZkSubscriptionManager e
         Assert.assertTrue(BooleanCallbackQueue.take().left());
 
         Assert.assertEquals(msgId3.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
-                .getLocalComponent());
+                            .getLocalComponent());
         Assert.assertEquals(msgId3.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getSubscriptionState().getMsgId()
-                .getLocalComponent());
+                            .getLocalComponent());
 
         // finally unsubscribe
         sm.unsubscribe(topic1, sub1, voidCallback, null);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java Mon Sep  5 17:38:57 2011
@@ -44,8 +44,8 @@ public class StubTopicManager extends Tr
     }
 
     @Override
-    protected void realGetOwner(ByteString topic, boolean shouldClaim, 
-            Callback<HedwigSocketAddress> cb, Object ctx) {
+    protected void realGetOwner(ByteString topic, boolean shouldClaim,
+                                Callback<HedwigSocketAddress> cb, Object ctx) {
 
         if (shouldError) {
             cb.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java Mon Sep  5 17:38:57 2011
@@ -126,7 +126,7 @@ public class TestZkTopicManager extends 
     @Test
     public void testGetOwnerMulti() throws Exception {
         ServerConfiguration cfg1 = new CustomServerConfiguration(cfg.getServerPort() + 1), cfg2 = new CustomServerConfiguration(
-                cfg.getServerPort() + 2);
+            cfg.getServerPort() + 2);
         // TODO change cfg1 cfg2 params
         ZkTopicManager tm1 = new ZkTopicManager(zk, cfg1, scheduler), tm2 = new ZkTopicManager(zk, cfg2, scheduler);
 
@@ -265,7 +265,7 @@ public class TestZkTopicManager extends 
         Assert.assertEquals(topic, pair.first());
         Assert.assertTrue(pair.second());
         Assert.assertEquals(PubSubException.ServiceDownException.class, ((CompositeException) addrCbq.take().right())
-                .getExceptions().iterator().next().getClass());
+                            .getExceptions().iterator().next().getClass());
         Assert.assertFalse(tm.topics.contains(topic));
         Thread.sleep(100);
         assertOwnershipNodeDoesntExist();

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java Mon Sep  5 17:38:57 2011
@@ -32,7 +32,7 @@ import org.apache.hedwig.util.Callback;
 
 /**
  * This is a base class for any tests that need a ZooKeeper client/server setup.
- * 
+ *
  */
 public abstract class ZooKeeperTestBase extends ClientBase {