You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2013/07/15 00:55:36 UTC

svn commit: r1503075 - in /zookeeper/bookkeeper/trunk: CHANGES.txt hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java

Author: fpj
Date: Sun Jul 14 22:55:35 2013
New Revision: 1503075

URL: http://svn.apache.org/r1503075
Log:
BOOKKEEPER-600: shouldClaim flag isn't cleared for hedwig multiplex java client (sijie via fpj)


Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1503075&r1=1503074&r2=1503075&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sun Jul 14 22:55:35 2013
@@ -78,6 +78,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-598: Fails to compile - RESUBSCRIBE_EXCEPTION conflict (Matthew Farrellee via sijie)
 
+	BOOKKEEPER-600: shouldClaim flag isn't cleared for hedwig multiplex java client (sijie via fpj)
+
     IMPROVEMENTS:
 
       BOOKKEEPER-608: Make SyncThread a reusable component (ivank)

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java?rev=1503075&r1=1503074&r2=1503075&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java Sun Jul 14 22:55:35 2013
@@ -17,26 +17,20 @@
  */
 package org.apache.hedwig.client.netty.impl;
 
+import static org.apache.hedwig.util.VarArgs.va;
+
 import java.util.LinkedList;
 import java.util.Queue;
 
-import com.google.protobuf.ByteString;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
 import org.apache.hedwig.client.api.MessageHandler;
 import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.data.MessageConsumeData;
 import org.apache.hedwig.client.data.PubSubData;
 import org.apache.hedwig.client.data.TopicSubscriber;
 import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.netty.FilterableMessageHandler;
 import org.apache.hedwig.client.netty.HChannel;
 import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.client.netty.FilterableMessageHandler;
 import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
 import org.apache.hedwig.filter.ClientMessageFilter;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
@@ -46,7 +40,11 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 import org.apache.hedwig.protoextensions.MessageIdUtils;
 import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import static org.apache.hedwig.util.VarArgs.va;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * an active subscriber handles subscription actions in a channel.
@@ -371,6 +369,7 @@ public class ActiveSubscriber {
             new ResubscribeCallback(topicSubscriber, op,
                                     channelManager, retryWaitTime);
         op.setCallback(resubscribeCb);
+        op.shouldClaim = false;
         op.context = null;
         op.setOriginalChannelForResubscribe(hChannel);
         if (logger.isDebugEnabled()) {