You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/22 13:04:48 UTC
svn commit: r1400836 [2/2] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/handlers/
hedwig-client/src/main/java/org/apache/hedwig/client/netty/
hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/s...
Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestCloseSubscription.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestCloseSubscription.java?rev=1400836&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestCloseSubscription.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestCloseSubscription.java Mon Oct 22 11:04:47 2012
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
+import org.apache.hedwig.server.HedwigHubTestBase;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * TODO: it is a temp test for close subscription request. after
+ * multiplexing channel manager is implemented, remove this test.
+ */
+public class TestCloseSubscription extends HedwigHubTestBase {
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ numServers = 1;
+ super.setUp();
+ }
+
+ @Test
+ public void testCloseSubscriptionRequest() throws Exception {
+ HedwigClientImpl client = new HedwigClientImpl(new ClientConfiguration());
+ Publisher pub = client.getPublisher();
+ Subscriber sub = client.getSubscriber();
+
+ ByteString topic = ByteString.copyFromUtf8("testCloseSubscriptionRequest");
+ ByteString subid = ByteString.copyFromUtf8("mysubid");
+ sub.subscribe(topic, subid, CreateOrAttach.CREATE);
+
+ final int X = 20;
+ final AtomicInteger expected = new AtomicInteger(1);
+ final CountDownLatch firstLatch = new CountDownLatch(1);
+ final CountDownLatch secondLatch = new CountDownLatch(1);
+ sub.startDelivery(topic, subid, new MessageHandler() {
+ @Override
+ public synchronized void deliver(ByteString topic, ByteString subscriberId,
+ Message msg,
+ Callback<Void> callback, Object context) {
+ try {
+ int value = Integer.valueOf(msg.getBody().toStringUtf8());
+ logger.debug("Received message {},", value);
+
+ if (value == expected.get()) {
+ expected.incrementAndGet();
+ } else {
+ // error condition
+ logger.error("Did not receive expected value, expected {}, got {}",
+ expected.get(), value);
+ expected.set(0);
+ firstLatch.countDown();
+ secondLatch.countDown();
+ }
+ if (expected.get() == X+1) {
+ firstLatch.countDown();
+ }
+ if (expected.get() > X+1) {
+ secondLatch.countDown();
+ }
+ callback.operationFinished(context, null);
+ } catch (Exception e) {
+ logger.error("Received bad message", e);
+ firstLatch.countDown();
+ secondLatch.countDown();
+ }
+ }
+ });
+
+ // first publish
+ for (int i=1; i<=X; i++) {
+ pub.publish(topic, Message.newBuilder().setBody(
+ ByteString.copyFromUtf8(String.valueOf(i))).build());
+ }
+
+ assertTrue("Timed out waiting for messages " + (X+1),
+ firstLatch.await(10, TimeUnit.SECONDS));
+ assertEquals("Should be expected messages with only " + (X+1), X+1, expected.get());
+
+ final CountDownLatch closeSubLatch = new CountDownLatch(1);
+ Callback<ResponseBody> closeCb = new Callback<ResponseBody>() {
+ @Override
+ public void operationFinished(Object ctx, ResponseBody respBody) {
+ closeSubLatch.countDown();
+ }
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ closeSubLatch.countDown();
+ }
+ };
+
+ PubSubData pubSubData = new PubSubData(topic, null, subid,
+ OperationType.CLOSESUBSCRIPTION,
+ SubscriptionOptions.newBuilder().build(),
+ closeCb, null);
+ client.getHChannelManager().submitOp(pubSubData);
+ closeSubLatch.await(10, TimeUnit.SECONDS);
+
+ // second publish
+ for (int i=X+1; i<=2*X; i++) {
+ pub.publish(topic, Message.newBuilder().setBody(
+ ByteString.copyFromUtf8(String.valueOf(i))).build());
+ }
+
+ assertFalse("Receive more messages than " + X,
+ secondLatch.await(3, TimeUnit.SECONDS));
+ assertEquals("Should be expected message with " + (X + 1),
+ X + 1, expected.get());
+
+ sub.stopDelivery(topic, subid);
+ sub.closeSubscription(topic, subid);
+
+ client.close();
+ }
+
+
+
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java Mon Oct 22 11:04:47 2012
@@ -24,7 +24,9 @@ import com.google.protobuf.ByteString;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.filter.ServerMessageFilter;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
+import org.apache.hedwig.util.Callback;
public class StubDeliveryManager implements DeliveryManager {
@@ -62,8 +64,11 @@ public class StubDeliveryManager impleme
}
@Override
- public void stopServingSubscriber(ByteString topic, ByteString subscriberId) {
+ public void stopServingSubscriber(ByteString topic, ByteString subscriberId,
+ SubscriptionEvent event,
+ Callback<Void> cb, Object ctx) {
lastRequest.add(new TopicSubscriber(topic, subscriberId));
+ cb.operationFinished(ctx, null);
}
@Override
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java Mon Oct 22 11:04:47 2012
@@ -86,7 +86,7 @@ public class TestSubUnsubHandler extends
pubSubRequestPrototype = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE).setType(
OperationType.SUBSCRIBE).setTxnId(0).setTopic(topic).setSubscribeRequest(subRequestPrototype).build();
- ush = new UnsubscribeHandler(tm, conf, sm, dm);
+ ush = new UnsubscribeHandler(conf, tm, sm, dm, subChannelMgr);
}
@Test