You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/22 13:04:48 UTC

svn commit: r1400836 [2/2] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/handlers/ hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/s...

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestCloseSubscription.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestCloseSubscription.java?rev=1400836&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestCloseSubscription.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestCloseSubscription.java Mon Oct 22 11:04:47 2012
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
+import org.apache.hedwig.server.HedwigHubTestBase;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * Temporary test for the CLOSESUBSCRIPTION request: once the request has
+ * completed, no further messages may be delivered to the message handler.
+ *
+ * TODO: remove this test after the multiplexing channel manager is implemented.
+ */
+public class TestCloseSubscription extends HedwigHubTestBase {
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        numServers = 1;
+        super.setUp();
+    }
+
+    /**
+     * Publishes X messages and waits for all of them, submits a
+     * CLOSESUBSCRIPTION operation, then publishes X more messages and
+     * verifies that none of them reaches the handler.
+     */
+    @Test
+    public void testCloseSubscriptionRequest() throws Exception {
+        final HedwigClientImpl client = new HedwigClientImpl(new ClientConfiguration());
+        try {
+            Publisher pub = client.getPublisher();
+            Subscriber sub = client.getSubscriber();
+
+            ByteString topic = ByteString.copyFromUtf8("testCloseSubscriptionRequest");
+            ByteString subid = ByteString.copyFromUtf8("mysubid");
+            sub.subscribe(topic, subid, CreateOrAttach.CREATE);
+
+            final int X = 20;
+            // value the handler expects next; set to 0 when an unexpected value arrives
+            final AtomicInteger expected = new AtomicInteger(1);
+            final CountDownLatch firstLatch = new CountDownLatch(1);
+            final CountDownLatch secondLatch = new CountDownLatch(1);
+            sub.startDelivery(topic, subid, new MessageHandler() {
+                @Override
+                public synchronized void deliver(ByteString topic, ByteString subscriberId,
+                                                 Message msg,
+                                                 Callback<Void> callback, Object context) {
+                    try {
+                        int value = Integer.parseInt(msg.getBody().toStringUtf8());
+                        logger.debug("Received message {},", value);
+
+                        if (value == expected.get()) {
+                            expected.incrementAndGet();
+                        } else {
+                            // error condition
+                            logger.error("Did not receive expected value, expected {}, got {}",
+                                         expected.get(), value);
+                            expected.set(0);
+                            firstLatch.countDown();
+                            secondLatch.countDown();
+                        }
+                        if (expected.get() == X+1) {
+                            firstLatch.countDown();
+                        }
+                        if (expected.get() > X+1) {
+                            secondLatch.countDown();
+                        }
+                        callback.operationFinished(context, null);
+                    } catch (Exception e) {
+                        logger.error("Received bad message", e);
+                        firstLatch.countDown();
+                        secondLatch.countDown();
+                    }
+                }
+            });
+
+            // first publish
+            for (int i=1; i<=X; i++) {
+                pub.publish(topic, Message.newBuilder().setBody(
+                                   ByteString.copyFromUtf8(String.valueOf(i))).build());
+            }
+
+            assertTrue("Timed out waiting for messages " + (X+1),
+                       firstLatch.await(10, TimeUnit.SECONDS));
+            assertEquals("Should be expected messages with only " + (X+1), X+1, expected.get());
+
+            final CountDownLatch closeSubLatch = new CountDownLatch(1);
+            final AtomicInteger closeFailures = new AtomicInteger(0);
+            Callback<ResponseBody> closeCb = new Callback<ResponseBody>() {
+                @Override
+                public void operationFinished(Object ctx, ResponseBody respBody) {
+                    closeSubLatch.countDown();
+                }
+                @Override
+                public void operationFailed(Object ctx, PubSubException exception) {
+                    logger.error("Close subscription request failed", exception);
+                    closeFailures.incrementAndGet();
+                    closeSubLatch.countDown();
+                }
+            };
+
+            PubSubData pubSubData = new PubSubData(topic, null, subid,
+                                                   OperationType.CLOSESUBSCRIPTION,
+                                                   SubscriptionOptions.newBuilder().build(),
+                                                   closeCb, null);
+            client.getHChannelManager().submitOp(pubSubData);
+            // fail fast if the close request timed out or was rejected
+            assertTrue("Timed out waiting for close subscription response",
+                       closeSubLatch.await(10, TimeUnit.SECONDS));
+            assertEquals("Close subscription request failed", 0, closeFailures.get());
+
+            // second publish
+            for (int i=X+1; i<=2*X; i++) {
+                pub.publish(topic, Message.newBuilder().setBody(
+                                   ByteString.copyFromUtf8(String.valueOf(i))).build());
+            }
+
+            // secondLatch is counted down if any further message is delivered
+            assertFalse("Receive more messages than " + X,
+                        secondLatch.await(3, TimeUnit.SECONDS));
+            assertEquals("Should be expected message with " + (X + 1),
+                         X + 1, expected.get());
+
+            sub.stopDelivery(topic, subid);
+            sub.closeSubscription(topic, subid);
+        } finally {
+            // always release the client, even when an assertion above fails
+            client.close();
+        }
+    }
+
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java Mon Oct 22 11:04:47 2012
@@ -24,7 +24,9 @@ import com.google.protobuf.ByteString;
 import org.apache.hedwig.client.data.TopicSubscriber;
 import org.apache.hedwig.filter.ServerMessageFilter;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
+import org.apache.hedwig.util.Callback;
 
 public class StubDeliveryManager implements DeliveryManager {
 
@@ -62,8 +64,11 @@ public class StubDeliveryManager impleme
     }
 
     @Override
-    public void stopServingSubscriber(ByteString topic, ByteString subscriberId) {
+    public void stopServingSubscriber(ByteString topic, ByteString subscriberId,
+                                      SubscriptionEvent event,
+                                      Callback<Void> cb, Object ctx) {
         lastRequest.add(new TopicSubscriber(topic, subscriberId));
+        cb.operationFinished(ctx, null);
     }
 
     @Override

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java Mon Oct 22 11:04:47 2012
@@ -86,7 +86,7 @@ public class TestSubUnsubHandler extends
         pubSubRequestPrototype = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE).setType(
                                      OperationType.SUBSCRIBE).setTxnId(0).setTopic(topic).setSubscribeRequest(subRequestPrototype).build();
 
-        ush = new UnsubscribeHandler(tm, conf, sm, dm);
+        ush = new UnsubscribeHandler(conf, tm, sm, dm, subChannelMgr);
     }
 
     @Test