You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/07/22 19:18:01 UTC

svn commit: r1505735 - in /zookeeper/bookkeeper/branches/branch-4.2: ./ hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ hedwig-server/src/test/java/org/apache/hedwig/server/delivery/

Author: ivank
Date: Mon Jul 22 17:18:01 2013
New Revision: 1505735

URL: http://svn.apache.org/r1505735
Log:
BOOKKEEPER-607: Filtered Messages Require ACK from Client Causes User Being Throttled Incorrectly Forever (sijie via ivank)

Modified:
    zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
    zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
    zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java

Modified: zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt?rev=1505735&r1=1505734&r2=1505735&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt Mon Jul 22 17:18:01 2013
@@ -54,6 +54,8 @@ Release 4.2.2 - Unreleased
 
 	BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)
 
+        BOOKKEEPER-607: Filtered Messages Require ACK from Client Causes User Being Throttled Incorrectly Forever (sijie via ivank)
+
       hedwig-client:
 
         BOOKKEEPER-598: Fails to compile - RESUBSCRIBE_EXCEPTION conflict (Matthew Farrellee via ivank)

Modified: zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1505735&r1=1505734&r2=1505735&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Mon Jul 22 17:18:01 2013
@@ -139,6 +139,7 @@ public class FIFODeliveryManager impleme
         this.cfg = cfg;
     }
 
+    @Override
     public void start() {
         workerThread.start();
     }
@@ -205,7 +206,7 @@ public class FIFODeliveryManager impleme
                                          MessageSeqId seqIdToStartFrom,
                                          DeliveryEndPoint endPoint, ServerMessageFilter filter,
                                          Callback<Void> callback, Object ctx) {
-        ActiveSubscriberState subscriber = 
+        ActiveSubscriberState subscriber =
             new ActiveSubscriberState(topic, subscriberId,
                                       preferences,
                                       seqIdToStartFrom.getLocalComponent() - 1,
@@ -262,7 +263,7 @@ public class FIFODeliveryManager impleme
         if (null == subState) {
             return;
         }
-        subState.messageConsumed(consumedSeqId.getLocalComponent()); 
+        subState.messageConsumed(consumedSeqId.getLocalComponent());
     }
 
     /**
@@ -313,6 +314,7 @@ public class FIFODeliveryManager impleme
     /**
      * Stop method which will enqueue a ShutdownDeliveryManagerRequest.
      */
+    @Override
     public void stop() {
         enqueueWithoutFailure(new ShutdownDeliveryManagerRequest());
     }
@@ -528,8 +530,8 @@ public class FIFODeliveryManager impleme
                 enqueueWithoutFailure(new DeliveryManagerRequest() {
                     @Override
                     public void performRequest() {
-                        // enqueue 
-                        clearRetryDelayForSubscriber(ActiveSubscriberState.this);            
+                        // enqueue
+                        clearRetryDelayForSubscriber(ActiveSubscriberState.this);
                     }
                 });
             }
@@ -621,6 +623,19 @@ public class FIFODeliveryManager impleme
             }
 
             if (!filter.testMessage(message)) {
+                // for filtered out messages, we don't deliver the message to client, so we would not
+                // receive its consume request which moves the <i>lastSeqIdConsumedUtil</i> pointer.
+                // we move the <i>lastSeqIdConsumedUtil</i> here for filtered out messages, which would
+                // avoid a subscriber being throttled due to the message gap introduced by filtering.
+                //
+                // it is OK to move <i>lastSeqIdConsumedUtil</i> here, since this pointer is subscriber's
+                // delivery state, which is used to throttle delivery. changing <i>lastSeqIdConsumedUtil</i> would
+                // not affect the subscriber's consume pointer in zookeeper which is managed in subscription
+                // manager.
+                //
+                // And marking the message consumed before calling sendingFinished() avoids the subscriber
+                // being throttled first and then released from the throttled state later.
+                messageConsumed(message.getMsgId().getLocalComponent());
                 sendingFinished();
                 return;
             }
@@ -641,6 +656,7 @@ public class FIFODeliveryManager impleme
 
         }
 
+        @Override
         public void scanFailed(Object ctx, Exception exception) {
             if (!checkConnected()) {
                 return;
@@ -650,6 +666,7 @@ public class FIFODeliveryManager impleme
             retryErroredSubscriberAfterDelay(this);
         }
 
+        @Override
         public void scanFinished(Object ctx, ReasonForFinish reason) {
             checkConnected();
         }
@@ -658,6 +675,7 @@ public class FIFODeliveryManager impleme
          * ===============================================================
          * {@link DeliveryCallback} methods
          */
+        @Override
         public void sendingFinished() {
             if (!isConnected()) {
                 return;
@@ -687,6 +705,7 @@ public class FIFODeliveryManager impleme
         }
 
 
+        @Override
         public void permanentErrorOnSend() {
             // the underlying channel is broken, the channel will
             // be closed in UmbrellaHandler when exception happened.
@@ -695,6 +714,7 @@ public class FIFODeliveryManager impleme
                                   NOP_CALLBACK, null);
         }
 
+        @Override
         public void transientErrorOnSend() {
             retryErroredSubscriberAfterDelay(this);
         }
@@ -703,6 +723,7 @@ public class FIFODeliveryManager impleme
          * ===============================================================
          * {@link DeliveryManagerRequest} methods
          */
+        @Override
         public void performRequest() {
             // Put this subscriber in the channel to subscriber mapping
             ActiveSubscriberState prevSubscriber =
@@ -847,6 +868,7 @@ public class FIFODeliveryManager impleme
         // This is a simple type of Request we will enqueue when the
         // PubSubServer is shut down and we want to stop the DeliveryManager
         // thread.
+        @Override
         public void performRequest() {
             keepRunning = false;
         }

Modified: zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java?rev=1505735&r1=1505734&r2=1505735&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java Mon Jul 22 17:18:01 2013
@@ -17,14 +17,18 @@
  */
 package org.apache.hedwig.server.delivery;
 
+import java.io.IOException;
+
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+ 
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -40,11 +44,19 @@ import org.apache.hedwig.client.HedwigCl
 import org.apache.hedwig.client.api.MessageHandler;
 import org.apache.hedwig.client.api.Publisher;
 import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.filter.ClientMessageFilter;
+import org.apache.hedwig.filter.MessageFilterBase;
+import org.apache.hedwig.filter.ServerMessageFilter;
+import org.apache.hedwig.protocol.PubSubProtocol;
+
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageHeader;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+
 import org.apache.hedwig.server.HedwigHubTestBase;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.util.Callback;
@@ -53,6 +65,43 @@ import org.apache.hedwig.util.Callback;
 public class TestThrottlingDelivery extends HedwigHubTestBase {
 
     private static final int DEFAULT_MESSAGE_WINDOW_SIZE = 10;
+    private static final String OPT_MOD = "MOD";
+
+    static class ModMessageFilter implements ServerMessageFilter, ClientMessageFilter {
+
+        int mod;
+
+        @Override
+        public MessageFilterBase setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
+                SubscriptionPreferences preferences) {
+            Map<String, ByteString> userOptions = SubscriptionStateUtils.buildUserOptions(preferences);
+            ByteString modValue = userOptions.get(OPT_MOD);
+            if (null == modValue) {
+                mod = 0;
+            } else {
+                mod = Integer.valueOf(modValue.toStringUtf8());
+            }
+            return this;
+        }
+
+        @Override
+        public boolean testMessage(Message message) {
+            int value = Integer.valueOf(message.getBody().toStringUtf8());
+            return 0 == value % mod;
+        }
+
+        @Override
+        public ServerMessageFilter initialize(Configuration conf) throws ConfigurationException, IOException {
+            // do nothing
+            return this;
+        }
+
+        @Override
+        public void uninitialize() {
+            // do nothing
+        }
+
+    }
 
     protected class ThrottleDeliveryServerConfiguration extends HubServerConfiguration {
 
@@ -62,7 +111,7 @@ public class TestThrottlingDelivery exte
 
         @Override
         public int getDefaultMessageWindowSize() {
-            return TestThrottlingDelivery.this.DEFAULT_MESSAGE_WINDOW_SIZE;
+            return TestThrottlingDelivery.DEFAULT_MESSAGE_WINDOW_SIZE;
         }
     }
 
@@ -71,7 +120,7 @@ public class TestThrottlingDelivery exte
         int messageWindowSize;
 
         ThrottleDeliveryClientConfiguration() {
-            this(TestThrottlingDelivery.this.DEFAULT_MESSAGE_WINDOW_SIZE);
+            this(TestThrottlingDelivery.DEFAULT_MESSAGE_WINDOW_SIZE);
         }
 
         ThrottleDeliveryClientConfiguration(int messageWindowSize) {
@@ -98,6 +147,73 @@ public class TestThrottlingDelivery exte
         }
     }
 
+    private void publishNums(Publisher pub, ByteString topic, int start, int num, int M) throws Exception {
+        for (int i = 1; i <= num; i++) {
+            PubSubProtocol.Map.Builder propsBuilder = PubSubProtocol.Map.newBuilder().addEntries(
+                    PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD)
+                            .setValue(ByteString.copyFromUtf8(String.valueOf((start + i) % M))));
+            MessageHeader.Builder headerBuilder = MessageHeader.newBuilder().setProperties(propsBuilder);
+            Message msg = Message.newBuilder().setBody(ByteString.copyFromUtf8(String.valueOf(start + i)))
+                    .setHeader(headerBuilder).build();
+            pub.publish(topic, msg);
+        }
+    }
+
+    private void throttleWithFilter(Publisher pub, final Subscriber sub,
+                           ByteString topic, ByteString subid,
+                           final int X) throws Exception {
+        // publish numbers with header (so only 3 messages would be delivered)
+        publishNums(pub, topic, 0, 3 * X, X);
+
+        // subscribe the topic with filter
+        PubSubProtocol.Map userOptions = PubSubProtocol.Map
+                .newBuilder()
+                .addEntries(
+                        PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD)
+                                .setValue(ByteString.copyFromUtf8(String.valueOf(X)))).build();
+        SubscriptionOptions opts = SubscriptionOptions.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH)
+                .setOptions(userOptions).setMessageFilter(ModMessageFilter.class.getName()).build();
+        sub.subscribe(topic, subid, opts);
+
+        final AtomicInteger expected = new AtomicInteger(X);
+        final CountDownLatch latch = new CountDownLatch(1);
+        sub.startDelivery(topic, subid, new MessageHandler() {
+            @Override
+            public synchronized void deliver(ByteString topic, ByteString subscriberId,
+                                             Message msg,
+                                             Callback<Void> callback, Object context) {
+                try {
+                    int value = Integer.valueOf(msg.getBody().toStringUtf8());
+                    logger.debug("Received message {},", value);
+
+                    if (value == expected.get()) {
+                        expected.addAndGet(X);
+                    } else {
+                        // error condition
+                        logger.error("Did not receive expected value, expected {}, got {}",
+                                     expected.get(), value);
+                        expected.set(0);
+                        latch.countDown();
+                    }
+                    if (value == 3 * X) {
+                        latch.countDown();
+                    }
+                    callback.operationFinished(context, null);
+                    sub.consume(topic, subscriberId, msg.getMsgId());
+                } catch (Exception e) {
+                    logger.error("Received bad message", e);
+                    latch.countDown();
+                }
+            }
+        });
+
+        assertTrue("Timed out waiting for messages " + 3 * X, latch.await(10, TimeUnit.SECONDS));
+        assertEquals("Should be expected message with " + 4 * X, 4 * X, expected.get());
+
+        sub.stopDelivery(topic, subid);
+        sub.closeSubscription(topic, subid);
+    }
+
     private void throttleX(Publisher pub, final Subscriber sub,
                            ByteString topic, ByteString subid,
                            final int X) throws Exception {
@@ -138,7 +254,7 @@ public class TestThrottlingDelivery exte
                     callback.operationFinished(context, null);
                     if (expected.get() > X + 1) {
                         sub.consume(topic, subscriberId, msg.getMsgId());
-                    }      
+                    }
                 } catch (Exception e) {
                     logger.error("Received bad message", e);
                     throttleLatch.countDown();
@@ -197,7 +313,7 @@ public class TestThrottlingDelivery exte
         Publisher pub = client.getPublisher();
         Subscriber sub = client.getSubscriber();
 
-        ByteString topic = ByteString.copyFromUtf8("testServerSideThrottle"); 
+        ByteString topic = ByteString.copyFromUtf8("testServerSideThrottle");
         ByteString subid = ByteString.copyFromUtf8("serverThrottleSub");
         sub.subscribe(topic, subid, CreateOrAttach.CREATE);
         sub.closeSubscription(topic, subid);
@@ -228,4 +344,26 @@ public class TestThrottlingDelivery exte
         client.close();
     }
 
+    @Test(timeout = 60000)
+    public void testThrottleWithServerSideFilter() throws Exception {
+        int messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE;
+        ThrottleDeliveryClientConfiguration conf = new ThrottleDeliveryClientConfiguration();
+        HedwigClient client = new HedwigClient(conf);
+        Publisher pub = client.getPublisher();
+        Subscriber sub = client.getSubscriber();
+
+        ByteString topic = ByteString.copyFromUtf8("testThrottleWithServerSideFilter");
+        ByteString subid = ByteString.copyFromUtf8("mysub");
+        SubscriptionOptions opts = SubscriptionOptions.newBuilder().setCreateOrAttach(CreateOrAttach.CREATE).build();
+        sub.subscribe(topic, subid, opts);
+        sub.closeSubscription(topic, subid);
+
+        // message gap: half of the throttle threshold
+        throttleWithFilter(pub, sub, topic, subid, messageWindowSize / 2);
+        // message gap: equal to the throttle threshold
+        throttleWithFilter(pub, sub, topic, subid, messageWindowSize);
+        // message gap: larger than the throttle threshold
+        throttleWithFilter(pub, sub, topic, subid, messageWindowSize + messageWindowSize / 2);
+    }
+
 }