You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/14 18:16:18 UTC

svn commit: r1384836 [2/2] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/cpp/inc/hedwig/ hedwig-client/src/main/cpp/lib/ hedwig-client/src/main/cpp/test/ hedwig-client/src/main/java/org/apache/hedwig/client/api/ hedwig-client/src/main/jav...

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java Fri Sep 14 16:16:17 2012
@@ -148,11 +148,12 @@ public class PubSubServer {
         return pm;
     }
 
-    protected SubscriptionManager instantiateSubscriptionManager(TopicManager tm, PersistenceManager pm) {
+    protected SubscriptionManager instantiateSubscriptionManager(TopicManager tm, PersistenceManager pm,
+                                                                 DeliveryManager dm) {
         if (conf.isStandalone()) {
-            return new InMemorySubscriptionManager(tm, pm, conf, scheduler);
+            return new InMemorySubscriptionManager(conf, tm, pm, dm, scheduler);
         } else {
-            return new MMSubscriptionManager(mm, tm, pm, conf, scheduler);
+            return new MMSubscriptionManager(conf, mm, tm, pm, dm, scheduler);
         }
 
     }
@@ -392,7 +393,7 @@ public class PubSubServer {
                     dm = new FIFODeliveryManager(pm, conf);
                     dm.start();
 
-                    sm = instantiateSubscriptionManager(tm, pm);
+                    sm = instantiateSubscriptionManager(tm, pm, dm);
                     rm = instantiateRegionManager(pm, scheduler);
                     sm.addListener(rm);
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Fri Sep 14 16:16:17 2012
@@ -42,6 +42,7 @@ import org.apache.hedwig.protoextensions
 import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.common.TopicOpQueuer;
+import org.apache.hedwig.server.delivery.DeliveryManager;
 import org.apache.hedwig.server.persistence.PersistenceManager;
 import org.apache.hedwig.server.topics.TopicManager;
 import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
@@ -59,6 +60,9 @@ public abstract class AbstractSubscripti
     private final ArrayList<SubscriptionEventListener> listeners = new ArrayList<SubscriptionEventListener>();
     private final ConcurrentHashMap<ByteString, AtomicInteger> topic2LocalCounts = new ConcurrentHashMap<ByteString, AtomicInteger>();
 
+    // Handle to the DeliveryManager for the server so we can stop serving subscribers
+    // when losing topics
+    private final DeliveryManager dm;
     // Handle to the PersistenceManager for the server so we can pass along the
     // message consume pointers for each topic.
     private final PersistenceManager pm;
@@ -83,12 +87,14 @@ public abstract class AbstractSubscripti
         };
     }
 
-    public AbstractSubscriptionManager(ServerConfiguration cfg, TopicManager tm, PersistenceManager pm,
+    public AbstractSubscriptionManager(ServerConfiguration cfg, TopicManager tm,
+                                       PersistenceManager pm, DeliveryManager dm,
                                        ScheduledExecutorService scheduler) {
         this.cfg = cfg;
         queuer = new TopicOpQueuer(scheduler);
         tm.addTopicOwnershipChangeListener(this);
         this.pm = pm;
+        this.dm = dm;
         // Schedule the recurring MessagesConsumedTask only if a
         // PersistenceManager is passed.
         if (pm != null) {
@@ -238,8 +244,12 @@ public abstract class AbstractSubscripti
 
     class ReleaseOp extends TopicOpQueuer.AsynchronousOp<Void> {
 
-        public ReleaseOp(final ByteString topic, final Callback<Void> cb, Object ctx) {
+        final boolean removeStates;
+
+        public ReleaseOp(final ByteString topic, final Callback<Void> cb, Object ctx,
+                         boolean removeStates) {
             queuer.super(topic, cb, ctx);
+            this.removeStates = removeStates;
         }
 
         @Override
@@ -247,6 +257,8 @@ public abstract class AbstractSubscripti
             Callback<Void> finalCb = new Callback<Void>() {
                 @Override
                 public void operationFinished(Object ctx, Void resultOfOperation) {
+                    logger.info("Finished update subscription states when losting topic "
+                              + topic.toStringUtf8());
                     finish();
                 }
 
@@ -258,6 +270,28 @@ public abstract class AbstractSubscripti
                 }
 
                 private void finish() {
+                    // tell delivery manager to stop delivery for subscriptions of this topic
+                    final Map<ByteString, InMemorySubscriptionState> topicSubscriptions;
+                    if (removeStates) {
+                        topicSubscriptions = top2sub2seq.remove(topic);
+                    } else {
+                        topicSubscriptions = top2sub2seq.get(topic);
+                    }
+                    // no subscriptions now, it may be removed by other release ops
+                    if (null != topicSubscriptions) {
+                        for (ByteString subId : topicSubscriptions.keySet()) {
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("Stop serving subscriber (" + topic.toStringUtf8() + ", "
+                                           + subId.toStringUtf8() + ") when losing topic");
+                            }
+                            if (null != dm) {
+                                dm.stopServingSubscriber(topic, subId);
+                            }
+                        }
+                    }
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Stop serving topic " + topic.toStringUtf8());
+                    }
                     topic2LocalCounts.remove(topic);
                     // Since we decrement local count when some of remote subscriptions failed,
                     // while we don't unsubscribe those succeed subscriptions. so we can't depends
@@ -269,18 +303,13 @@ public abstract class AbstractSubscripti
             if (logger.isDebugEnabled()) {
                 logger.debug("Try to update subscription states when losing topic " + topic.toStringUtf8());
             }
-            updateSubscriptionStates(topic, finalCb, ctx, true);
+            updateSubscriptionStates(topic, finalCb, ctx);
         }
     }
 
-    void updateSubscriptionStates(ByteString topic, Callback<Void> finalCb, Object ctx, boolean removeTopic) {
+    void updateSubscriptionStates(ByteString topic, Callback<Void> finalCb, Object ctx) {
         // Try to update subscription states of a specified topic
-        Map<ByteString, InMemorySubscriptionState> states;
-        if (removeTopic) {
-            states = top2sub2seq.remove(topic);
-        } else {
-            states = top2sub2seq.get(topic);
-        }
+        Map<ByteString, InMemorySubscriptionState> states = top2sub2seq.get(topic);
         if (null == states) {
             finalCb.operationFinished(ctx, null);
         } else {
@@ -301,7 +330,7 @@ public abstract class AbstractSubscripti
      */
     @Override
     public void lostTopic(ByteString topic) {
-        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null));
+        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null, true));
     }
 
     private void notifyUnsubcribe(ByteString topic) {
@@ -342,7 +371,7 @@ public abstract class AbstractSubscripti
                     String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
                                  + " requested creating a subscription but it is already subscribed with state: "
                                  + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState());
-                    logger.debug(msg);
+                    logger.error(msg);
                     cb.operationFailed(ctx, new PubSubException.ClientAlreadySubscribedException(msg));
                     return;
                 }
@@ -392,7 +421,7 @@ public abstract class AbstractSubscripti
             if (createOrAttach.equals(CreateOrAttach.ATTACH)) {
                 String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
                              + " requested attaching to an existing subscription but it is not subscribed";
-                logger.debug(msg);
+                logger.error(msg);
                 cb.operationFailed(ctx, new PubSubException.ClientNotSubscribedException(msg));
                 return;
             }
@@ -635,7 +664,7 @@ public abstract class AbstractSubscripti
                         ConcurrencyUtils.put(queue, false);
                     }
                 };
-                updateSubscriptionStates(topic, finalCb, null, false);
+                updateSubscriptionStates(topic, finalCb, null);
                 queue.take();
             }
         } catch (InterruptedException ie) {

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java Fri Sep 14 16:16:17 2012
@@ -24,14 +24,18 @@ import java.util.concurrent.ScheduledExe
 import com.google.protobuf.ByteString;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
 import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.delivery.DeliveryManager;
 import org.apache.hedwig.server.persistence.PersistenceManager;
 import org.apache.hedwig.server.topics.TopicManager;
 import org.apache.hedwig.util.Callback;
 
 public class InMemorySubscriptionManager extends AbstractSubscriptionManager {
 
-    public InMemorySubscriptionManager(TopicManager tm, PersistenceManager pm, ServerConfiguration conf, ScheduledExecutorService scheduler) {
-        super(conf, tm, pm, scheduler);
+    public InMemorySubscriptionManager(ServerConfiguration conf,
+                                       TopicManager tm, PersistenceManager pm,
+                                       DeliveryManager dm,
+                                       ScheduledExecutorService scheduler) {
+        super(conf, tm, pm, dm, scheduler);
     }
 
     @Override
@@ -69,6 +73,10 @@ public class InMemorySubscriptionManager
     @Override
     public void lostTopic(ByteString topic) {
         // Intentionally do nothing, so that we dont lose in-memory information
+        if (logger.isDebugEnabled()) {
+            logger.debug("InMemorySubscriptionManager is losing topic " + topic.toStringUtf8());
+        }
+        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null, false));
     }
 
     @Override

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java Fri Sep 14 16:16:17 2012
@@ -40,11 +40,12 @@ public class MMSubscriptionManager exten
 
     SubscriptionDataManager subManager;
 
-    public MMSubscriptionManager(MetadataManagerFactory metaManagerFactory,
+    public MMSubscriptionManager(ServerConfiguration cfg,
+                                 MetadataManagerFactory metaManagerFactory,
                                  TopicManager topicMgr, PersistenceManager pm,
-                                 ServerConfiguration cfg,
+                                 DeliveryManager dm,
                                  ScheduledExecutorService scheduler) {
-        super(cfg, topicMgr, pm, scheduler);
+        super(cfg, topicMgr, pm, dm, scheduler);
         this.subManager = metaManagerFactory.newSubscriptionDataManager();
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java Fri Sep 14 16:16:17 2012
@@ -40,20 +40,61 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.PublishResponse;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
 import org.apache.hedwig.server.PubSubServerStandAloneTestBase;
 import org.apache.hedwig.util.Callback;
 import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.SubscriptionListener;
 
 public class TestPubSubClient extends PubSubServerStandAloneTestBase {
 
+    private static final int RETENTION_SECS_VALUE = 10;
+
     // Client side variables
     protected HedwigClient client;
     protected Publisher publisher;
     protected Subscriber subscriber;
 
+    protected class RetentionServerConfiguration extends StandAloneServerConfiguration {
+        @Override
+        public boolean isStandalone() {
+            return true;
+        }
+
+        @Override
+        public int getRetentionSecs() {
+            return RETENTION_SECS_VALUE;
+        }
+    }
+
     // SynchronousQueues to verify async calls
     private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
     private final SynchronousQueue<Boolean> consumeQueue = new SynchronousQueue<Boolean>();
+    private final SynchronousQueue<SubscriptionEvent> eventQueue =
+        new SynchronousQueue<SubscriptionEvent>();
+
+    class TestSubscriptionListener implements SubscriptionListener {
+        SynchronousQueue<SubscriptionEvent> eventQueue;
+        public TestSubscriptionListener() {
+            this.eventQueue = TestPubSubClient.this.eventQueue;
+        }
+        public TestSubscriptionListener(SynchronousQueue<SubscriptionEvent> queue) {
+            this.eventQueue = queue;
+        }
+        @Override
+        public void processEvent(final ByteString topic, final ByteString subscriberId,
+                                 final SubscriptionEvent event) {
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    logger.debug("Event {} received for subscription(topic:{}, subscriber:{})",
+                                 new Object[] { event, topic.toStringUtf8(), subscriberId.toStringUtf8() });
+                    ConcurrencyUtils.put(TestSubscriptionListener.this.eventQueue, event);
+                }
+            }).start();
+        }
+    }
 
     // Test implementation of Callback for async client actions.
     class TestCallback implements Callback<Void> {
@@ -84,6 +125,17 @@ public class TestPubSubClient extends Pu
 
     // Test implementation of subscriber's message handler.
     class TestMessageHandler implements MessageHandler {
+
+        private final SynchronousQueue<Boolean> consumeQueue;
+
+        public TestMessageHandler() {
+            this.consumeQueue = TestPubSubClient.this.consumeQueue;
+        }
+
+        public TestMessageHandler(SynchronousQueue<Boolean> consumeQueue) {
+            this.consumeQueue = consumeQueue;
+        }
+
         public void deliver(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback,
                             Object context) {
             new Thread(new Runnable() {
@@ -91,7 +143,7 @@ public class TestPubSubClient extends Pu
                 public void run() {
                     if (logger.isDebugEnabled())
                         logger.debug("Consume operation finished successfully!");
-                    ConcurrencyUtils.put(consumeQueue, true);
+                    ConcurrencyUtils.put(TestMessageHandler.this.consumeQueue, true);
                 }
             }).start();
             callback.operationFinished(context, null);
@@ -362,4 +414,119 @@ public class TestPubSubClient extends Pu
         assertTrue(true);
     }
 
+    @Test
+    public void testSyncSubscribeWithListener() throws Exception {
+        ByteString topic = ByteString.copyFromUtf8("mySyncSubscribeWithListener");
+        ByteString subscriberId = ByteString.copyFromUtf8("mysub");
+        subscriber.addSubscriptionListener(new TestSubscriptionListener());
+        try {
+            SubscriptionOptions options =
+                SubscriptionOptions.newBuilder()
+                .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH)
+                .setEnableResubscribe(false).build();
+            subscriber.subscribe(topic, subscriberId, options);
+        } catch (PubSubException.ServiceDownException e) {
+            fail("Should not reach here!");
+        }
+        subscriber.startDelivery(topic, subscriberId, new TestMessageHandler());
+        tearDownHubServer();
+        assertEquals(SubscriptionEvent.TOPIC_MOVED, eventQueue.take());
+    }
+
+    @Test
+    public void testAsyncSubscribeWithListener() throws Exception {
+        ByteString topic = ByteString.copyFromUtf8("myAsyncSubscribeWithListener");
+        ByteString subscriberId = ByteString.copyFromUtf8("mysub");
+        subscriber.addSubscriptionListener(new TestSubscriptionListener());
+        SubscriptionOptions options =
+            SubscriptionOptions.newBuilder()
+            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH)
+            .setEnableResubscribe(false).build();
+        subscriber.asyncSubscribe(topic, subscriberId, options,
+                                  new TestCallback(), null);
+        assertTrue(queue.take());
+        subscriber.startDelivery(topic, subscriberId, new TestMessageHandler());
+        tearDownHubServer();
+        assertEquals(SubscriptionEvent.TOPIC_MOVED, eventQueue.take());
+    }
+
+    @Test
+    public void testSyncSubscribeForceAttach() throws Exception {
+        ByteString topic = ByteString.copyFromUtf8("mySyncSubscribeForceAttach");
+        ByteString subscriberId = ByteString.copyFromUtf8("mysub");
+        subscriber.addSubscriptionListener(new TestSubscriptionListener());
+        SubscriptionOptions options =
+            SubscriptionOptions.newBuilder()
+            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH)
+            .setForceAttach(true).setEnableResubscribe(false).build();
+        try {
+            subscriber.subscribe(topic, subscriberId, options);
+        } catch (PubSubException.ServiceDownException e) {
+            fail("Should not reach here!");
+        }
+        subscriber.startDelivery(topic, subscriberId, new TestMessageHandler());
+
+        // new a client
+        HedwigClient client2 = new HedwigClient(new ClientConfiguration());
+        Subscriber subscriber2 = client2.getSubscriber();
+        Publisher publisher2 = client2.getPublisher();
+        SynchronousQueue<SubscriptionEvent> eventQueue2 =
+            new SynchronousQueue<SubscriptionEvent>();
+        subscriber2.addSubscriptionListener(new TestSubscriptionListener(eventQueue2));
+        try {
+            subscriber2.subscribe(topic, subscriberId, options);
+        } catch (PubSubException.ServiceDownException e) {
+            fail("Should not reach here!");
+        }
+
+        SynchronousQueue<Boolean> consumeQueue2 = new SynchronousQueue<Boolean>();
+        subscriber2.startDelivery(topic, subscriberId, new TestMessageHandler(consumeQueue2));
+
+        assertEquals(SubscriptionEvent.TOPIC_MOVED, eventQueue.take());
+        assertTrue(eventQueue2.isEmpty());
+
+        // Now publish some messages for the topic to be consumed by the
+        // subscriber.
+        publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #1")).build(),
+                               new TestCallback(), null);
+        assertTrue(queue.take());
+        assertTrue(consumeQueue2.take());
+        assertTrue(consumeQueue.isEmpty());
+
+        publisher2.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #2")).build(),
+                               new TestCallback(), null);
+        assertTrue(queue.take());
+        assertTrue(consumeQueue2.take());
+        assertTrue(consumeQueue.isEmpty());
+
+        client2.close();
+    }
+
+    @Test
+    public void testSyncSubscribeWithListenerWhenReleasingTopic() throws Exception {
+        tearDownHubServer();
+        startHubServer(new RetentionServerConfiguration());
+        ByteString topic = ByteString.copyFromUtf8("mySyncSubscribeWithListenerWhenReleasingTopic");
+        ByteString subscriberId = ByteString.copyFromUtf8("mysub");
+        subscriber.addSubscriptionListener(new TestSubscriptionListener());
+        SubscriptionOptions options =
+            SubscriptionOptions.newBuilder()
+            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH)
+            .setForceAttach(false).setEnableResubscribe(false).build();
+        try {
+            subscriber.subscribe(topic, subscriberId, options);
+        } catch (PubSubException.ServiceDownException e) {
+            fail("Should not reach here!");
+        }
+        subscriber.startDelivery(topic, subscriberId, new TestMessageHandler());
+
+        publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #1")).build(),
+                               new TestCallback(), null);
+        assertTrue(queue.take());
+        assertTrue(consumeQueue.take());
+
+        Thread.sleep(RETENTION_SECS_VALUE * 2);
+        assertEquals(SubscriptionEvent.TOPIC_MOVED, eventQueue.take());
+    }
+
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java Fri Sep 14 16:16:17 2012
@@ -41,23 +41,40 @@ public abstract class PubSubServerStandA
         }
     }
 
+    public ServerConfiguration getStandAloneServerConfiguration() {
+        return new StandAloneServerConfiguration();
+    }
+
     protected PubSubServer server;
 
     @Override
     @Before
     public void setUp() throws Exception {
         logger.info("STARTING " + getName());
-        server = new PubSubServer(new StandAloneServerConfiguration());
-        server.start();
+        startHubServer();
         logger.info("Standalone PubSubServer test setup finished");
     }
 
+
     @Override
     @After
     public void tearDown() throws Exception {
         logger.info("tearDown starting");
-        server.shutdown();
+        tearDownHubServer();
         logger.info("FINISHED " + getName());
     }
 
+    protected void startHubServer() throws Exception {
+        startHubServer(getStandAloneServerConfiguration());
+    }
+
+    protected void startHubServer(ServerConfiguration conf) throws Exception {
+        server = new PubSubServer(conf);
+        server.start();
+    }
+
+    protected void tearDownHubServer() throws Exception {
+        server.shutdown();
+    }
+
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java Fri Sep 14 16:16:17 2012
@@ -74,7 +74,7 @@ public class TestSubUnsubHandler extends
         TopicManager tm = new TrivialOwnAllTopicManager(conf, executor);
         dm = new StubDeliveryManager();
         PersistenceManager pm = LocalDBPersistenceManager.instance();
-        sm = new StubSubscriptionManager(tm, pm, conf, executor);
+        sm = new StubSubscriptionManager(tm, pm, dm, conf, executor);
         sh = new SubscribeHandler(tm, dm, pm, sm, conf);
         channel = new WriteRecordingChannel();
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java Fri Sep 14 16:16:17 2012
@@ -25,6 +25,7 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
 import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.delivery.DeliveryManager;
 import org.apache.hedwig.server.persistence.PersistenceManager;
 import org.apache.hedwig.server.topics.TopicManager;
 import org.apache.hedwig.util.Callback;
@@ -36,8 +37,9 @@ public class StubSubscriptionManager ext
         this.fail = fail;
     }
 
-    public StubSubscriptionManager(TopicManager tm, PersistenceManager pm, ServerConfiguration conf, ScheduledExecutorService scheduler) {
-        super(tm, pm, conf, scheduler);
+    public StubSubscriptionManager(TopicManager tm, PersistenceManager pm, DeliveryManager dm,
+                                   ServerConfiguration conf, ScheduledExecutorService scheduler) {
+        super(conf, tm, pm, dm, scheduler);
     }
 
     @Override

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java Fri Sep 14 16:16:17 2012
@@ -57,8 +57,8 @@ public class TestMMSubscriptionManager e
         cfg = new ServerConfiguration();
         final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
         mm = MetadataManagerFactory.newMetadataManagerFactory(cfg, zk);
-        sm = new MMSubscriptionManager(mm, new TrivialOwnAllTopicManager(cfg, scheduler),
-                                       LocalDBPersistenceManager.instance(), cfg, scheduler);
+        sm = new MMSubscriptionManager(cfg, mm, new TrivialOwnAllTopicManager(cfg, scheduler),
+                                       LocalDBPersistenceManager.instance(), null, scheduler);
         subDataCallback = new Callback<SubscriptionData>() {
             @Override
             public void operationFailed(Object ctx, final PubSubException exception) {