You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/14 18:16:18 UTC
svn commit: r1384836 [2/2] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/cpp/inc/hedwig/ hedwig-client/src/main/cpp/lib/
hedwig-client/src/main/cpp/test/
hedwig-client/src/main/java/org/apache/hedwig/client/api/
hedwig-client/src/main/jav...
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java Fri Sep 14 16:16:17 2012
@@ -148,11 +148,12 @@ public class PubSubServer {
return pm;
}
- protected SubscriptionManager instantiateSubscriptionManager(TopicManager tm, PersistenceManager pm) {
+ protected SubscriptionManager instantiateSubscriptionManager(TopicManager tm, PersistenceManager pm,
+ DeliveryManager dm) {
if (conf.isStandalone()) {
- return new InMemorySubscriptionManager(tm, pm, conf, scheduler);
+ return new InMemorySubscriptionManager(conf, tm, pm, dm, scheduler);
} else {
- return new MMSubscriptionManager(mm, tm, pm, conf, scheduler);
+ return new MMSubscriptionManager(conf, mm, tm, pm, dm, scheduler);
}
}
@@ -392,7 +393,7 @@ public class PubSubServer {
dm = new FIFODeliveryManager(pm, conf);
dm.start();
- sm = instantiateSubscriptionManager(tm, pm);
+ sm = instantiateSubscriptionManager(tm, pm, dm);
rm = instantiateRegionManager(pm, scheduler);
sm.addListener(rm);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Fri Sep 14 16:16:17 2012
@@ -42,6 +42,7 @@ import org.apache.hedwig.protoextensions
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.common.TopicOpQueuer;
+import org.apache.hedwig.server.delivery.DeliveryManager;
import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
@@ -59,6 +60,9 @@ public abstract class AbstractSubscripti
private final ArrayList<SubscriptionEventListener> listeners = new ArrayList<SubscriptionEventListener>();
private final ConcurrentHashMap<ByteString, AtomicInteger> topic2LocalCounts = new ConcurrentHashMap<ByteString, AtomicInteger>();
+ // Handle to the DeliveryManager for the server so we can stop serving subscribers
+ // when losing topics
+ private final DeliveryManager dm;
// Handle to the PersistenceManager for the server so we can pass along the
// message consume pointers for each topic.
private final PersistenceManager pm;
@@ -83,12 +87,14 @@ public abstract class AbstractSubscripti
};
}
- public AbstractSubscriptionManager(ServerConfiguration cfg, TopicManager tm, PersistenceManager pm,
+ public AbstractSubscriptionManager(ServerConfiguration cfg, TopicManager tm,
+ PersistenceManager pm, DeliveryManager dm,
ScheduledExecutorService scheduler) {
this.cfg = cfg;
queuer = new TopicOpQueuer(scheduler);
tm.addTopicOwnershipChangeListener(this);
this.pm = pm;
+ this.dm = dm;
// Schedule the recurring MessagesConsumedTask only if a
// PersistenceManager is passed.
if (pm != null) {
@@ -238,8 +244,12 @@ public abstract class AbstractSubscripti
class ReleaseOp extends TopicOpQueuer.AsynchronousOp<Void> {
- public ReleaseOp(final ByteString topic, final Callback<Void> cb, Object ctx) {
+ final boolean removeStates;
+
+ public ReleaseOp(final ByteString topic, final Callback<Void> cb, Object ctx,
+ boolean removeStates) {
queuer.super(topic, cb, ctx);
+ this.removeStates = removeStates;
}
@Override
@@ -247,6 +257,8 @@ public abstract class AbstractSubscripti
Callback<Void> finalCb = new Callback<Void>() {
@Override
public void operationFinished(Object ctx, Void resultOfOperation) {
+ // Invoked as the final callback of updateSubscriptionStates(); record
+ // that the update is done before handing over to finish().
+ logger.info("Finished update subscription states when losing topic "
+ + topic.toStringUtf8());
finish();
}
@@ -258,6 +270,28 @@ public abstract class AbstractSubscripti
}
private void finish() {
+ // tell delivery manager to stop delivery for subscriptions of this topic
+ final Map<ByteString, InMemorySubscriptionState> topicSubscriptions;
+ if (removeStates) {
+ topicSubscriptions = top2sub2seq.remove(topic);
+ } else {
+ topicSubscriptions = top2sub2seq.get(topic);
+ }
+ // no subscriptions now, it may be removed by other release ops
+ if (null != topicSubscriptions) {
+ for (ByteString subId : topicSubscriptions.keySet()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stop serving subscriber (" + topic.toStringUtf8() + ", "
+ + subId.toStringUtf8() + ") when losing topic");
+ }
+ if (null != dm) {
+ dm.stopServingSubscriber(topic, subId);
+ }
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stop serving topic " + topic.toStringUtf8());
+ }
topic2LocalCounts.remove(topic);
// Since we decrement local count when some of remote subscriptions failed,
// while we don't unsubscribe those succeed subscriptions. so we can't depends
@@ -269,18 +303,13 @@ public abstract class AbstractSubscripti
if (logger.isDebugEnabled()) {
logger.debug("Try to update subscription states when losing topic " + topic.toStringUtf8());
}
- updateSubscriptionStates(topic, finalCb, ctx, true);
+ updateSubscriptionStates(topic, finalCb, ctx);
}
}
- void updateSubscriptionStates(ByteString topic, Callback<Void> finalCb, Object ctx, boolean removeTopic) {
+ void updateSubscriptionStates(ByteString topic, Callback<Void> finalCb, Object ctx) {
// Try to update subscription states of a specified topic
- Map<ByteString, InMemorySubscriptionState> states;
- if (removeTopic) {
- states = top2sub2seq.remove(topic);
- } else {
- states = top2sub2seq.get(topic);
- }
+ Map<ByteString, InMemorySubscriptionState> states = top2sub2seq.get(topic);
if (null == states) {
finalCb.operationFinished(ctx, null);
} else {
@@ -301,7 +330,7 @@ public abstract class AbstractSubscripti
*/
@Override
public void lostTopic(ByteString topic) {
- queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null));
+ queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null, true));
}
private void notifyUnsubcribe(ByteString topic) {
@@ -342,7 +371,7 @@ public abstract class AbstractSubscripti
String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
+ " requested creating a subscription but it is already subscribed with state: "
+ SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState());
- logger.debug(msg);
+ logger.error(msg);
cb.operationFailed(ctx, new PubSubException.ClientAlreadySubscribedException(msg));
return;
}
@@ -392,7 +421,7 @@ public abstract class AbstractSubscripti
if (createOrAttach.equals(CreateOrAttach.ATTACH)) {
String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
+ " requested attaching to an existing subscription but it is not subscribed";
- logger.debug(msg);
+ logger.error(msg);
cb.operationFailed(ctx, new PubSubException.ClientNotSubscribedException(msg));
return;
}
@@ -635,7 +664,7 @@ public abstract class AbstractSubscripti
ConcurrencyUtils.put(queue, false);
}
};
- updateSubscriptionStates(topic, finalCb, null, false);
+ updateSubscriptionStates(topic, finalCb, null);
queue.take();
}
} catch (InterruptedException ie) {
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java Fri Sep 14 16:16:17 2012
@@ -24,14 +24,18 @@ import java.util.concurrent.ScheduledExe
import com.google.protobuf.ByteString;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.delivery.DeliveryManager;
import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.util.Callback;
public class InMemorySubscriptionManager extends AbstractSubscriptionManager {
- public InMemorySubscriptionManager(TopicManager tm, PersistenceManager pm, ServerConfiguration conf, ScheduledExecutorService scheduler) {
- super(conf, tm, pm, scheduler);
+ public InMemorySubscriptionManager(ServerConfiguration conf,
+ TopicManager tm, PersistenceManager pm,
+ DeliveryManager dm,
+ ScheduledExecutorService scheduler) {
+ super(conf, tm, pm, dm, scheduler);
}
@Override
@@ -69,6 +73,10 @@ public class InMemorySubscriptionManager
@Override
public void lostTopic(ByteString topic) {
// Queue a ReleaseOp with removeStates=false so that the in-memory
// subscription information for the topic is not dropped when the topic is lost.
+ if (logger.isDebugEnabled()) {
+ logger.debug("InMemorySubscriptionManager is losing topic " + topic.toStringUtf8());
+ }
+ queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null, false));
}
@Override
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java Fri Sep 14 16:16:17 2012
@@ -40,11 +40,12 @@ public class MMSubscriptionManager exten
SubscriptionDataManager subManager;
- public MMSubscriptionManager(MetadataManagerFactory metaManagerFactory,
+ public MMSubscriptionManager(ServerConfiguration cfg,
+ MetadataManagerFactory metaManagerFactory,
TopicManager topicMgr, PersistenceManager pm,
- ServerConfiguration cfg,
+ DeliveryManager dm,
ScheduledExecutorService scheduler) {
- super(cfg, topicMgr, pm, scheduler);
+ super(cfg, topicMgr, pm, dm, scheduler);
this.subManager = metaManagerFactory.newSubscriptionDataManager();
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java Fri Sep 14 16:16:17 2012
@@ -40,20 +40,61 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.PublishResponse;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.server.PubSubServerStandAloneTestBase;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.SubscriptionListener;
public class TestPubSubClient extends PubSubServerStandAloneTestBase {
+ private static final int RETENTION_SECS_VALUE = 10;
+
// Client side variables
protected HedwigClient client;
protected Publisher publisher;
protected Subscriber subscriber;
+ /**
+ * Stand-alone server configuration that reports RETENTION_SECS_VALUE as
+ * the retention period. Used by the test that restarts the hub server
+ * with this configuration before subscribing.
+ */
+ protected class RetentionServerConfiguration extends StandAloneServerConfiguration {
+ // Always run the server in stand-alone mode.
+ @Override
+ public boolean isStandalone() {
+ return true;
+ }
+
+ // Retention period handed to the server, taken from the test constant.
+ @Override
+ public int getRetentionSecs() {
+ return RETENTION_SECS_VALUE;
+ }
+ }
+
// SynchronousQueues to verify async calls
private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
private final SynchronousQueue<Boolean> consumeQueue = new SynchronousQueue<Boolean>();
+ private final SynchronousQueue<SubscriptionEvent> eventQueue =
+ new SynchronousQueue<SubscriptionEvent>();
+
+ /**
+ * Test implementation of SubscriptionListener: every event it receives is
+ * put on a SynchronousQueue so that a test can take() it and assert on it.
+ */
+ class TestSubscriptionListener implements SubscriptionListener {
+ // Queue that received subscription events are put on.
+ SynchronousQueue<SubscriptionEvent> eventQueue;
+ // Publishes events to the enclosing test's shared eventQueue.
+ public TestSubscriptionListener() {
+ this.eventQueue = TestPubSubClient.this.eventQueue;
+ }
+ // Publishes events to the caller-supplied queue, so that a second
+ // client's events can be observed separately.
+ public TestSubscriptionListener(SynchronousQueue<SubscriptionEvent> queue) {
+ this.eventQueue = queue;
+ }
+ @Override
+ public void processEvent(final ByteString topic, final ByteString subscriberId,
+ final SubscriptionEvent event) {
+ // The hand-off is done on a fresh thread rather than on the calling
+ // thread (presumably so a blocking put cannot stall the client's
+ // callback thread -- TODO confirm against ConcurrencyUtils.put).
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ logger.debug("Event {} received for subscription(topic:{}, subscriber:{})",
+ new Object[] { event, topic.toStringUtf8(), subscriberId.toStringUtf8() });
+ ConcurrencyUtils.put(TestSubscriptionListener.this.eventQueue, event);
+ }
+ }).start();
+ }
+ }
// Test implementation of Callback for async client actions.
class TestCallback implements Callback<Void> {
@@ -84,6 +125,17 @@ public class TestPubSubClient extends Pu
// Test implementation of subscriber's message handler.
class TestMessageHandler implements MessageHandler {
+
+ private final SynchronousQueue<Boolean> consumeQueue;
+
+ public TestMessageHandler() {
+ this.consumeQueue = TestPubSubClient.this.consumeQueue;
+ }
+
+ public TestMessageHandler(SynchronousQueue<Boolean> consumeQueue) {
+ this.consumeQueue = consumeQueue;
+ }
+
public void deliver(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback,
Object context) {
new Thread(new Runnable() {
@@ -91,7 +143,7 @@ public class TestPubSubClient extends Pu
public void run() {
if (logger.isDebugEnabled())
logger.debug("Consume operation finished successfully!");
- ConcurrencyUtils.put(consumeQueue, true);
+ ConcurrencyUtils.put(TestMessageHandler.this.consumeQueue, true);
}
}).start();
callback.operationFinished(context, null);
@@ -362,4 +414,119 @@ public class TestPubSubClient extends Pu
assertTrue(true);
}
+ /**
+ * Subscribes synchronously with resubscribe disabled, starts delivery,
+ * shuts the hub server down and expects the registered listener to
+ * report a TOPIC_MOVED event.
+ */
+ @Test
+ public void testSyncSubscribeWithListener() throws Exception {
+ ByteString topic = ByteString.copyFromUtf8("mySyncSubscribeWithListener");
+ ByteString subscriberId = ByteString.copyFromUtf8("mysub");
+ subscriber.addSubscriptionListener(new TestSubscriptionListener());
+ try {
+ SubscriptionOptions options =
+ SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH)
+ .setEnableResubscribe(false).build();
+ subscriber.subscribe(topic, subscriberId, options);
+ } catch (PubSubException.ServiceDownException e) {
+ fail("Should not reach here!");
+ }
+ subscriber.startDelivery(topic, subscriberId, new TestMessageHandler());
+ // Take the server away; take() blocks until the listener hands over an event.
+ tearDownHubServer();
+ assertEquals(SubscriptionEvent.TOPIC_MOVED, eventQueue.take());
+ }
+
+ /**
+ * Asynchronous variant of the listener test: subscribes with
+ * asyncSubscribe, waits for the callback result, starts delivery, shuts
+ * the hub server down and expects a TOPIC_MOVED event.
+ */
+ @Test
+ public void testAsyncSubscribeWithListener() throws Exception {
+ ByteString topic = ByteString.copyFromUtf8("myAsyncSubscribeWithListener");
+ ByteString subscriberId = ByteString.copyFromUtf8("mysub");
+ subscriber.addSubscriptionListener(new TestSubscriptionListener());
+ SubscriptionOptions options =
+ SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH)
+ .setEnableResubscribe(false).build();
+ subscriber.asyncSubscribe(topic, subscriberId, options,
+ new TestCallback(), null);
+ // Wait for the result that the subscribe callback reports on the shared queue.
+ assertTrue(queue.take());
+ subscriber.startDelivery(topic, subscriberId, new TestMessageHandler());
+ // Take the server away; take() blocks until the listener hands over an event.
+ tearDownHubServer();
+ assertEquals(SubscriptionEvent.TOPIC_MOVED, eventQueue.take());
+ }
+
+ /**
+ * Verifies force-attach: a second client subscribing to the same
+ * (topic, subscriberId) with forceAttach enabled takes the subscription
+ * over. The first subscriber must see TOPIC_MOVED and receive no further
+ * messages, while the second subscriber receives every published message.
+ */
+ @Test
+ public void testSyncSubscribeForceAttach() throws Exception {
+ ByteString topic = ByteString.copyFromUtf8("mySyncSubscribeForceAttach");
+ ByteString subscriberId = ByteString.copyFromUtf8("mysub");
+ subscriber.addSubscriptionListener(new TestSubscriptionListener());
+ SubscriptionOptions options =
+ SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH)
+ .setForceAttach(true).setEnableResubscribe(false).build();
+ try {
+ subscriber.subscribe(topic, subscriberId, options);
+ } catch (PubSubException.ServiceDownException e) {
+ fail("Should not reach here!");
+ }
+ subscriber.startDelivery(topic, subscriberId, new TestMessageHandler());
+
+ // Create a second client that force-attaches to the same subscription.
+ HedwigClient client2 = new HedwigClient(new ClientConfiguration());
+ // Close the second client even when an assertion below fails.
+ try {
+ Subscriber subscriber2 = client2.getSubscriber();
+ Publisher publisher2 = client2.getPublisher();
+ SynchronousQueue<SubscriptionEvent> eventQueue2 =
+ new SynchronousQueue<SubscriptionEvent>();
+ subscriber2.addSubscriptionListener(new TestSubscriptionListener(eventQueue2));
+ try {
+ subscriber2.subscribe(topic, subscriberId, options);
+ } catch (PubSubException.ServiceDownException e) {
+ fail("Should not reach here!");
+ }
+
+ SynchronousQueue<Boolean> consumeQueue2 = new SynchronousQueue<Boolean>();
+ subscriber2.startDelivery(topic, subscriberId, new TestMessageHandler(consumeQueue2));
+
+ assertEquals(SubscriptionEvent.TOPIC_MOVED, eventQueue.take());
+ // SynchronousQueue.isEmpty() always returns true, so it cannot detect
+ // a stray event. poll() returns an element only when a producer thread
+ // is currently blocked handing one off, which is what must not happen.
+ assertTrue(null == eventQueue2.poll());
+
+ // Now publish some messages for the topic to be consumed by the
+ // subscriber.
+ publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #1")).build(),
+ new TestCallback(), null);
+ assertTrue(queue.take());
+ assertTrue(consumeQueue2.take());
+ // The first subscriber lost the subscription and must not be offered the message.
+ assertTrue(null == consumeQueue.poll());
+
+ publisher2.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #2")).build(),
+ new TestCallback(), null);
+ assertTrue(queue.take());
+ assertTrue(consumeQueue2.take());
+ assertTrue(null == consumeQueue.poll());
+ } finally {
+ client2.close();
+ }
+ }
+
+ /**
+ * Restarts the hub server with RetentionServerConfiguration, subscribes,
+ * publishes and consumes one message, then waits and expects the
+ * listener to report TOPIC_MOVED.
+ */
+ @Test
+ public void testSyncSubscribeWithListenerWhenReleasingTopic() throws Exception {
+ // Replace the server started in setUp with one using the retention configuration.
+ tearDownHubServer();
+ startHubServer(new RetentionServerConfiguration());
+ ByteString topic = ByteString.copyFromUtf8("mySyncSubscribeWithListenerWhenReleasingTopic");
+ ByteString subscriberId = ByteString.copyFromUtf8("mysub");
+ subscriber.addSubscriptionListener(new TestSubscriptionListener());
+ SubscriptionOptions options =
+ SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH)
+ .setForceAttach(false).setEnableResubscribe(false).build();
+ try {
+ subscriber.subscribe(topic, subscriberId, options);
+ } catch (PubSubException.ServiceDownException e) {
+ fail("Should not reach here!");
+ }
+ subscriber.startDelivery(topic, subscriberId, new TestMessageHandler());
+
+ publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #1")).build(),
+ new TestCallback(), null);
+ assertTrue(queue.take());
+ assertTrue(consumeQueue.take());
+
+ // RETENTION_SECS_VALUE is in seconds but Thread.sleep() takes
+ // milliseconds; convert so the wait really spans two retention periods.
+ Thread.sleep(RETENTION_SECS_VALUE * 2 * 1000L);
+ assertEquals(SubscriptionEvent.TOPIC_MOVED, eventQueue.take());
+ }
+
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java Fri Sep 14 16:16:17 2012
@@ -41,23 +41,40 @@ public abstract class PubSubServerStandA
}
}
+ /**
+ * Returns a new StandAloneServerConfiguration; this is the configuration
+ * the no-argument startHubServer() starts the server with.
+ */
+ public ServerConfiguration getStandAloneServerConfiguration() {
+ return new StandAloneServerConfiguration();
+ }
+
protected PubSubServer server;
@Override
@Before
public void setUp() throws Exception {
logger.info("STARTING " + getName());
- server = new PubSubServer(new StandAloneServerConfiguration());
- server.start();
+ startHubServer();
logger.info("Standalone PubSubServer test setup finished");
}
+
@Override
@After
public void tearDown() throws Exception {
logger.info("tearDown starting");
- server.shutdown();
+ tearDownHubServer();
logger.info("FINISHED " + getName());
}
+ /**
+ * Starts the hub server with the configuration returned by
+ * getStandAloneServerConfiguration().
+ */
+ protected void startHubServer() throws Exception {
+ startHubServer(getStandAloneServerConfiguration());
+ }
+
+ /**
+ * Creates a PubSubServer for the given configuration, stores it in the
+ * server field and starts it.
+ *
+ * @param conf configuration the new server is constructed with
+ */
+ protected void startHubServer(ServerConfiguration conf) throws Exception {
+ server = new PubSubServer(conf);
+ server.start();
+ }
+
+ /**
+ * Shuts down the server currently held in the server field. Tests also
+ * call this directly to take the hub server away mid-test.
+ */
+ protected void tearDownHubServer() throws Exception {
+ server.shutdown();
+ }
+
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java Fri Sep 14 16:16:17 2012
@@ -74,7 +74,7 @@ public class TestSubUnsubHandler extends
TopicManager tm = new TrivialOwnAllTopicManager(conf, executor);
dm = new StubDeliveryManager();
PersistenceManager pm = LocalDBPersistenceManager.instance();
- sm = new StubSubscriptionManager(tm, pm, conf, executor);
+ sm = new StubSubscriptionManager(tm, pm, dm, conf, executor);
sh = new SubscribeHandler(tm, dm, pm, sm, conf);
channel = new WriteRecordingChannel();
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java Fri Sep 14 16:16:17 2012
@@ -25,6 +25,7 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.delivery.DeliveryManager;
import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.util.Callback;
@@ -36,8 +37,9 @@ public class StubSubscriptionManager ext
this.fail = fail;
}
- public StubSubscriptionManager(TopicManager tm, PersistenceManager pm, ServerConfiguration conf, ScheduledExecutorService scheduler) {
- super(tm, pm, conf, scheduler);
+ public StubSubscriptionManager(TopicManager tm, PersistenceManager pm, DeliveryManager dm,
+ ServerConfiguration conf, ScheduledExecutorService scheduler) {
+ super(conf, tm, pm, dm, scheduler);
}
@Override
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java?rev=1384836&r1=1384835&r2=1384836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java Fri Sep 14 16:16:17 2012
@@ -57,8 +57,8 @@ public class TestMMSubscriptionManager e
cfg = new ServerConfiguration();
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
mm = MetadataManagerFactory.newMetadataManagerFactory(cfg, zk);
- sm = new MMSubscriptionManager(mm, new TrivialOwnAllTopicManager(cfg, scheduler),
- LocalDBPersistenceManager.instance(), cfg, scheduler);
+ sm = new MMSubscriptionManager(cfg, mm, new TrivialOwnAllTopicManager(cfg, scheduler),
+ LocalDBPersistenceManager.instance(), null, scheduler);
subDataCallback = new Callback<SubscriptionData>() {
@Override
public void operationFailed(Object ctx, final PubSubException exception) {