You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/03 17:14:14 UTC
svn commit: r1380268 [2/2] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/api/
hedwig-client/src/main/java/org/apache/hedwig/client/handlers/
hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig-...
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java Mon Sep 3 15:14:13 2012
@@ -22,7 +22,7 @@ import java.util.Queue;
import com.google.protobuf.ByteString;
import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.filter.MessageFilter;
+import org.apache.hedwig.filter.ServerMessageFilter;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
public class StubDeliveryManager implements DeliveryManager {
@@ -32,10 +32,10 @@ public class StubDeliveryManager impleme
public ByteString subscriberId;
public MessageSeqId seqIdToStartFrom;
public DeliveryEndPoint endPoint;
- public MessageFilter filter;
+ public ServerMessageFilter filter;
public StartServingRequest(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
- DeliveryEndPoint endPoint, MessageFilter filter) {
+ DeliveryEndPoint endPoint, ServerMessageFilter filter) {
this.topic = topic;
this.subscriberId = subscriberId;
this.seqIdToStartFrom = seqIdToStartFrom;
@@ -49,7 +49,7 @@ public class StubDeliveryManager impleme
@Override
public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
- DeliveryEndPoint endPoint, MessageFilter filter) {
+ DeliveryEndPoint endPoint, ServerMessageFilter filter) {
lastRequest.add(new StartServingRequest(topic, subscriberId, seqIdToStartFrom, endPoint, filter));
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java?rev=1380268&r1=1380267&r2=1380268&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java Mon Sep 3 15:14:13 2012
@@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.hedwig.client.HedwigClient;
@@ -47,7 +48,9 @@ import org.apache.hedwig.client.api.Clie
import org.apache.hedwig.client.api.Subscriber;
import org.apache.hedwig.client.api.Publisher;
import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.filter.MessageFilter;
+import org.apache.hedwig.filter.ClientMessageFilter;
+import org.apache.hedwig.filter.MessageFilterBase;
+import org.apache.hedwig.filter.ServerMessageFilter;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.server.HedwigHubTestBase;
@@ -55,18 +58,19 @@ import org.apache.hedwig.server.HedwigHu
public class TestMessageFilter extends HedwigHubTestBase {
// Client side variables
+ protected ClientConfiguration conf;
protected HedwigClient client;
protected Publisher publisher;
protected Subscriber subscriber;
static final String OPT_MOD = "MOD";
- static class ModMessageFilter implements MessageFilter {
+ static class ModMessageFilter implements ServerMessageFilter, ClientMessageFilter {
int mod;
@Override
- public MessageFilter initialize(Configuration conf) {
+ public ServerMessageFilter initialize(Configuration conf) {
// do nothing
return this;
}
@@ -77,8 +81,9 @@ public class TestMessageFilter extends H
}
@Override
- public MessageFilter setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
- SubscriptionPreferences preferences) {
+ public MessageFilterBase setSubscriptionPreferences(ByteString topic,
+ ByteString subscriberId,
+ SubscriptionPreferences preferences) {
Map<String, ByteString> userOptions = SubscriptionStateUtils.buildUserOptions(preferences);
ByteString modValue = userOptions.get(OPT_MOD);
if (null == modValue) {
@@ -96,10 +101,10 @@ public class TestMessageFilter extends H
}
}
- static class HeaderMessageFilter implements MessageFilter {
+ static class HeaderMessageFilter implements ServerMessageFilter, ClientMessageFilter {
int mod;
@Override
- public MessageFilter initialize(Configuration conf) {
+ public ServerMessageFilter initialize(Configuration conf) {
// do nothing
return this;
}
@@ -110,8 +115,9 @@ public class TestMessageFilter extends H
}
@Override
- public MessageFilter setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
- SubscriptionPreferences preferences) {
+ public MessageFilterBase setSubscriptionPreferences(ByteString topic,
+ ByteString subscriberId,
+ SubscriptionPreferences preferences) {
// do nothing now
return this;
}
@@ -146,12 +152,13 @@ public class TestMessageFilter extends H
numServers = 1;
super.setUp();
- client = new HedwigClient(new ClientConfiguration() {
+ conf = new ClientConfiguration() {
@Override
public boolean isAutoSendConsumeMessageEnabled() {
return false;
}
- });
+ };
+ client = new HedwigClient(conf);
publisher = client.getPublisher();
subscriber = client.getSubscriber();
}
@@ -177,26 +184,29 @@ public class TestMessageFilter extends H
}
private void receiveNumModM(final ByteString topic, final ByteString subid,
- final String filterClassName,
+ final String filterClassName, final ClientMessageFilter filter,
final int start, final int num, final int M,
final boolean consume)
throws Exception {
PubSubProtocol.Map userOptions = PubSubProtocol.Map.newBuilder()
.addEntries(PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD)
.setValue(ByteString.copyFromUtf8(String.valueOf(M)))).build();
- SubscriptionOptions options = SubscriptionOptions.newBuilder()
+ SubscriptionOptions.Builder optionsBuilder = SubscriptionOptions.newBuilder()
.setCreateOrAttach(CreateOrAttach.ATTACH)
- .setMessageFilter(filterClassName)
- .setOptions(userOptions).build();
- subscriber.subscribe(topic, subid, options);
+ .setOptions(userOptions);
+ if (null != filterClassName) {
+ optionsBuilder.setMessageFilter(filterClassName);
+ }
+ subscriber.subscribe(topic, subid, optionsBuilder.build());
final int base = start + M - start % M;
final AtomicInteger expected = new AtomicInteger(base);
final CountDownLatch latch = new CountDownLatch(1);
- subscriber.startDelivery(topic, subid, new MessageHandler() {
+ MessageHandler msgHandler = new MessageHandler() {
synchronized public void deliver(ByteString topic, ByteString subscriberId,
- Message msg, Callback<Void> callback, Object context) {
+ Message msg, Callback<Void> callback,
+ Object context) {
try {
int value = Integer.valueOf(msg.getBody().toStringUtf8());
// duplicated messages received, ignore them
@@ -222,7 +232,12 @@ public class TestMessageFilter extends H
latch.countDown();
}
}
- });
+ };
+ if (null != filter) {
+ subscriber.startDeliveryWithFilter(topic, subid, msgHandler, filter);
+ } else {
+ subscriber.startDelivery(topic, subid, msgHandler);
+ }
assertTrue("Timed out waiting for messages mod " + M + " expected is " + expected.get(),
latch.await(10, TimeUnit.SECONDS));
assertEquals("Should be expected message with " + (base + num * M), (base + num*M), expected.get());
@@ -231,18 +246,18 @@ public class TestMessageFilter extends H
}
@Test
- public void testMessageFilter() throws Exception {
+ public void testServerSideMessageFilter() throws Exception {
ByteString topic = ByteString.copyFromUtf8("TestMessageFilter");
ByteString subid = ByteString.copyFromUtf8("mysub");
subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
subscriber.closeSubscription(topic, subid);
publishNums(topic, 0, 100, 2);
- receiveNumModM(topic, subid, ModMessageFilter.class.getName(), 0, 50, 2, true);
+ receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 50, 2, true);
}
@Test
- public void testInvalidMessageFilter() throws Exception {
+ public void testInvalidServerSideMessageFilter() throws Exception {
ByteString topic = ByteString.copyFromUtf8("TestInvalidMessageFilter");
ByteString subid = ByteString.copyFromUtf8("mysub");
@@ -268,21 +283,21 @@ public class TestMessageFilter extends H
subscriber.closeSubscription(topic, subid);
publishNums(topic, 0, 100, 2);
- receiveNumModM(topic, subid, ModMessageFilter.class.getName(), 0, 50, 2, false);
- receiveNumModM(topic, subid, ModMessageFilter.class.getName(), 0, 25, 4, false);
- receiveNumModM(topic, subid, ModMessageFilter.class.getName(), 0, 33, 3, true);
+ receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 50, 2, false);
+ receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 25, 4, false);
+ receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 33, 3, true);
// change mod to receive numbers mod 5
publishNums(topic, 100, 100, 5);
- receiveNumModM(topic, subid, ModMessageFilter.class.getName(), 100, 20, 5, true);
+ receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 100, 20, 5, true);
// change mod to receive numbers mod 7
publishNums(topic, 200, 100, 7);
- receiveNumModM(topic, subid, ModMessageFilter.class.getName(), 200, 14, 7, true);
+ receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 200, 14, 7, true);
}
@Test
- public void testChangeMessageFilter() throws Exception {
+ public void testChangeServerSideMessageFilter() throws Exception {
ByteString topic = ByteString.copyFromUtf8("TestChangeMessageFilter");
ByteString subid = ByteString.copyFromUtf8("mysub");
@@ -290,16 +305,16 @@ public class TestMessageFilter extends H
subscriber.closeSubscription(topic, subid);
publishNums(topic, 0, 100, 3);
- receiveNumModM(topic, subid, ModMessageFilter.class.getName(), 0, 50, 2, false);
- receiveNumModM(topic, subid, ModMessageFilter.class.getName(), 0, 25, 4, false);
- receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), 0, 33, 3, true);
+ receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 50, 2, false);
+ receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 25, 4, false);
+ receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), null, 0, 33, 3, true);
publishNums(topic, 200, 100, 7);
- receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), 200, 14, 7, true);
+ receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), null, 200, 14, 7, true);
}
@Test
- public void testFixMessageFilter() throws Exception {
+ public void testFixInvalidServerSideMessageFilter() throws Exception {
ByteString topic = ByteString.copyFromUtf8("TestFixMessageFilter");
ByteString subid = ByteString.copyFromUtf8("mysub");
@@ -308,13 +323,74 @@ public class TestMessageFilter extends H
publishNums(topic, 0, 100, 3);
try {
- receiveNumModM(topic, subid, "Invalid_Message_Filter", 0, 33, 3, true);
+ receiveNumModM(topic, subid, "Invalid_Message_Filter", null, 0, 33, 3, true);
// coun't reach here
fail("Should fail subscribe with invalid message filter");
} catch (Exception pse) {
assertTrue("Should respond with INVALID_MESSAGE_FILTER",
pse.getMessage().contains("INVALID_MESSAGE_FILTER"));
}
- receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), 0, 33, 3, true);
+ receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), null, 0, 33, 3, true);
+ }
+
+ @Test
+ public void testNullClientMessageFilter() throws Exception {
+ ByteString topic = ByteString.copyFromUtf8("TestNullClientMessageFilter");
+ ByteString subid = ByteString.copyFromUtf8("mysub");
+ subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ try {
+ subscriber.startDeliveryWithFilter(topic, subid, null, new ModMessageFilter());
+ fail("Should fail start delivery with filter using null message handler.");
+ } catch (NullPointerException npe) {
+ }
+
+ try {
+ subscriber.startDeliveryWithFilter(topic, subid, new MessageHandler() {
+ public void deliver(ByteString topic, ByteString subscriberId,
+ Message msg, Callback<Void> callback, Object context) {
+ // do nothing
+ }
+ }, null);
+ fail("Should fail start delivery with filter using null message filter.");
+ } catch (NullPointerException npe) {
+ }
+ }
+
+ @Test
+ public void testClientSideMessageFilter() throws Exception {
+ ByteString topic = ByteString.copyFromUtf8("TestClientMessageFilter");
+ ByteString subid = ByteString.copyFromUtf8("mysub");
+
+ subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ subscriber.closeSubscription(topic, subid);
+ publishNums(topic, 0, 100, 2);
+ receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 50, 2, true);
+ }
+
+ @Test
+ public void testChangeSubscriptionPreferencesForClientFilter() throws Exception {
+ ByteString topic = ByteString.copyFromUtf8("TestChangeSubscriptionPreferencesForClientFilter");
+ ByteString subid = ByteString.copyFromUtf8("mysub");
+
+ subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ subscriber.closeSubscription(topic, subid);
+
+ publishNums(topic, 0, 100, 2);
+ receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 50, 2, false);
+ receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 25, 4, false);
+ receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 33, 3, true);
+ }
+
+ @Test
+ public void testChangeClientSideMessageFilter() throws Exception {
+ ByteString topic = ByteString.copyFromUtf8("TestChangeClientSideMessageFilter");
+ ByteString subid = ByteString.copyFromUtf8("mysub");
+
+ subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ subscriber.closeSubscription(topic, subid);
+
+ publishNums(topic, 0, 100, 3);
+ receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 50, 2, false);
+ receiveNumModM(topic, subid, null, new HeaderMessageFilter(), 0, 33, 3, true);
}
}