You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/03 17:14:14 UTC

svn commit: r1380268 [2/2] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/api/ hedwig-client/src/main/java/org/apache/hedwig/client/handlers/ hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig-...

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java Mon Sep  3 15:14:13 2012
@@ -22,7 +22,7 @@ import java.util.Queue;
 
 import com.google.protobuf.ByteString;
 import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.filter.MessageFilter;
+import org.apache.hedwig.filter.ServerMessageFilter;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 
 public class StubDeliveryManager implements DeliveryManager {
@@ -32,10 +32,10 @@ public class StubDeliveryManager impleme
         public ByteString subscriberId;
         public MessageSeqId seqIdToStartFrom;
         public DeliveryEndPoint endPoint;
-        public MessageFilter filter;
+        public ServerMessageFilter filter;
 
         public StartServingRequest(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
-                                   DeliveryEndPoint endPoint, MessageFilter filter) {
+                                   DeliveryEndPoint endPoint, ServerMessageFilter filter) {
             this.topic = topic;
             this.subscriberId = subscriberId;
             this.seqIdToStartFrom = seqIdToStartFrom;
@@ -49,7 +49,7 @@ public class StubDeliveryManager impleme
 
     @Override
     public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
-                                         DeliveryEndPoint endPoint, MessageFilter filter) {
+                                         DeliveryEndPoint endPoint, ServerMessageFilter filter) {
         lastRequest.add(new StartServingRequest(topic, subscriberId, seqIdToStartFrom, endPoint, filter));
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java Mon Sep  3 15:14:13 2012
@@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.hedwig.client.HedwigClient;
@@ -47,7 +48,9 @@ import org.apache.hedwig.client.api.Clie
 import org.apache.hedwig.client.api.Subscriber;
 import org.apache.hedwig.client.api.Publisher;
 import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.filter.MessageFilter;
+import org.apache.hedwig.filter.ClientMessageFilter;
+import org.apache.hedwig.filter.MessageFilterBase;
+import org.apache.hedwig.filter.ServerMessageFilter;
 import org.apache.hedwig.util.Callback;
 
 import org.apache.hedwig.server.HedwigHubTestBase;
@@ -55,18 +58,19 @@ import org.apache.hedwig.server.HedwigHu
 public class TestMessageFilter extends HedwigHubTestBase {
 
     // Client side variables
+    protected ClientConfiguration conf;
     protected HedwigClient client;
     protected Publisher publisher;
     protected Subscriber subscriber;
 
     static final String OPT_MOD = "MOD";
 
-    static class ModMessageFilter implements MessageFilter {
+    static class ModMessageFilter implements ServerMessageFilter, ClientMessageFilter {
 
         int mod;
 
         @Override
-        public MessageFilter initialize(Configuration conf) {
+        public ServerMessageFilter initialize(Configuration conf) {
             // do nothing
             return this;
         }
@@ -77,8 +81,9 @@ public class TestMessageFilter extends H
         }
 
         @Override
-        public MessageFilter setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
-                                                        SubscriptionPreferences preferences) {
+        public MessageFilterBase setSubscriptionPreferences(ByteString topic,
+                                                            ByteString subscriberId,
+                                                            SubscriptionPreferences preferences) {
             Map<String, ByteString> userOptions = SubscriptionStateUtils.buildUserOptions(preferences);
             ByteString modValue = userOptions.get(OPT_MOD);
             if (null == modValue) {
@@ -96,10 +101,10 @@ public class TestMessageFilter extends H
         }
     }
 
-    static class HeaderMessageFilter implements MessageFilter {
+    static class HeaderMessageFilter implements ServerMessageFilter, ClientMessageFilter {
         int mod;
         @Override
-        public MessageFilter initialize(Configuration conf) {
+        public ServerMessageFilter initialize(Configuration conf) {
             // do nothing
             return this;
         }
@@ -110,8 +115,9 @@ public class TestMessageFilter extends H
         }
 
         @Override
-        public MessageFilter setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
-                                                        SubscriptionPreferences preferences) {
+        public MessageFilterBase setSubscriptionPreferences(ByteString topic,
+                                                            ByteString subscriberId,
+                                                            SubscriptionPreferences preferences) {
             // do nothing now
             return this;
         }
@@ -146,12 +152,13 @@ public class TestMessageFilter extends H
         numServers = 1;
         super.setUp();
 
-        client = new HedwigClient(new ClientConfiguration() {
+        conf = new ClientConfiguration() {
             @Override
             public boolean isAutoSendConsumeMessageEnabled() {
                 return false;
             }
-        });
+        };
+        client = new HedwigClient(conf);
         publisher = client.getPublisher();
         subscriber = client.getSubscriber();
     }
@@ -177,26 +184,29 @@ public class TestMessageFilter extends H
     }
 
     private void receiveNumModM(final ByteString topic, final ByteString subid,
-                                final String filterClassName,
+                                final String filterClassName, final ClientMessageFilter filter,
                                 final int start, final int num, final int M,
                                 final boolean consume)
     throws Exception {
         PubSubProtocol.Map userOptions = PubSubProtocol.Map.newBuilder()
             .addEntries(PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD)
                         .setValue(ByteString.copyFromUtf8(String.valueOf(M)))).build();
-        SubscriptionOptions options = SubscriptionOptions.newBuilder()
+        SubscriptionOptions.Builder optionsBuilder = SubscriptionOptions.newBuilder()
             .setCreateOrAttach(CreateOrAttach.ATTACH)
-            .setMessageFilter(filterClassName)
-            .setOptions(userOptions).build();
-        subscriber.subscribe(topic, subid, options);
+            .setOptions(userOptions);
+        if (null != filterClassName) {
+            optionsBuilder.setMessageFilter(filterClassName);
+        }
+        subscriber.subscribe(topic, subid, optionsBuilder.build());
 
         final int base = start + M - start % M;
 
         final AtomicInteger expected = new AtomicInteger(base);
         final CountDownLatch latch = new CountDownLatch(1);
-        subscriber.startDelivery(topic, subid, new MessageHandler() {
+        MessageHandler msgHandler = new MessageHandler() {
             synchronized public void deliver(ByteString topic, ByteString subscriberId,
-                                             Message msg, Callback<Void> callback, Object context) {
+                                             Message msg, Callback<Void> callback,
+                                             Object context) {
                 try {
                     int value = Integer.valueOf(msg.getBody().toStringUtf8());
                     // duplicated messages received, ignore them
@@ -222,7 +232,12 @@ public class TestMessageFilter extends H
                     latch.countDown();
                 }
             }
-        });
+        };
+        if (null != filter) {
+            subscriber.startDeliveryWithFilter(topic, subid, msgHandler, filter);
+        } else {
+            subscriber.startDelivery(topic, subid, msgHandler);
+        }
         assertTrue("Timed out waiting for messages mod " + M + " expected is " + expected.get(),
                    latch.await(10, TimeUnit.SECONDS));
         assertEquals("Should be expected message with " + (base + num * M), (base + num*M), expected.get());
@@ -231,18 +246,18 @@ public class TestMessageFilter extends H
     }
 
     @Test
-    public void testMessageFilter() throws Exception {
+    public void testServerSideMessageFilter() throws Exception {
         ByteString topic = ByteString.copyFromUtf8("TestMessageFilter");
         ByteString subid = ByteString.copyFromUtf8("mysub");
 
         subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
         subscriber.closeSubscription(topic, subid);
         publishNums(topic, 0, 100, 2);
-        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), 0, 50, 2, true);
+        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 50, 2, true);
     }
 
     @Test
-    public void testInvalidMessageFilter() throws Exception {
+    public void testInvalidServerSideMessageFilter() throws Exception {
         ByteString topic = ByteString.copyFromUtf8("TestInvalidMessageFilter");
         ByteString subid = ByteString.copyFromUtf8("mysub");
 
@@ -268,21 +283,21 @@ public class TestMessageFilter extends H
         subscriber.closeSubscription(topic, subid);
 
         publishNums(topic, 0, 100, 2);
-        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), 0, 50, 2, false);
-        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), 0, 25, 4, false);
-        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), 0, 33, 3, true);
+        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 50, 2, false);
+        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 25, 4, false);
+        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 33, 3, true);
 
         // change mod to receive numbers mod 5
         publishNums(topic, 100, 100, 5);
-        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), 100, 20, 5, true);
+        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 100, 20, 5, true);
 
         // change mod to receive numbers mod 7
         publishNums(topic, 200, 100, 7);
-        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), 200, 14, 7, true);
+        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 200, 14, 7, true);
     }
 
     @Test
-    public void testChangeMessageFilter() throws Exception {
+    public void testChangeServerSideMessageFilter() throws Exception {
         ByteString topic = ByteString.copyFromUtf8("TestChangeMessageFilter");
         ByteString subid = ByteString.copyFromUtf8("mysub");
 
@@ -290,16 +305,16 @@ public class TestMessageFilter extends H
         subscriber.closeSubscription(topic, subid);
 
         publishNums(topic, 0, 100, 3);
-        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), 0, 50, 2, false);
-        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), 0, 25, 4, false);
-        receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), 0, 33, 3, true);
+        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 50, 2, false);
+        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 25, 4, false);
+        receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), null, 0, 33, 3, true);
 
         publishNums(topic, 200, 100, 7);
-        receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), 200, 14, 7, true);
+        receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), null, 200, 14, 7, true);
     }
 
     @Test
-    public void testFixMessageFilter() throws Exception {
+    public void testFixInvalidServerSideMessageFilter() throws Exception {
         ByteString topic = ByteString.copyFromUtf8("TestFixMessageFilter");
         ByteString subid = ByteString.copyFromUtf8("mysub");
 
@@ -308,13 +323,74 @@ public class TestMessageFilter extends H
 
         publishNums(topic, 0, 100, 3);
         try {
-            receiveNumModM(topic, subid, "Invalid_Message_Filter", 0, 33, 3, true);
+            receiveNumModM(topic, subid, "Invalid_Message_Filter", null, 0, 33, 3, true);
             // shouldn't reach here
             fail("Should fail subscribe with invalid message filter");
         } catch (Exception pse) {
             assertTrue("Should respond with INVALID_MESSAGE_FILTER",
                        pse.getMessage().contains("INVALID_MESSAGE_FILTER"));
         }
-        receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), 0, 33, 3, true);
+        receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), null, 0, 33, 3, true);
+    }
+
+    @Test
+    public void testNullClientMessageFilter() throws Exception {
+        ByteString topic = ByteString.copyFromUtf8("TestNullClientMessageFilter");
+        ByteString subid = ByteString.copyFromUtf8("mysub");
+        subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+        try {
+            subscriber.startDeliveryWithFilter(topic, subid, null, new ModMessageFilter());
+            fail("Should fail start delivery with filter using null message handler.");
+        } catch (NullPointerException npe) {
+        }
+
+        try {
+            subscriber.startDeliveryWithFilter(topic, subid, new MessageHandler() {
+                public void deliver(ByteString topic, ByteString subscriberId,
+                                    Message msg, Callback<Void> callback, Object context) {
+                    // do nothing
+                }
+            }, null);
+            fail("Should fail start delivery with filter using null message filter.");
+        } catch (NullPointerException npe) {
+        }
+    }
+
+    @Test
+    public void testClientSideMessageFilter() throws Exception {
+        ByteString topic = ByteString.copyFromUtf8("TestClientMessageFilter");
+        ByteString subid = ByteString.copyFromUtf8("mysub");
+
+        subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+        subscriber.closeSubscription(topic, subid);
+        publishNums(topic, 0, 100, 2);
+        receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 50, 2, true);
+    }
+
+    @Test
+    public void testChangeSubscriptionPreferencesForClientFilter() throws Exception {
+        ByteString topic = ByteString.copyFromUtf8("TestChangeSubscriptionPreferencesForClientFilter");
+        ByteString subid = ByteString.copyFromUtf8("mysub");
+
+        subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+        subscriber.closeSubscription(topic, subid);
+
+        publishNums(topic, 0, 100, 2);
+        receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 50, 2, false);
+        receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 25, 4, false);
+        receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 33, 3, true);
+    }
+
+    @Test
+    public void testChangeClientSideMessageFilter() throws Exception {
+        ByteString topic = ByteString.copyFromUtf8("TestChangeClientSideMessageFilter");
+        ByteString subid = ByteString.copyFromUtf8("mysub");
+
+        subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+        subscriber.closeSubscription(topic, subid);
+
+        publishNums(topic, 0, 100, 3);
+        receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 50, 2, false);
+        receiveNumModM(topic, subid, null, new HeaderMessageFilter(), 0, 33, 3, true);
     }
 }