You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/07/30 17:20:56 UTC
svn commit: r1367117 [3/3] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/api/
hedwig-client/src/main/java/org/apache/hedwig/client/data/
hedwig-client/src/main/java/org/apache/hedwig/client/handlers/ hedwig-c...
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java?rev=1367117&r1=1367116&r2=1367117&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java Mon Jul 30 15:20:55 2012
@@ -17,7 +17,12 @@
*/
package org.apache.hedwig.client;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
@@ -32,6 +37,8 @@ import org.apache.hedwig.client.api.Subs
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.PublishResponse;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.server.PubSubServerStandAloneTestBase;
import org.apache.hedwig.util.Callback;
@@ -120,6 +127,61 @@ public class TestPubSubClient extends Pu
}
@Test
+ public void testSyncPublishWithResponse() throws Exception {
+ ByteString topic = ByteString.copyFromUtf8("testSyncPublishWithResponse");
+ ByteString subid = ByteString.copyFromUtf8("mysubid");
+
+ final String prefix = "SyncMessage-";
+ final int numMessages = 30;
+
+ final Map<String, MessageSeqId> publishedMsgs =
+ new HashMap<String, MessageSeqId>();
+
+ final AtomicInteger numReceived = new AtomicInteger(0);
+ final CountDownLatch receiveLatch = new CountDownLatch(1);
+ final Map<String, MessageSeqId> receivedMsgs =
+ new HashMap<String, MessageSeqId>();
+
+ subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ subscriber.startDelivery(topic, subid, new MessageHandler() {
+ synchronized public void deliver(ByteString topic, ByteString subscriberId,
+ Message msg, Callback<Void> callback,
+ Object context) {
+ String str = msg.getBody().toStringUtf8();
+ receivedMsgs.put(str, msg.getMsgId());
+ if (numMessages == numReceived.incrementAndGet()) {
+ receiveLatch.countDown();
+ }
+ callback.operationFinished(context, null);
+ }
+ });
+
+ for (int i=0; i<numMessages; i++) {
+ String str = prefix + i;
+ ByteString data = ByteString.copyFromUtf8(str);
+ Message msg = Message.newBuilder().setBody(data).build();
+ PublishResponse response = publisher.publish(topic, msg);
+ assertNotNull(response);
+ publishedMsgs.put(str, response.getPublishedMsgId());
+ }
+
+ assertTrue("Timed out waiting on callback for messages.",
+ receiveLatch.await(30, TimeUnit.SECONDS));
+ assertEquals("Should be expected " + numMessages + " messages.",
+ numMessages, numReceived.get());
+ assertEquals("Should be expected " + numMessages + " messages in map.",
+ numMessages, receivedMsgs.size());
+
+ for (int i=0; i<numMessages; i++) {
+ final String str = prefix + i;
+ MessageSeqId pubId = publishedMsgs.get(str);
+ MessageSeqId revId = receivedMsgs.get(str);
+ assertTrue("Doesn't receive same message seq id for " + str,
+ pubId.equals(revId));
+ }
+ }
+
+ @Test
public void testAsyncPublish() throws Exception {
publisher.asyncPublish(ByteString.copyFromUtf8("myAsyncTopic"), Message.newBuilder().setBody(
ByteString.copyFromUtf8("Hello Async World!")).build(), new TestCallback(), null);
@@ -127,6 +189,79 @@ public class TestPubSubClient extends Pu
}
@Test
+ public void testAsyncPublishWithResponse() throws Exception {
+ ByteString topic = ByteString.copyFromUtf8("testAsyncPublishWithResponse");
+ ByteString subid = ByteString.copyFromUtf8("mysubid");
+
+ final String prefix = "AsyncMessage-";
+ final int numMessages = 30;
+
+ final AtomicInteger numPublished = new AtomicInteger(0);
+ final CountDownLatch publishLatch = new CountDownLatch(1);
+ final Map<String, MessageSeqId> publishedMsgs =
+ new HashMap<String, MessageSeqId>();
+
+ final AtomicInteger numReceived = new AtomicInteger(0);
+ final CountDownLatch receiveLatch = new CountDownLatch(1);
+ final Map<String, MessageSeqId> receivedMsgs =
+ new HashMap<String, MessageSeqId>();
+
+ subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ subscriber.startDelivery(topic, subid, new MessageHandler() {
+ synchronized public void deliver(ByteString topic, ByteString subscriberId,
+ Message msg, Callback<Void> callback,
+ Object context) {
+ String str = msg.getBody().toStringUtf8();
+ receivedMsgs.put(str, msg.getMsgId());
+ if (numMessages == numReceived.incrementAndGet()) {
+ receiveLatch.countDown();
+ }
+ callback.operationFinished(context, null);
+ }
+ });
+
+ for (int i=0; i<numMessages; i++) {
+ final String str = prefix + i;
+ ByteString data = ByteString.copyFromUtf8(str);
+ Message msg = Message.newBuilder().setBody(data).build();
+ publisher.asyncPublishWithResponse(topic, msg, new Callback<PublishResponse>() {
+ @Override
+ public void operationFinished(Object ctx, PublishResponse response) {
+ publishedMsgs.put(str, response.getPublishedMsgId());
+ if (numMessages == numPublished.incrementAndGet()) {
+ publishLatch.countDown();
+ }
+ }
+ @Override
+ public void operationFailed(Object ctx, final PubSubException exception) {
+ publishLatch.countDown();
+ }
+ }, null);
+ }
+ assertTrue("Timed out waiting on callback for publish requests.",
+ publishLatch.await(10, TimeUnit.SECONDS));
+ assertEquals("Should be expected " + numMessages + " publishes.",
+ numMessages, numPublished.get());
+ assertEquals("Should be expected " + numMessages + " publishe responses.",
+ numMessages, publishedMsgs.size());
+
+ assertTrue("Timed out waiting on callback for messages.",
+ receiveLatch.await(30, TimeUnit.SECONDS));
+ assertEquals("Should be expected " + numMessages + " messages.",
+ numMessages, numReceived.get());
+ assertEquals("Should be expected " + numMessages + " messages in map.",
+ numMessages, receivedMsgs.size());
+
+ for (int i=0; i<numMessages; i++) {
+ final String str = prefix + i;
+ MessageSeqId pubId = publishedMsgs.get(str);
+ MessageSeqId revId = receivedMsgs.get(str);
+ assertTrue("Doesn't receive same message seq id for " + str,
+ pubId.equals(revId));
+ }
+ }
+
+ @Test
public void testMultipleAsyncPublish() throws Exception {
ByteString topic1 = ByteString.copyFromUtf8("myFirstTopic");
ByteString topic2 = ByteString.copyFromUtf8("myNewTopic");
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java?rev=1367117&r1=1367116&r2=1367117&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java Mon Jul 30 15:20:55 2012
@@ -73,12 +73,13 @@ public class StubPersistenceManager impl
public void persistMessage(PersistRequest request) {
if (failure) {
- request.callback.operationFailed(request.getCtx(), exception);
+ request.getCallback().operationFailed(request.getCtx(), exception);
return;
}
MapMethods.addToMultiMap(messages, request.getTopic(), request.getMessage(), ArrayListMessageFactory.instance);
- request.callback.operationFinished(request.getCtx(), (long) messages.get(request.getTopic()).size());
+ request.getCallback().operationFinished(request.getCtx(), MessageIdUtils.mergeLocalSeqId(request.getMessage(),
+ (long) messages.get(request.getTopic()).size()).getMsgId());
}
public void scanSingleMessage(ScanRequest request) {
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java?rev=1367117&r1=1367116&r2=1367117&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java Mon Jul 30 15:20:55 2012
@@ -30,6 +30,7 @@ import junit.framework.TestCase;
import org.apache.hedwig.HelperMethods;
import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.meta.MetadataManagerFactory;
@@ -178,7 +179,7 @@ public class TestBookKeeperPersistenceMa
}
}
- class TestCallback implements Callback<Long> {
+ class TestCallback implements Callback<PubSubProtocol.MessageSeqId> {
@Override
@SuppressWarnings("unchecked")
@@ -193,7 +194,7 @@ public class TestBookKeeperPersistenceMa
@Override
@SuppressWarnings("unchecked")
- public void operationFinished(Object ctx, Long resultOfOperation) {
+ public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) {
LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
try {
statusQueue.put(true);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java?rev=1367117&r1=1367116&r2=1367117&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java Mon Jul 30 15:20:55 2012
@@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledExe
import junit.framework.TestCase;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.hedwig.protocol.PubSubProtocol;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -107,9 +108,9 @@ public class TestBookkeeperPersistenceMa
assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
assertEquals(numPrevLedgers, bkpm.topicInfos.get(topic).ledgerRanges.size());
- StubCallback<Long> persistCallback = new StubCallback<Long>();
+ StubCallback<PubSubProtocol.MessageSeqId> persistCallback = new StubCallback<PubSubProtocol.MessageSeqId>();
bkpm.persistMessage(new PersistRequest(topic, messages.get(index), persistCallback, null));
- assertEquals(new Long(index + 1), ConcurrencyUtils.take(persistCallback.queue).left());
+ assertEquals(index + 1, ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent());
// once in every 10 times, give up ledger
if (r.nextInt(10) == 9) {
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java?rev=1367117&r1=1367116&r2=1367117&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java Mon Jul 30 15:20:55 2012
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
+import org.apache.hedwig.protocol.PubSubProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Test;
@@ -45,14 +46,14 @@ public abstract class TestPersistenceMan
RuntimeException failureException;
- class TestCallback implements Callback<Long> {
+ class TestCallback implements Callback<PubSubProtocol.MessageSeqId> {
public void operationFailed(Object ctx, PubSubException exception) {
throw (failureException = new RuntimeException(exception));
}
@SuppressWarnings("unchecked")
- public void operationFinished(Object ctx, Long resultOfOperation) {
+ public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) {
LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
try {
statusQueue.put(true);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java?rev=1367117&r1=1367116&r2=1367117&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java Mon Jul 30 15:20:55 2012
@@ -21,6 +21,7 @@ import static org.junit.Assert.*;
import java.util.List;
+import org.apache.hedwig.protocol.PubSubProtocol;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -91,7 +92,7 @@ public class TestReadAheadCacheWhiteBox
@Test
public void testPersistMessage() throws Exception {
- StubCallback<Long> callback = new StubCallback<Long>();
+ StubCallback<PubSubProtocol.MessageSeqId> callback = new StubCallback<PubSubProtocol.MessageSeqId>();
PersistRequest request = new PersistRequest(topic, messages.get(0), callback, null);
stubPersistenceManager.failure = true;
@@ -107,7 +108,7 @@ public class TestReadAheadCacheWhiteBox
}
private void persistMessage(Message msg) throws Exception {
- StubCallback<Long> callback = new StubCallback<Long>();
+ StubCallback<PubSubProtocol.MessageSeqId> callback = new StubCallback<PubSubProtocol.MessageSeqId>();
PersistRequest request = new PersistRequest(topic, msg, callback, null);
cacheBasedPersistenceManager.persistMessage(request);
assertNotNull(ConcurrencyUtils.take(callback.queue).left());