You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/05/17 18:32:06 UTC

svn commit: r1339691 - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/ hedwig-client/src/main/java/org/apache/hedwig/client/handlers/ hedwig-client/src/main/java/org/apache/hedwig/util/ hedwig-protocol...

Author: ivank
Date: Thu May 17 16:32:05 2012
New Revision: 1339691

URL: http://svn.apache.org/viewvc?rev=1339691&view=rev
Log:
BOOKKEEPER-72: Fix warnings issued by FindBugs (ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/resources/findbugsExclude.xml
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java
    zookeeper/bookkeeper/trunk/hedwig-server/pom.xml
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu May 17 16:32:05 2012
@@ -14,6 +14,8 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-254: Bump zookeeper version in poms (ivank)
 
+      BOOKKEEPER-72: Fix warnings issued by FindBugs (ivank)
+
       bookkeeper-server/
 
         BOOKKEEPER-142: Parsing last log id is wrong, which may make entry log files overwritten (Sijie Gou via ivank)

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java Thu May 17 16:32:05 2012
@@ -61,6 +61,8 @@ public class BenchmarkPublisher extends 
 
         // picking constants arbitarily for warmup phase
         ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator("acked pubs", nWarmup, 100);
+        agg.startProgress();
+
         Message msg = getMsg(1024);
         for (int i = 0; i < nWarmup; i++) {
             publisher.asyncPublish(topic, msg, new BenchmarkCallback(agg), null);
@@ -100,6 +102,7 @@ public class BenchmarkPublisher extends 
         int myPublishLimit = numMessages / numRegions / numPartitions - myPublishCount;
         myPublishCount = 0;
         ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator("acked pubs", myPublishLimit, nParallel);
+        agg.startProgress();
 
         int topicLabel = 0;
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java Thu May 17 16:32:05 2012
@@ -59,6 +59,8 @@ public class BenchmarkSubscriber extends
     public Void call() throws Exception {
 
         final ThroughputAggregator agg = new ThroughputAggregator("recvs", numMessages);
+        agg.startProgress();
+
         final Map<String, Long> lastSeqIdSeenMap = new HashMap<String, Long>();
 
         for (int i = startTopicLabel; i < startTopicLabel + numTopics; i++) {
@@ -120,6 +122,8 @@ public class BenchmarkSubscriber extends
             throws InterruptedException {
         long startTime = System.currentTimeMillis();
         ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator(label, count / numPartitions, npar);
+        agg.startProgress();
+
         int end = start + count;
         for (int i = start; i < end; ++i) {
             if (!HedwigBenchmark.amIResponsibleForTopic(i, partitionIndex, numPartitions)) {

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java Thu May 17 16:32:05 2012
@@ -60,6 +60,10 @@ public class BenchmarkUtils {
             }
         }
 
+        public void startProgress() {
+            tpAgg.startProgress();
+        }
+
         public void reportLatency(long latency) {
             sum.addAndGet(latency);
 
@@ -101,6 +105,7 @@ public class BenchmarkUtils {
         final AtomicInteger done = new AtomicInteger();
         final AtomicLong earliest = new AtomicLong();
         final AtomicInteger numFailed = new AtomicInteger();
+        final Thread progressThread;
         final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
 
         public ThroughputAggregator(final String label, final int count) {
@@ -109,7 +114,7 @@ public class BenchmarkUtils {
             if (count == 0)
                 queue.add(0);
             if (Boolean.getBoolean("progress")) {
-                new Thread(new Runnable() {
+                progressThread = new Thread(new Runnable() {
                     @Override
                     public void run() {
                         try {
@@ -123,7 +128,15 @@ public class BenchmarkUtils {
                             throw new RuntimeException(ex);
                         }
                     }
-                }).start();
+                    });
+            } else {
+                progressThread = null;
+            }
+        }
+
+        public void startProgress() {
+            if (progressThread != null) {
+                progressThread.start();
             }
         }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Thu May 17 16:32:05 2012
@@ -78,7 +78,7 @@ public class SubscribeResponseHandler {
 
     // Public getter to retrieve the original PubSubData used for the Subscribe
     // request.
-    public PubSubData getOrigSubData() {
+    synchronized public PubSubData getOrigSubData() {
         return origSubData;
     }
 
@@ -98,35 +98,37 @@ public class SubscribeResponseHandler {
                          + HedwigClientImpl.getHostFromChannel(channel));
         switch (response.getStatusCode()) {
         case SUCCESS:
-            // For successful Subscribe requests, store this Channel locally
-            // and set it to not be readable initially.
-            // This way we won't be delivering messages for this topic
-            // subscription until the client explicitly says so.
-            subscribeChannel = channel;
-            subscribeChannel.setReadable(false);
-            // Store the original PubSubData used to create this successful
-            // Subscribe request.
-            origSubData = pubSubData;
-            // Store the mapping for the TopicSubscriber to the Channel.
-            // This is so we can control the starting and stopping of
-            // message deliveries from the server on that Channel. Store
-            // this only on a successful ack response from the server.
-            TopicSubscriber topicSubscriber = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId);
-            responseHandler.getSubscriber().setChannelForTopic(topicSubscriber, channel);
-            // Lazily create the Set (from a concurrent hashmap) to keep track
-            // of outstanding Messages to be consumed by the client app. At this
-            // stage, delivery for that topic hasn't started yet so creation of 
-            // this Set should be thread safe. We'll create the Set with an initial
-            // capacity equal to the configured parameter for the maximum number of
-            // outstanding messages to allow. The load factor will be set to
-            // 1.0f which means we'll only rehash and allocate more space if
-            // we ever exceed the initial capacity. That should be okay
-            // because when that happens, things are slow already and piling
-            // up on the client app side to consume messages.
-            
-            outstandingMsgSet = Collections.newSetFromMap(new ConcurrentHashMap<Message,Boolean>(
-                responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f));
-            
+            synchronized(this) {
+                // For successful Subscribe requests, store this Channel locally
+                // and set it to not be readable initially.
+                // This way we won't be delivering messages for this topic
+                // subscription until the client explicitly says so.
+                subscribeChannel = channel;
+                subscribeChannel.setReadable(false);
+                // Store the original PubSubData used to create this successful
+                // Subscribe request.
+                origSubData = pubSubData;
+
+                // Store the mapping for the TopicSubscriber to the Channel.
+                // This is so we can control the starting and stopping of
+                // message deliveries from the server on that Channel. Store
+                // this only on a successful ack response from the server.
+                TopicSubscriber topicSubscriber = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId);
+                responseHandler.getSubscriber().setChannelForTopic(topicSubscriber, channel);
+                // Lazily create the Set (from a concurrent hashmap) to keep track
+                // of outstanding Messages to be consumed by the client app. At this
+                // stage, delivery for that topic hasn't started yet so creation of
+                // this Set should be thread safe. We'll create the Set with an initial
+                // capacity equal to the configured parameter for the maximum number of
+                // outstanding messages to allow. The load factor will be set to
+                // 1.0f which means we'll only rehash and allocate more space if
+                // we ever exceed the initial capacity. That should be okay
+                // because when that happens, things are slow already and piling
+                // up on the client app side to consume messages.
+                outstandingMsgSet = Collections.newSetFromMap(
+                        new ConcurrentHashMap<Message,Boolean>(
+                                responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f));
+            }
             // Response was success so invoke the callback's operationFinished
             // method.
             pubSubData.callback.operationFinished(pubSubData.context, null);
@@ -162,9 +164,11 @@ public class SubscribeResponseHandler {
     // Main method to handle consuming a message for a topic that the client is
     // subscribed to.
     public void handleSubscribeMessage(PubSubResponse response) {
-        if (logger.isDebugEnabled())
-            logger.debug("Handling a Subscribe message in response: " + response + ", topic: "
-                         + origSubData.topic.toStringUtf8() + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
+        if (logger.isDebugEnabled()) {
+            logger.debug("Handling a Subscribe message in response: {}, topic: {}, subscriberId: {}",
+                    new Object[] { response, getOrigSubData().topic.toStringUtf8(),
+                                   getOrigSubData().subscriberId.toStringUtf8() });
+        }
         Message message = response.getMessage();
 
         synchronized (this) {
@@ -300,9 +304,11 @@ public class SubscribeResponseHandler {
      *            MessageHandler to register for this ResponseHandler instance.
      */
     public void setMessageHandler(MessageHandler messageHandler) {
-        if (logger.isDebugEnabled())
-            logger.debug("Setting the messageHandler for topic: " + origSubData.topic.toStringUtf8()
-                         + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
+        if (logger.isDebugEnabled()) {
+            logger.debug("Setting the messageHandler for topic: {}, subscriberId: {}",
+                         getOrigSubData().topic.toStringUtf8(),
+                         getOrigSubData().subscriberId.toStringUtf8());
+        }
         synchronized (this) {
             this.messageHandler = messageHandler;
             // Once the MessageHandler is registered, see if we have any queued up

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java Thu May 17 16:32:05 2012
@@ -124,6 +124,11 @@ public class HedwigSocketAddress {
         return (this.hostname.equals(that.hostname) && (this.port == that.port) && (this.sslPort == that.sslPort));
     }
 
+    @Override
+    public int hashCode() {
+        return (this.hostname + this.port + this.sslPort).hashCode();
+    }
+
     // Static helper method to return the string representation for an
     // InetSocketAddress. The HedwigClient can only operate in SSL or non-SSL
     // mode. So the server hosts it connects to will just be an

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java Thu May 17 16:32:05 2012
@@ -26,12 +26,12 @@ public class PathUtils {
     /** Generate all prefixes for a path. "/a/b/c" -> ["/a","/a/b","/a/b/c"] */
     public static List<String> prefixes(String path) {
         List<String> prefixes = new ArrayList<String>();
-        String prefix = "";
+        StringBuilder prefix = new StringBuilder();
         for (String comp : path.split("/+")) {
             // Skip the first (empty) path component.
             if (!comp.equals("")) {
-                prefix += "/" + comp;
-                prefixes.add(prefix);
+                prefix.append("/").append(comp);
+                prefixes.add(prefix.toString());
             }
         }
         return prefixes;

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java Thu May 17 16:32:05 2012
@@ -27,7 +27,7 @@ public class PubSubResponseUtils {
     /**
      * Change here if bumping up the version number that the server sends back
      */
-    protected static ProtocolVersion serverVersion = ProtocolVersion.VERSION_ONE;
+    protected final static ProtocolVersion serverVersion = ProtocolVersion.VERSION_ONE;
 
     static PubSubResponse.Builder getBasicBuilder(StatusCode status) {
         return PubSubResponse.newBuilder().setProtocolVersion(serverVersion).setStatusCode(status);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/pom.xml?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/pom.xml Thu May 17 16:32:05 2012
@@ -137,31 +137,13 @@
           </descriptors>
         </configuration>
       </plugin>
-<!--
       <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <version>1.2.1</version>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-            <configuration>
-              <artifactSet>
-                <excludes>
-                  <exclude>classworlds:classworlds</exclude>
-                  <exclude>junit:junit</exclude>
-                  <exclude>jmock:jmock</exclude>
-                  <exclude>xml-apis:xml-apis</exclude>
-                </excludes>
-              </artifactSet>
-            </configuration>
-          </execution>
-        </executions>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
+        </configuration>
       </plugin>
-      -->
       <plugin>
         <artifactId>maven-antrun-plugin</artifactId>
         <executions>

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java Thu May 17 16:32:05 2012
@@ -18,6 +18,7 @@
 
 package org.apache.hedwig.admin;
 
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -65,7 +66,7 @@ public class HedwigAdmin {
     protected ClientConfiguration bkClientConf;
 
     // Empty watcher
-    private class MyWatcher implements Watcher {
+    private static class MyWatcher implements Watcher {
         public void process(WatchedEvent event) {
         }
     }
@@ -140,7 +141,7 @@ public class HedwigAdmin {
      * @return bookeeper passwd
      */
     public byte[] getBkPasswd() {
-        return passwd;
+        return Arrays.copyOf(passwd, passwd.length);
     }
 
     /**
@@ -187,8 +188,10 @@ public class HedwigAdmin {
                 if (data != null) {
                     load = Integer.parseInt(new String(data));
                 }
-            } catch (Exception e) {
-                // igore
+            } catch (KeeperException ke) {
+                LOG.warn("Couldn't read hub data from ZooKeeper", ke);
+            } catch (InterruptedException ie) {
+                LOG.warn("Interrupted during read", ie);
             }
             hubs.put(new HedwigSocketAddress(host), load);
         }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java Thu May 17 16:32:05 2012
@@ -336,8 +336,6 @@ public final class HedwigCommands {
 
         public String getDescription() { return desc; }
 
-        public String[] getUsage() { return usage; }
-
         public Map<String, COMMAND> getSubCommands() { return subCmds; }
 
         public void addSubCommand(COMMAND c) {
@@ -364,7 +362,10 @@ public final class HedwigCommands {
         commands.put(c.getName(), c);
     }
 
-    static {
+    static synchronized void init() {
+        if (commands != null) {
+            return;
+        }
         commands = new LinkedHashMap<String, COMMAND>();
 
         addCommand(COMMAND.CMD_PUB);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java Thu May 17 16:32:05 2012
@@ -34,7 +34,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.hedwig.admin.HedwigAdmin;
@@ -100,7 +101,7 @@ public class HedwigConsole {
         boolean runCmd(String[] args) throws Exception;
     }
 
-    class HelpCmd implements MyCommand {
+    static class HelpCmd implements MyCommand {
 
         @Override
         public boolean runCmd(String[] args) throws Exception {
@@ -127,7 +128,7 @@ public class HedwigConsole {
             printMessage("Quitting ...");
             hubClient.close();
             admin.close();
-            System.exit(0);
+            Runtime.getRuntime().exit(0);
             return true;
         }
     }
@@ -223,7 +224,7 @@ public class HedwigConsole {
         
     }
 
-    class ConsoleMessageHandler implements MessageHandler {
+    static class ConsoleMessageHandler implements MessageHandler {
 
         @Override
         public void deliver(ByteString topic, ByteString subscriberId,
@@ -444,7 +445,7 @@ public class HedwigConsole {
 
             boolean subscribed = false;
             boolean success = false;
-            final AtomicBoolean isDone = new AtomicBoolean(false);
+            final CountDownLatch isDone = new CountDownLatch(1);
             long elapsedTime = 0L;
 
             System.out.println("Starting PUBSUB test ...");
@@ -471,10 +472,7 @@ public class HedwigConsole {
                         if (thisTopic.equals(topic) && subscriberId.equals(subId) &&
                             msg.getBody().equals(message.getBody())) {
                             System.out.println("Received message : " + message.getBody().toStringUtf8());
-                            synchronized(isDone) {
-                                isDone.set(true);
-                                isDone.notify();
-                            }
+                            isDone.countDown();
                         }
                         callback.operationFinished(context, null);
                     }
@@ -482,10 +480,7 @@ public class HedwigConsole {
                 });
 
                 // wait for the message
-                synchronized (isDone) {
-                    isDone.wait(timeoutSecs * 1000L);
-                }
-                success = isDone.get();
+                success = isDone.await(timeoutSecs, TimeUnit.SECONDS);
                 elapsedTime = System.currentTimeMillis() - startTime;
             } finally {
                 try {
@@ -783,6 +778,7 @@ public class HedwigConsole {
      * @throws InterruptedException 
      */
     public HedwigConsole(String[] args) throws IOException, InterruptedException {
+        HedwigCommands.init();
         cl.parseOptions(args);
 
         if (cl.getCommand() == null) {
@@ -970,10 +966,26 @@ public class HedwigConsole {
 
                     historyEnabled = true;
                     System.out.println("JLine history support is enabled");
-                } catch (Exception e) {
+                } catch (ClassNotFoundException e) {
+                    System.out.println("JLine history support is disabled");
+                    LOG.debug("JLine history disabled with exception", e);
                     historyEnabled = false;
-                    e.printStackTrace();
+                } catch (NoSuchMethodException e) {
+                    System.out.println("JLine history support is disabled");
+                    LOG.debug("JLine history disabled with exception", e);
+                    historyEnabled = false;
+                } catch (InvocationTargetException e) {
                     System.out.println("JLine history support is disabled");
+                    LOG.debug("JLine history disabled with exception", e);
+                    historyEnabled = false;
+                } catch (IllegalAccessException e) {
+                    System.out.println("JLine history support is disabled");
+                    LOG.debug("JLine history disabled with exception", e);
+                    historyEnabled = false;
+                } catch (InstantiationException e) {
+                    System.out.println("JLine history support is disabled");
+                    LOG.debug("JLine history disabled with exception", e);
+                    historyEnabled = false;
                 }
 
                 String line;

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java Thu May 17 16:32:05 2012
@@ -72,7 +72,6 @@ public class ReadTopic {
     static final int NUM_MESSAGES_TO_PRINT = 15;
 
     SortedMap<Long, InMemoryLedgerRange> ledgers = new TreeMap<Long, InMemoryLedgerRange>();
-    SubscriptionState leastSubscriber = null;
     
     static class InMemoryLedgerRange {
         LedgerRange range;
@@ -152,7 +151,6 @@ public class ReadTopic {
             long localMsgId = state.getMsgId().getLocalComponent();
             if (localMsgId < leastConsumedSeqId) {
                 leastConsumedSeqId = localMsgId;
-                this.leastSubscriber = state;
             }
         }
         if (leastConsumedSeqId == Long.MAX_VALUE) {

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java Thu May 17 16:32:05 2012
@@ -26,7 +26,7 @@ public class TerminateJVMExceptionHandle
     @Override
     public void uncaughtException(Thread t, Throwable e) {
         logger.error("Uncaught exception in thread " + t.getName(), e);
-        System.exit(1);
+        Runtime.getRuntime().exit(1);
     }
 
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java Thu May 17 16:32:05 2012
@@ -78,4 +78,8 @@ public class ChannelEndPoint implements 
         }
     }
 
+    @Override
+    public int hashCode() {
+        return channel.hashCode();
+    }
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java Thu May 17 16:32:05 2012
@@ -22,6 +22,8 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.server.subscriptions.MessageFilter;
 
 public interface DeliveryManager {
+    public void start();
+
     public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
                                          DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber);
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Thu May 17 16:32:05 2012
@@ -89,15 +89,20 @@ public class FIFODeliveryManager impleme
     // Boolean indicating if this thread should continue running. This is used
     // when we want to stop the thread during a PubSubServer shutdown.
     protected boolean keepRunning = true;
+    private final Thread workerThread;
 
     public FIFODeliveryManager(PersistenceManager persistenceMgr, ServerConfiguration cfg) {
         this.persistenceMgr = persistenceMgr;
         perTopicDeliveryPtrs = new HashMap<ByteString, SortedMap<Long, Set<ActiveSubscriberState>>>();
         subscriberStates = new HashMap<TopicSubscriber, ActiveSubscriberState>();
-        new Thread(this, "DeliveryManagerThread").start();
+        workerThread = new Thread(this, "DeliveryManagerThread");
         this.cfg = cfg;
     }
 
+    public void start() {
+        workerThread.start();
+    }
+
     /**
      * ===================================================================== Our
      * usual enqueue function, stop if error because of unbounded queue, should
@@ -296,7 +301,7 @@ public class FIFODeliveryManager impleme
         long localSeqIdDeliveringNow;
         long lastSeqIdCommunicatedExternally;
         // TODO make use of these variables
-        MessageFilter filter;
+
         boolean isHubSubscriber;
         final static int SEQ_ID_SLACK = 10;
 
@@ -306,7 +311,7 @@ public class FIFODeliveryManager impleme
             this.subscriberId = subscriberId;
             this.lastLocalSeqIdDelivered = lastLocalSeqIdDelivered;
             this.deliveryEndPoint = deliveryEndPoint;
-            this.filter = filter;
+
             this.isHubSubscriber = isHubSubscriber;
         }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java Thu May 17 16:32:05 2012
@@ -30,7 +30,7 @@ public class NettyHandlerBean implements
 
     public NettyHandlerBean(Map<OperationType, Handler> handlers) {
         this.handlers = handlers;
-        subHandler = (SubscribeHandler) handlers.get(OperationType.SUBSCRIBE);
+        subHandler = (SubscribeHandler) this.handlers.get(OperationType.SUBSCRIBE);
     }
 
     @Override

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java Thu May 17 16:32:05 2012
@@ -107,6 +107,7 @@ public class PubSubServer {
     // JMX Beans
     NettyHandlerBean jmxNettyBean;
     PubSubServerBean jmxServerBean;
+    final ThreadGroup tg;
 
     protected PersistenceManager instantiatePersistenceManager(TopicManager topicMgr) throws IOException,
         InterruptedException {
@@ -311,8 +312,9 @@ public class PubSubServer {
      * @throws InterruptedException
      * @throws ConfigurationException
      */
-    public PubSubServer(final ServerConfiguration conf, final Thread.UncaughtExceptionHandler exceptionHandler)
-            throws Exception {
+    public PubSubServer(final ServerConfiguration conf,
+                        final Thread.UncaughtExceptionHandler exceptionHandler)
+            throws ConfigurationException {
 
         // First validate the conf
         this.conf = conf;
@@ -320,7 +322,7 @@ public class PubSubServer {
 
         // We need a custom thread group, so that we can override the uncaught
         // exception method
-        ThreadGroup tg = new ThreadGroup("hedwig") {
+        tg = new ThreadGroup("hedwig") {
             @Override
             public void uncaughtException(Thread t, Throwable e) {
                 exceptionHandler.uncaughtException(t, e);
@@ -330,7 +332,9 @@ public class PubSubServer {
         // we do in ZK threads throws an exception, we want our handler to be
         // called, not theirs.
         SafeAsyncCallback.setUncaughtExceptionHandler(exceptionHandler);
+    }
 
+    public void start() throws Exception {
         final SynchronousQueue<Either<Object, Exception>> queue = new SynchronousQueue<Either<Object, Exception>>();
 
         new Thread(tg, new Runnable() {
@@ -349,6 +353,8 @@ public class PubSubServer {
                     tm = instantiateTopicManager();
                     pm = instantiatePersistenceManager(tm);
                     dm = new FIFODeliveryManager(pm, conf);
+                    dm.start();
+
                     sm = instantiateSubscriptionManager(tm, pm);
                     rm = instantiateRegionManager(pm, scheduler);
                     sm.addListener(rm);
@@ -422,7 +428,7 @@ public class PubSubServer {
             logger.info("Using configuration file " + confFile);
         }
         try {
-            new PubSubServer(conf);
+            new PubSubServer(conf).start();
         } catch (Throwable t) {
             errorMsgAndExit("Error during startup", t, RC_OTHER);
         }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java Thu May 17 16:32:05 2012
@@ -121,7 +121,7 @@ public class ServerStats {
             ++latencyBuckets[bucket];
         }
 
-        public OpStatData toOpStatData() {
+        synchronized public OpStatData toOpStatData() {
             double avgLatency = numSuccessOps > 0 ? totalLatency / numSuccessOps : 0.0f;
             StringBuilder sb = new StringBuilder();
             for (int i=0; i<NUM_BUCKETS; i++) {
@@ -131,7 +131,8 @@ public class ServerStats {
                 }
             }
 
-            return new OpStatData(maxLatency, minLatency, avgLatency, numSuccessOps, numFailedOps, sb.toString());
+            return new OpStatData(maxLatency, minLatency, avgLatency,
+                                  numSuccessOps, numFailedOps, sb.toString());
         }
 
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java Thu May 17 16:32:05 2012
@@ -28,6 +28,10 @@ import java.sql.Statement;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
 import javax.sql.rowset.serial.SerialBlob;
 
 import org.slf4j.Logger;
@@ -74,6 +78,18 @@ public class LocalDBPersistenceManager i
             }
         }
     };
+
+    private static final ThreadLocal<MessageDigest> threadLocalDigest = new ThreadLocal<MessageDigest>() {
+        @Override
+        protected MessageDigest initialValue() {
+            try {
+                return MessageDigest.getInstance("MD5");
+            } catch (NoSuchAlgorithmException e) {
+                logger.error("Could not find MD5 hash", e);
+                return null;
+            }
+        }
+    };
     static final String ID_FIELD_NAME = "id";
     static final String MSG_FIELD_NAME = "msg";
     static final String driver = "org.apache.derby.jdbc.EmbeddedDriver";
@@ -246,14 +262,22 @@ public class LocalDBPersistenceManager i
      * sneak in and create the table before us
      */
     private void createTable(Connection conn, ByteString topic) {
-
+        Statement stmt = null;
         try {
-            Statement stmt = conn.createStatement();
+            stmt = conn.createStatement();
             String tableName = getTableNameForTopic(topic);
             stmt.execute("CREATE TABLE " + tableName + " (" + ID_FIELD_NAME + " BIGINT NOT NULL CONSTRAINT ID_PK_"
-                         + tableName + " PRIMARY KEY," + MSG_FIELD_NAME + " BLOB(2M) NOT NULL)");
+                    + tableName + " PRIMARY KEY," + MSG_FIELD_NAME + " BLOB(2M) NOT NULL)");
         } catch (SQLException e) {
             logger.debug("Could not create table", e);
+        } finally {
+            try {
+                if (stmt != null) {
+                    stmt.close();
+                }
+            } catch (SQLException e) {
+                logger.error("Error closing statement", e);
+            }
         }
     }
 
@@ -274,7 +298,11 @@ public class LocalDBPersistenceManager i
     }
 
     private String getTableNameForTopic(ByteString topic) {
-        return (topic.toStringUtf8() + "_" + version);
+        String src = (topic.toStringUtf8() + "_" + version);
+        threadLocalDigest.get().reset();
+        byte[] digest = threadLocalDigest.get().digest(src.getBytes());
+        BigInteger bigInt = new BigInteger(1,digest);
+        return String.format("TABLE_%032X", bigInt);
     }
 
     private void scanMessagesInternal(ByteString topic, long startSeqId, int messageLimit, long sizeLimit,
@@ -290,7 +318,7 @@ public class LocalDBPersistenceManager i
         long currentSeqId;
         currentSeqId = startSeqId;
 
-        PreparedStatement stmt;
+        PreparedStatement stmt = null;
         try {
             try {
                 stmt = conn.prepareStatement("SELECT * FROM " + getTableNameForTopic(topic) + " WHERE " + ID_FIELD_NAME
@@ -367,8 +395,15 @@ public class LocalDBPersistenceManager i
             logger.error("Message stored in derby is not parseable", e);
             callback.scanFailed(ctx, new ServiceDownException(e));
             return;
+        } finally {
+            try {
+                if (stmt != null) {
+                    stmt.close();
+                }
+            } catch (SQLException e) {
+                logger.error("Error closing statement", e);
+            }
         }
-
     }
 
     public void deliveredUntil(ByteString topic, Long seqId) {
@@ -381,20 +416,27 @@ public class LocalDBPersistenceManager i
             logger.error("Not connected to derby");
             return;
         }
-        PreparedStatement stmt;
+        PreparedStatement stmt = null;
         try {
             stmt = conn.prepareStatement("DELETE FROM " + getTableNameForTopic(topic) + " WHERE " + ID_FIELD_NAME
                                          + " <= ?");
             stmt.setLong(1, seqId);
             int rowCount = stmt.executeUpdate();
             logger.debug("Deleted " + rowCount + " records for topic: " + topic.toStringUtf8() + ", seqId: " + seqId);
-            stmt.close();
         } catch (SQLException sqle) {
             String theError = (sqle).getSQLState();
             if (theError.equals("42X05")) {
                 logger.warn("Table for topic (" + topic + ") does not exist so no consumed messages to delete!");
             } else
                 logger.error("Error while executing derby delete for consumed messages", sqle);
+        } finally {
+            try {
+                if (stmt != null) {
+                    stmt.close();
+                }
+            } catch (SQLException e) {
+                logger.error("Error closing statement", e);
+            }
         }
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Thu May 17 16:32:05 2012
@@ -458,7 +458,7 @@ public class ReadAheadCache implements P
     }
 
     protected static class HashSetCacheKeyFactory implements Factory<Set<CacheKey>> {
-        protected static HashSetCacheKeyFactory instance = new HashSetCacheKeyFactory();
+        protected final static HashSetCacheKeyFactory instance = new HashSetCacheKeyFactory();
 
         public Set<CacheKey> newInstance() {
             return new HashSet<CacheKey>();
@@ -466,7 +466,7 @@ public class ReadAheadCache implements P
     }
 
     protected static class TreeSetLongFactory implements Factory<SortedSet<Long>> {
-        protected static TreeSetLongFactory instance = new TreeSetLongFactory();
+        protected final static TreeSetLongFactory instance = new TreeSetLongFactory();
 
         public SortedSet<Long> newInstance() {
             return new TreeSet<Long>();

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java Thu May 17 16:32:05 2012
@@ -53,18 +53,24 @@ public class HedwigProxy {
     Map<OperationType, Handler> handlers;
     ProxyConfiguration cfg;
     ChannelTracker tracker;
+    ThreadGroup tg;
 
-    public HedwigProxy(final ProxyConfiguration cfg, final UncaughtExceptionHandler exceptionHandler)
-            throws InterruptedException {
+    public HedwigProxy(final ProxyConfiguration cfg, final UncaughtExceptionHandler exceptionHandler) {
         this.cfg = cfg;
 
-        ThreadGroup tg = new ThreadGroup("hedwigproxy") {
+        tg = new ThreadGroup("hedwigproxy") {
             @Override
             public void uncaughtException(Thread t, Throwable e) {
                 exceptionHandler.uncaughtException(t, e);
             }
         };
+    }
+
+    public HedwigProxy(ProxyConfiguration conf) throws InterruptedException {
+        this(conf, new TerminateJVMExceptionHandler());
+    }
 
+    public void start() throws InterruptedException {
         final LinkedBlockingQueue<Boolean> queue = new LinkedBlockingQueue<Boolean>();
 
         new Thread(tg, new Runnable() {
@@ -84,10 +90,6 @@ public class HedwigProxy {
         queue.take();
     }
 
-    public HedwigProxy(ProxyConfiguration conf) throws InterruptedException {
-        this(conf, new TerminateJVMExceptionHandler());
-    }
-
     // used for testing
     public ChannelTracker getChannelTracker() {
         return tracker;
@@ -161,7 +163,7 @@ public class HedwigProxy {
             logger.info("Using configuration file " + confFile);
         }
         try {
-            new HedwigProxy(conf);
+            new HedwigProxy(conf).start();
         } catch (Throwable t) {
             PubSubServer.errorMsgAndExit("Error during startup", t, PubSubServer.RC_OTHER);
         }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java Thu May 17 16:32:05 2012
@@ -21,8 +21,8 @@ import org.apache.hedwig.client.conf.Cli
 
 public class ProxyConfiguration extends ClientConfiguration {
 
-    protected static String PROXY_PORT = "proxy_port";
-    protected static String MAX_MESSAGE_SIZE = "max_message_size";
+    protected final static String PROXY_PORT = "proxy_port";
+    protected final static String MAX_MESSAGE_SIZE = "max_message_size";
 
     public int getProxyPort() {
         return conf.getInt(PROXY_PORT, 9099);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java Thu May 17 16:32:05 2012
@@ -26,7 +26,7 @@ import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,7 +93,7 @@ public class RegionManager implements Su
                 if (null == topics || topics.isEmpty()) {
                     continue;
                 }
-                final AtomicBoolean done = new AtomicBoolean(false);
+                final CountDownLatch done = new CountDownLatch(1);
                 Callback<Void> postCb = new Callback<Void>() {
                     @Override
                     public void operationFinished(Object ctx,
@@ -106,10 +106,7 @@ public class RegionManager implements Su
                         finish();
                     }
                     void finish() {
-                        synchronized (done) {
-                            done.set(true);
-                            done.notifyAll();
-                        }
+                        done.countDown();
                     }
                 };
                 Callback<Void> mcb = CallbackUtils.multiCallback(topics.size(), postCb, null);
@@ -122,14 +119,10 @@ public class RegionManager implements Su
                     }
                     retrySubscribe(client, topic, mcb);
                 }
-                synchronized (done) {
-                    if (done.get()) {
-                        try {
-                            done.wait();
-                        } catch (InterruptedException e) {
-                            LOGGER.warn("Exception during retrying remote subscriptions : ", e);
-                        }
-                    }
+                try {
+                    done.await();
+                } catch (InterruptedException e) {
+                    LOGGER.warn("Exception during retrying remote subscriptions : ", e);
                 }
             }
         }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Thu May 17 16:32:05 2012
@@ -71,7 +71,7 @@ public abstract class AbstractSubscripti
 
     Callback<Void> noopCallback = new NoopCallback<Void>();
 
-    class NoopCallback<T> implements Callback<T> {
+    static class NoopCallback<T> implements Callback<T> {
         @Override
         public void operationFailed(Object ctx, PubSubException exception) {
             logger.warn("Exception found in AbstractSubscriptionManager : ", exception);
@@ -110,6 +110,10 @@ public abstract class AbstractSubscripti
             // so it should be safe to run this fairly often.
             for (ByteString topic : top2sub2seq.keySet()) {
                 final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
+                if (topicSubscriptions == null) {
+                    continue;
+                }
+
                 long minConsumedMessage = Long.MAX_VALUE;
                 boolean hasBound = true;
                 // Loop through all subscribers to the current topic to find the
@@ -126,9 +130,9 @@ public abstract class AbstractSubscripti
                 // Don't call the PersistenceManager if nobody is subscribed to
                 // the topic yet, or the consume pointer has not changed since
                 // the last time, or if this is the initial subscription.
+                Long minConsumedFromMap = topic2MinConsumedMessagesMap.get(topic);
                 if (topicSubscriptions.isEmpty()
-                    || (topic2MinConsumedMessagesMap.containsKey(topic)
-                        && topic2MinConsumedMessagesMap.get(topic) == minConsumedMessage)
+                    || (minConsumedFromMap != null && minConsumedFromMap.equals(minConsumedMessage))
                     || minConsumedMessage == 0) {
                     topic2MinConsumedMessagesMap.put(topic, minConsumedMessage);
                     pm.consumedUntil(topic, minConsumedMessage);
@@ -404,7 +408,8 @@ public abstract class AbstractSubscripti
                                         // so the following codes only happened when remote subscription failed.
                                         // it is safe to decrement the local count so next subscribe op
                                         // could have the chance to subscribe remote.
-                                        topic2LocalCounts.get(topic).decrementAndGet();
+                                        AtomicInteger count = topic2LocalCounts.get(topic);
+                                        if (count != null) { count.decrementAndGet(); }
                                     }
                                     cb.operationFailed(ctx, exception);
                                 }
@@ -422,8 +427,10 @@ public abstract class AbstractSubscripti
 
                     };
 
+                    AtomicInteger count = topic2LocalCounts.get(topic);
                     if (!SubscriptionStateUtils.isHubSubscriber(subRequest.getSubscriberId())
-                            && topic2LocalCounts.get(topic).incrementAndGet() == 1)
+                        && count != null
+                        && count.incrementAndGet() == 1)
                         notifySubscribe(topic, subRequest.getSynchronous(), cb2, ctx);
                     else
                         cb2.operationFinished(ctx, resultOfOperation);
@@ -537,8 +544,9 @@ public abstract class AbstractSubscripti
                 public void operationFinished(Object ctx, Void resultOfOperation) {
                     topicSubscriptions.remove(subscriberId);
                     // Notify listeners if necessary.
+                    AtomicInteger count = topic2LocalCounts.get(topic);
                     if (!SubscriptionStateUtils.isHubSubscriber(subscriberId)
-                            && topic2LocalCounts.get(topic).decrementAndGet() == 0)
+                        && count != null && count.decrementAndGet() == 0)
                         notifyUnsubcribe(topic);
 
                     updateMessageBound(topic);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java Thu May 17 16:32:05 2012
@@ -20,7 +20,7 @@ package org.apache.hedwig.server.subscri
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 
 public class TrueFilter implements MessageFilter {
-    protected static TrueFilter instance = new TrueFilter();
+    protected final static TrueFilter instance = new TrueFilter();
 
     public static TrueFilter instance() {
         return instance;

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java Thu May 17 16:32:05 2012
@@ -103,7 +103,7 @@ public class ZkTopicManager extends Abst
                 // Check for expired connection.
                 if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
                     logger.error("ZK client connection to the ZK server has expired!");
-                    System.exit(1);
+                    Runtime.getRuntime().exit(1);
                 }
             }
         });

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java Thu May 17 16:32:05 2012
@@ -22,7 +22,7 @@ import java.lang.Thread.UncaughtExceptio
 import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
 
 public class SafeAsyncCallback {
-    protected static UncaughtExceptionHandler uncaughtExceptionHandler = new TerminateJVMExceptionHandler();
+    static UncaughtExceptionHandler uncaughtExceptionHandler = new TerminateJVMExceptionHandler();
 
     public static void setUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler) {
         SafeAsyncCallback.uncaughtExceptionHandler = uncaughtExceptionHandler;

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/resources/findbugsExclude.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/resources/findbugsExclude.xml?rev=1339691&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/resources/findbugsExclude.xml (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/resources/findbugsExclude.xml Thu May 17 16:32:05 2012
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+//-->
+<FindBugsFilter>
+  <Match>
+    <Class name="org.apache.hedwig.server.persistence.LocalDBPersistenceManager" />
+    <Method name="createTable" />
+    <!-- We make it safe by hashing the input before using it //-->
+    <Bug pattern="SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE" />
+  </Match>
+</FindBugsFilter>

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java Thu May 17 16:32:05 2012
@@ -118,7 +118,10 @@ public abstract class HedwigHubTestBase 
         // Now create the PubSubServer Hubs
         serversList = new LinkedList<PubSubServer>();
         for (int i = 0; i < numServers; i++) {
-            serversList.add(new PubSubServer(getServerConfiguration(initialServerPort + i, initialSSLServerPort + i)));
+            PubSubServer s = new PubSubServer(
+                    getServerConfiguration(initialServerPort + i, initialSSLServerPort + i));
+            serversList.add(s);
+            s.start();
         }
     }
     protected void stopHubServers() throws Exception {

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java Thu May 17 16:32:05 2012
@@ -232,9 +232,13 @@ public abstract class HedwigRegionTestBa
         // servers. We will basically increment through the port numbers
         // starting from the initial ones defined.
         for (int j = 0; j < numServersPerRegion; j++) {
-            serversList.add(new PubSubServer(getServerConfiguration(initialServerPort
-                                             + (j + i * numServersPerRegion), initialSSLServerPort + (j + i * numServersPerRegion),
-                                             regionName)));
+            PubSubServer s = new PubSubServer(
+                    getServerConfiguration(initialServerPort
+                                           + (j + i * numServersPerRegion),
+                                           initialSSLServerPort + (j + i * numServersPerRegion),
+                                           regionName));
+            serversList.add(s);
+            s.start();
         }
         // Store this list of servers created for the current region
         regionServersMap.put(regionName, serversList);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java Thu May 17 16:32:05 2012
@@ -48,6 +48,7 @@ public abstract class PubSubServerStandA
     public void setUp() throws Exception {
         logger.info("STARTING " + getName());
         server = new PubSubServer(new StandAloneServerConfiguration());
+        server.start();
         logger.info("Standalone PubSubServer test setup finished");
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java Thu May 17 16:32:05 2012
@@ -96,6 +96,7 @@ public class TestPubSubServerStartup {
         try {
             logger.info("starting hedwig broker!");
             hedwigServer = new PubSubServer(serverConf);
+            hedwigServer.start();
         } catch (Exception e) {
             e.printStackTrace();
         }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java Thu May 17 16:32:05 2012
@@ -64,6 +64,10 @@ public class StubDeliveryManager impleme
     }
 
     @Override
+    public void start() {
+    }
+
+    @Override
     public void stop() {
         // do nothing now
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java Thu May 17 16:32:05 2012
@@ -212,6 +212,7 @@ public class TestHedwigHub extends Hedwi
         super.setUp();
         if (mode == Mode.PROXY) {
             proxy = new HedwigProxy(proxyConf);
+            proxy.start();
         }
         client = new HedwigClient(getClientConfiguration());
         publisher = client.getPublisher();

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java Thu May 17 16:32:05 2012
@@ -55,6 +55,7 @@ public class TestPubSubServer extends Pu
                 return super.getServerPort() + 1;
             }
         });
+        server1.start();
         server1.shutdown();
     }
 
@@ -91,7 +92,7 @@ public class TestPubSubServer extends Pu
                 return instantiator.instantiateTopicManager();
             }
         };
-
+        server.start();
         return server;
 
     }