You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/07/30 17:20:56 UTC

svn commit: r1367117 [3/3] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/api/ hedwig-client/src/main/java/org/apache/hedwig/client/data/ hedwig-client/src/main/java/org/apache/hedwig/client/handlers/ hedwig-c...

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java?rev=1367117&r1=1367116&r2=1367117&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java Mon Jul 30 15:20:55 2012
@@ -17,7 +17,12 @@
  */
 package org.apache.hedwig.client;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.After;
 import org.junit.Before;
@@ -32,6 +37,8 @@ import org.apache.hedwig.client.api.Subs
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.PublishResponse;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
 import org.apache.hedwig.server.PubSubServerStandAloneTestBase;
 import org.apache.hedwig.util.Callback;
@@ -120,6 +127,61 @@ public class TestPubSubClient extends Pu
     }
 
     @Test
+    public void testSyncPublishWithResponse() throws Exception {
+        // Checks that the seq id in each sync publish response matches the delivered message's id.
+        final ByteString topicName = ByteString.copyFromUtf8("testSyncPublishWithResponse");
+        final ByteString subscriptionId = ByteString.copyFromUtf8("mysubid");
+
+        final String prefix = "SyncMessage-";
+        final int numMessages = 30;
+
+        // message body -> seq id, as returned to the publisher and as seen by the subscriber
+        final Map<String, MessageSeqId> publishedIds = new HashMap<String, MessageSeqId>();
+        final Map<String, MessageSeqId> receivedIds = new HashMap<String, MessageSeqId>();
+
+        final AtomicInteger deliveredCount = new AtomicInteger(0);
+        final CountDownLatch allDelivered = new CountDownLatch(1);
+
+        MessageHandler recorder = new MessageHandler() {
+            public synchronized void deliver(ByteString topic, ByteString subscriberId,
+                                             Message msg, Callback<Void> callback,
+                                             Object context) {
+                receivedIds.put(msg.getBody().toStringUtf8(), msg.getMsgId());
+                // open the latch once the whole batch has been delivered
+                if (deliveredCount.incrementAndGet() == numMessages) {
+                    allDelivered.countDown();
+                }
+                callback.operationFinished(context, null);
+            }
+        };
+        subscriber.subscribe(topicName, subscriptionId, CreateOrAttach.CREATE_OR_ATTACH);
+        subscriber.startDelivery(topicName, subscriptionId, recorder);
+
+        for (int seq = 0; seq < numMessages; seq++) {
+            String body = prefix + seq;
+            Message msg = Message.newBuilder().setBody(ByteString.copyFromUtf8(body)).build();
+            PublishResponse response = publisher.publish(topicName, msg);
+            assertNotNull(response);
+            publishedIds.put(body, response.getPublishedMsgId());
+        }
+
+        boolean deliveredInTime = allDelivered.await(30, TimeUnit.SECONDS);
+        assertTrue("Timed out waiting on callback for messages.", deliveredInTime);
+        assertEquals("Should be expected " + numMessages + " messages.",
+                     numMessages, deliveredCount.get());
+        assertEquals("Should be expected " + numMessages + " messages in map.",
+                     numMessages, receivedIds.size());
+
+        for (int seq = 0; seq < numMessages; seq++) {
+            String body = prefix + seq;
+            MessageSeqId publishedId = publishedIds.get(body);
+            MessageSeqId receivedId = receivedIds.get(body);
+            assertTrue("Doesn't receive same message seq id for " + body,
+                       publishedId.equals(receivedId));
+        }
+    }
+
+    @Test
     public void testAsyncPublish() throws Exception {
         publisher.asyncPublish(ByteString.copyFromUtf8("myAsyncTopic"), Message.newBuilder().setBody(
                                    ByteString.copyFromUtf8("Hello Async World!")).build(), new TestCallback(), null);
@@ -127,6 +189,79 @@ public class TestPubSubClient extends Pu
     }
 
     @Test
+    public void testAsyncPublishWithResponse() throws Exception {
+        // Checks that the seq id in each async publish response matches the delivered message's id.
+        ByteString topic = ByteString.copyFromUtf8("testAsyncPublishWithResponse");
+        ByteString subid = ByteString.copyFromUtf8("mysubid");
+
+        final String prefix = "AsyncMessage-";
+        final int numMessages = 30;
+
+        // publish side: message body -> seq id reported to the publish callback
+        final AtomicInteger numPublished = new AtomicInteger(0);
+        final CountDownLatch publishLatch = new CountDownLatch(1);
+        final Map<String, MessageSeqId> publishedMsgs = new HashMap<String, MessageSeqId>();
+
+        // receive side: message body -> msg id carried by the delivered message
+        final AtomicInteger numReceived = new AtomicInteger(0);
+        final CountDownLatch receiveLatch = new CountDownLatch(1);
+        final Map<String, MessageSeqId> receivedMsgs = new HashMap<String, MessageSeqId>();
+
+        subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+        subscriber.startDelivery(topic, subid, new MessageHandler() {
+            synchronized public void deliver(ByteString topic, ByteString subscriberId,
+                                             Message msg, Callback<Void> callback,
+                                             Object context) {
+                String str = msg.getBody().toStringUtf8();
+                receivedMsgs.put(str, msg.getMsgId());
+                if (numMessages == numReceived.incrementAndGet()) {
+                    receiveLatch.countDown();
+                }
+                callback.operationFinished(context, null);
+            }
+        });
+
+        for (int i = 0; i < numMessages; i++) {
+            final String str = prefix + i;
+            Message msg = Message.newBuilder().setBody(ByteString.copyFromUtf8(str)).build();
+            publisher.asyncPublishWithResponse(topic, msg, new Callback<PublishResponse>() {
+                @Override
+                public void operationFinished(Object ctx, PublishResponse response) {
+                    // Each publish has its own callback object, so lock the shared map itself.
+                    // NOTE(review): assumes callbacks can run concurrently on client threads - confirm.
+                    synchronized (publishedMsgs) {
+                        publishedMsgs.put(str, response.getPublishedMsgId());
+                    }
+                    if (numMessages == numPublished.incrementAndGet()) {
+                        publishLatch.countDown();
+                    }
+                }
+                @Override
+                public void operationFailed(Object ctx, final PubSubException exception) {
+                    // release the waiter early; the publish-count assertion below then fails
+                    publishLatch.countDown();
+                }
+            }, null);
+        }
+        assertTrue("Timed out waiting on callback for publish requests.",
+                   publishLatch.await(10, TimeUnit.SECONDS));
+        assertEquals("Should be expected " + numMessages + " publishes.", numMessages, numPublished.get());
+        assertEquals("Should be expected " + numMessages + " publishe responses.", numMessages, publishedMsgs.size());
+
+        assertTrue("Timed out waiting on callback for messages.",
+                   receiveLatch.await(30, TimeUnit.SECONDS));
+        assertEquals("Should be expected " + numMessages + " messages.", numMessages, numReceived.get());
+        assertEquals("Should be expected " + numMessages + " messages in map.", numMessages, receivedMsgs.size());
+
+        for (int i = 0; i < numMessages; i++) {
+            final String str = prefix + i;
+            // assertEquals is null-safe and reports both ids when they differ
+            assertEquals("Doesn't receive same message seq id for " + str,
+                         publishedMsgs.get(str), receivedMsgs.get(str));
+        }
+    }
+
+    @Test
     public void testMultipleAsyncPublish() throws Exception {
         ByteString topic1 = ByteString.copyFromUtf8("myFirstTopic");
         ByteString topic2 = ByteString.copyFromUtf8("myNewTopic");

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java?rev=1367117&r1=1367116&r2=1367117&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java Mon Jul 30 15:20:55 2012
@@ -73,12 +73,13 @@ public class StubPersistenceManager impl
 
     public void persistMessage(PersistRequest request) {
         if (failure) {
-            request.callback.operationFailed(request.getCtx(), exception);
+            request.getCallback().operationFailed(request.getCtx(), exception); // failure flag set: report the stub's exception, store nothing
             return;
         }
 
         MapMethods.addToMultiMap(messages, request.getTopic(), request.getMessage(), ArrayListMessageFactory.instance);
-        request.callback.operationFinished(request.getCtx(), (long) messages.get(request.getTopic()).size());
+        request.getCallback().operationFinished(request.getCtx(), MessageIdUtils.mergeLocalSeqId(request.getMessage(),
+                (long) messages.get(request.getTopic()).size()).getMsgId()); // local seq id passed in = count of messages now held for the topic
     }
 
     public void scanSingleMessage(ScanRequest request) {

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java?rev=1367117&r1=1367116&r2=1367117&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java Mon Jul 30 15:20:55 2012
@@ -30,6 +30,7 @@ import junit.framework.TestCase;
 
 import org.apache.hedwig.HelperMethods;
 import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.meta.MetadataManagerFactory;
@@ -178,7 +179,7 @@ public class TestBookKeeperPersistenceMa
         }
     }
 
-    class TestCallback implements Callback<Long> {
+    class TestCallback implements Callback<PubSubProtocol.MessageSeqId> {
 
         @Override
         @SuppressWarnings("unchecked")
@@ -193,7 +194,7 @@ public class TestBookKeeperPersistenceMa
 
         @Override
         @SuppressWarnings("unchecked")
-        public void operationFinished(Object ctx, Long resultOfOperation) {
+        public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) {
             LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
             try {
                 statusQueue.put(true);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java?rev=1367117&r1=1367116&r2=1367117&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java Mon Jul 30 15:20:55 2012
@@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledExe
 import junit.framework.TestCase;
 
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.hedwig.protocol.PubSubProtocol;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -107,9 +108,9 @@ public class TestBookkeeperPersistenceMa
             assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
             assertEquals(numPrevLedgers, bkpm.topicInfos.get(topic).ledgerRanges.size());
 
-            StubCallback<Long> persistCallback = new StubCallback<Long>();
+            StubCallback<PubSubProtocol.MessageSeqId> persistCallback = new StubCallback<PubSubProtocol.MessageSeqId>();
             bkpm.persistMessage(new PersistRequest(topic, messages.get(index), persistCallback, null));
-            assertEquals(new Long(index + 1), ConcurrencyUtils.take(persistCallback.queue).left());
+            assertEquals(index + 1, ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent());
 
             // once in every 10 times, give up ledger
             if (r.nextInt(10) == 9) {

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java?rev=1367117&r1=1367116&r2=1367117&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java Mon Jul 30 15:20:55 2012
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
 
+import org.apache.hedwig.protocol.PubSubProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.junit.Test;
@@ -45,14 +46,14 @@ public abstract class TestPersistenceMan
 
     RuntimeException failureException;
 
-    class TestCallback implements Callback<Long> {
+    class TestCallback implements Callback<PubSubProtocol.MessageSeqId> {
 
         public void operationFailed(Object ctx, PubSubException exception) {
             throw (failureException = new RuntimeException(exception));
         }
 
         @SuppressWarnings("unchecked")
-        public void operationFinished(Object ctx, Long resultOfOperation) {
+        public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) {
             LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
             try {
                 statusQueue.put(true);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java?rev=1367117&r1=1367116&r2=1367117&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java Mon Jul 30 15:20:55 2012
@@ -21,6 +21,7 @@ import static org.junit.Assert.*;
 
 import java.util.List;
 
+import org.apache.hedwig.protocol.PubSubProtocol;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -91,7 +92,7 @@ public class TestReadAheadCacheWhiteBox 
 
     @Test
     public void testPersistMessage() throws Exception {
-        StubCallback<Long> callback = new StubCallback<Long>();
+        StubCallback<PubSubProtocol.MessageSeqId> callback = new StubCallback<PubSubProtocol.MessageSeqId>();
         PersistRequest request = new PersistRequest(topic, messages.get(0), callback, null);
 
         stubPersistenceManager.failure = true;
@@ -107,7 +108,7 @@ public class TestReadAheadCacheWhiteBox 
     }
 
     private void persistMessage(Message msg) throws Exception {
-        StubCallback<Long> callback = new StubCallback<Long>();
+        StubCallback<PubSubProtocol.MessageSeqId> callback = new StubCallback<PubSubProtocol.MessageSeqId>();
         PersistRequest request = new PersistRequest(topic, msg, callback, null);
         cacheBasedPersistenceManager.persistMessage(request);
         assertNotNull(ConcurrencyUtils.take(callback.queue).left());