Posted to commits@twill.apache.org by ch...@apache.org on 2017/01/10 19:34:15 UTC

twill git commit: Return the offset to read next message in KafkaConsumer.MessageCallback#onReceive

Repository: twill
Updated Branches:
  refs/heads/master 5986553ba -> b28e59eaa


Return the offset to read next message in KafkaConsumer.MessageCallback#onReceive

This closes #16 on Github.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/b28e59ea
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/b28e59ea
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/b28e59ea

Branch: refs/heads/master
Commit: b28e59eaabb45cf201d5ea15fafee25079dec795
Parents: 5986553
Author: Chengfeng <ma...@cask.co>
Authored: Mon Nov 21 19:04:52 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Jan 10 11:33:17 2017 -0800

----------------------------------------------------------------------
 .../org/apache/twill/api/ServiceAnnouncer.java  |  2 +-
 .../twill/internal/AbstractTwillController.java |  8 +-
 .../kafka/client/BasicFetchedMessage.java       | 10 +++
 .../kafka/client/SimpleKafkaConsumer.java       | 19 ++---
 .../twill/kafka/client/FetchedMessage.java      |  5 ++
 .../twill/kafka/client/KafkaConsumer.java       |  3 +-
 .../apache/twill/kafka/client/KafkaTest.java    | 82 ++++++++++++++++----
 7 files changed, 103 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
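
The core of the change is the signature of KafkaConsumer.MessageCallback#onReceived, which now returns the offset of the message the consumer should fetch next instead of returning void. The following is a minimal sketch, not part of this commit, of a sequential consumer under the new contract: the class name and the println placeholder are invented for illustration, while the prepare()/add()/consume() wiring, the -1L initial value, and the use of getNextOffset() mirror the updated KafkaTest and AbstractTwillController.

import com.google.common.base.Charsets;
import org.apache.twill.common.Cancellable;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaConsumer;

import java.util.Iterator;

/**
 * Illustrative sketch only: a callback under the new contract that keeps
 * consuming sequentially by returning getNextOffset().
 */
public final class SequentialConsumeExample {

  public static Cancellable consume(KafkaConsumer consumer, String topic) {
    // Subscribe to partition 0 of the topic, starting from offset 0.
    return consumer.prepare()
      .add(topic, 0, 0)
      .consume(new KafkaConsumer.MessageCallback() {
        @Override
        public long onReceived(Iterator<FetchedMessage> messages) {
          // Same initial value the updated callers use when nothing is consumed.
          long nextOffset = -1L;
          while (messages.hasNext()) {
            FetchedMessage message = messages.next();
            // Placeholder for real processing: decode the payload as UTF-8.
            System.out.println(Charsets.UTF_8.decode(message.getPayload()));
            nextOffset = message.getNextOffset();
          }
          // The consumer fetches from this offset on the next poll.
          return nextOffset;
        }

        @Override
        public void finished() {
          // Consumption stopped, e.g. the returned Cancellable was cancelled.
        }
      });
  }

  private SequentialConsumeExample() { }
}

Returning getNextOffset() of the last message keeps the previous sequential behaviour; returning any other value repositions the consumer, which is what the changes below plumb through.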


http://git-wip-us.apache.org/repos/asf/twill/blob/b28e59ea/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java b/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java
index 9513352..2721388 100644
--- a/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java
+++ b/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java
@@ -32,7 +32,7 @@ public interface ServiceAnnouncer {
   Cancellable announce(String serviceName, int port);
 
   /**
-   * Registers an endpoint that could be discovered by external party with a payload
+   * Registers an endpoint that could be discovered by external party with a payload.
    * @param serviceName Name of the endpoint
    * @param port Port of the endpoint
    * @param payload byte array payload

http://git-wip-us.apache.org/repos/asf/twill/blob/b28e59ea/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index 5a1c5b3..212411f 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -229,9 +229,11 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
     }
 
     @Override
-    public void onReceived(Iterator<FetchedMessage> messages) {
+    public long onReceived(Iterator<FetchedMessage> messages) {
+      long nextOffset = -1L;
       while (messages.hasNext()) {
-        String json = Charsets.UTF_8.decode(messages.next().getPayload()).toString();
+        FetchedMessage message = messages.next();
+        String json = Charsets.UTF_8.decode(message.getPayload()).toString();
         try {
           LogEntry entry = GSON.fromJson(json, LogEntry.class);
           if (entry != null) {
@@ -240,7 +242,9 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
         } catch (Exception e) {
           LOG.error("Failed to decode log entry {}", json, e);
         }
+        nextOffset = message.getNextOffset();
       }
+      return nextOffset;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/twill/blob/b28e59ea/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
index ee53ed4..bf5d264 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
@@ -29,6 +29,7 @@ final class BasicFetchedMessage implements FetchedMessage {
 
   private final TopicPartition topicPartition;
   private ByteBuffer payload;
+  private long offset;
   private long nextOffset;
 
   BasicFetchedMessage(TopicPartition topicPartition) {
@@ -39,6 +40,10 @@ final class BasicFetchedMessage implements FetchedMessage {
     this.payload = payload;
   }
 
+  void setOffset(long offset) {
+    this.offset = offset;
+  }
+
   void setNextOffset(long nextOffset) {
     this.nextOffset = nextOffset;
   }
@@ -54,6 +59,11 @@ final class BasicFetchedMessage implements FetchedMessage {
   }
 
   @Override
+  public long getOffset() {
+    return offset;
+  }
+
+  @Override
   public long getNextOffset() {
     return nextOffset;
   }

http://git-wip-us.apache.org/repos/asf/twill/blob/b28e59ea/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
index 0299e56..927788f 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
@@ -57,6 +57,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -273,17 +274,17 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
       final AtomicBoolean stopped = new AtomicBoolean();
       return new MessageCallback() {
         @Override
-        public void onReceived(final Iterator<FetchedMessage> messages) {
+        public long onReceived(final Iterator<FetchedMessage> messages) {
           if (stopped.get()) {
-            return;
+            return -1L;
           }
-          Futures.getUnchecked(executor.submit(new Runnable() {
+          return Futures.getUnchecked(executor.submit(new Callable<Long>() {
             @Override
-            public void run() {
+            public Long call() {
               if (stopped.get()) {
-                return;
+                return -1L;
               }
-              callback.onReceived(messages);
+              return callback.onReceived(messages);
             }
           }));
         }
@@ -450,7 +451,7 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
     private void invokeCallback(ByteBufferMessageSet messages, AtomicLong offset) {
       long savedOffset = offset.get();
       try {
-        callback.onReceived(createFetchedMessages(messages, offset));
+        offset.set(callback.onReceived(createFetchedMessages(messages, offset)));
       } catch (Throwable t) {
         LOG.error("Callback throws exception. Retry from offset {} for {}", startOffset, topicPart, t);
         offset.set(savedOffset);
@@ -475,9 +476,9 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
               continue;
             }
 
-            offset.set(message.nextOffset());
             fetchedMessage.setPayload(message.message().payload());
-            fetchedMessage.setNextOffset(offset.get());
+            fetchedMessage.setOffset(message.offset());
+            fetchedMessage.setNextOffset(message.nextOffset());
 
             return fetchedMessage;
           }
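
This hunk is where the returned offset takes effect: invokeCallback saves the current fetch offset, replaces it with whatever the callback returns, and restores the saved value if the callback throws. Assuming those semantics, one pattern this enables is advancing only past messages that were actually handled. The sketch below is illustrative only; process() is a hypothetical application method.

import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaConsumer;

import java.nio.ByteBuffer;
import java.util.Iterator;

/**
 * Sketch only: advance the consumer strictly past messages that were processed
 * successfully; on failure, ask for the failed message again by returning its
 * own offset.
 */
public abstract class AtLeastOnceCallback implements KafkaConsumer.MessageCallback {

  /** Hypothetical application-level processing; may throw on transient failures. */
  protected abstract void process(ByteBuffer payload) throws Exception;

  @Override
  public long onReceived(Iterator<FetchedMessage> messages) {
    long nextOffset = -1L;
    while (messages.hasNext()) {
      FetchedMessage message = messages.next();
      try {
        process(message.getPayload());
        nextOffset = message.getNextOffset();
      } catch (Exception e) {
        // Stop here; the consumer should re-fetch starting at this message.
        return message.getOffset();
      }
    }
    return nextOffset;
  }

  @Override
  public void finished() {
    // No-op by default.
  }
}

Letting the exception escape onReceived instead would land in the catch block above and retry the whole batch from the saved offset.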

http://git-wip-us.apache.org/repos/asf/twill/blob/b28e59ea/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java b/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
index 5739ac6..49dc291 100644
--- a/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
@@ -29,6 +29,11 @@ public interface FetchedMessage {
   ByteBuffer getPayload();
 
   /**
+   * Returns the offset for the current message.
+   */
+  long getOffset();
+
+  /**
    * Returns the offset for the next message to be read.
    */
   long getNextOffset();

http://git-wip-us.apache.org/repos/asf/twill/blob/b28e59ea/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java
index d53ee98..28bcd80 100644
--- a/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java
@@ -35,8 +35,9 @@ public interface KafkaConsumer {
      * Invoked when new messages is available.
      * @param messages Iterator of new messages. The {@link FetchedMessage} instance maybe reused in the Iterator
      *                 and across different invocation.
+     * @return The offset of the message to be fetched next.
      */
-    void onReceived(Iterator<FetchedMessage> messages);
+    long onReceived(Iterator<FetchedMessage> messages);
 
     /**
      * Invoked when message consumption is stopped. When this method is invoked,
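
Since the return value sets the next fetch position, a callback can also read non-sequentially. The new testKafkaClientSkipNext further down exercises this by returning getNextOffset() + 1 so that every other message is skipped; a condensed, illustrative sketch of that pattern:

import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaConsumer;

import java.util.Iterator;

/**
 * Condensed sketch of the pattern in testKafkaClientSkipNext: handle one message
 * per invocation, then ask the consumer to skip the message that follows it.
 */
public class SkipEveryOtherCallback implements KafkaConsumer.MessageCallback {

  @Override
  public long onReceived(Iterator<FetchedMessage> messages) {
    if (messages.hasNext()) {
      FetchedMessage message = messages.next();
      // Real code would process message.getPayload() here; the test also records
      // message.getOffset() to verify which messages were delivered.
      return message.getNextOffset() + 1;   // skip the next message
    }
    return -1L;
  }

  @Override
  public void finished() {
    // Nothing to clean up in this sketch.
  }
}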

http://git-wip-us.apache.org/repos/asf/twill/blob/b28e59ea/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
index 4ac8ae4..958925c 100644
--- a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
+++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
@@ -42,10 +42,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Properties;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -89,8 +87,8 @@ public class KafkaTest {
   @Test
   public void testKafkaClientReconnect() throws Exception {
     String topic = "backoff";
-    Properties kafkServerConfig = generateKafkaConfig(zkServer.getConnectionStr() + "/backoff");
-    EmbeddedKafkaServer server = new EmbeddedKafkaServer(kafkServerConfig);
+    Properties kafkaServerConfig = generateKafkaConfig(zkServer.getConnectionStr() + "/backoff");
+    EmbeddedKafkaServer server = new EmbeddedKafkaServer(kafkaServerConfig);
 
     ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/backoff").build();
     zkClient.startAndWait();
@@ -106,15 +104,19 @@ public class KafkaTest {
           // Publish a messages
           createPublishThread(kafkaClient, topic, Compression.NONE, "First message", 1).start();
 
-          // Creater a consumer
+          // Create a consumer
           final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
           Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0)
             .consume(new KafkaConsumer.MessageCallback() {
               @Override
-              public void onReceived(Iterator<FetchedMessage> messages) {
+              public long onReceived(Iterator<FetchedMessage> messages) {
+                long nextOffset = -1L;
                 while (messages.hasNext()) {
-                  queue.offer(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
+                  FetchedMessage message = messages.next();
+                  nextOffset = message.getNextOffset();
+                  queue.offer(Charsets.UTF_8.decode(message.getPayload()).toString());
                 }
+                return nextOffset;
               }
 
               @Override
@@ -130,7 +132,7 @@ public class KafkaTest {
 
           // Start the server again.
           // Needs to create a new instance with the same config since guava service cannot be restarted
-          server = new EmbeddedKafkaServer(kafkServerConfig);
+          server = new EmbeddedKafkaServer(kafkaServerConfig);
           server.startAndWait();
 
           // Publish another message
@@ -170,11 +172,15 @@ public class KafkaTest {
     Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
       .MessageCallback() {
       @Override
-      public void onReceived(Iterator<FetchedMessage> messages) {
+      public long onReceived(Iterator<FetchedMessage> messages) {
+        long nextOffset = -1;
         while (messages.hasNext()) {
-          LOG.info(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
+          FetchedMessage message = messages.next();
+          nextOffset = message.getNextOffset();
+          LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
           latch.countDown();
         }
+        return nextOffset;
       }
 
       @Override
@@ -189,6 +195,51 @@ public class KafkaTest {
   }
 
   @Test
+  public void testKafkaClientSkipNext() throws Exception {
+    String topic = "testClientSkipNext";
+    // Publish 30 messages with indecies the same as offsets within the range 0 - 29
+    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
+    t1.start();
+    t1.join();
+    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
+    t2.start();
+    t2.join();
+    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
+    t3.start();
+    t3.join();
+
+    final CountDownLatch stopLatch = new CountDownLatch(1);
+    final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
+    Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(
+      new KafkaConsumer.MessageCallback() {
+      @Override
+      public long onReceived(Iterator<FetchedMessage> messages) {
+        long nextOffset = -1L;
+        if (messages.hasNext()) {
+          FetchedMessage message = messages.next();
+          nextOffset = message.getNextOffset() + 1;
+          offsetQueue.offer(message.getOffset());
+          LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
+          return nextOffset;
+        }
+        return nextOffset;
+      }
+
+      @Override
+      public void finished() {
+        stopLatch.countDown();
+      }
+    });
+    // 15 messages should be in the queue since onReceived returns `message.getNextOffset() + 1` as next offset to read
+    for (long i = 0; i < 30; i += 2) {
+      Assert.assertEquals(i, (long) offsetQueue.poll(60, TimeUnit.SECONDS));
+    }
+    Assert.assertNull(offsetQueue.poll(2, TimeUnit.SECONDS));
+    cancel.cancel();
+    Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
+  }
+
+  @Test
   public void testBrokerChange() throws Exception {
     // Create a new namespace in ZK for Kafka server for this test case
     String connectionStr = zkServer.getConnectionStr() + "/broker_change";
@@ -207,13 +258,17 @@ public class KafkaTest {
 
     // Attach a consumer
     final BlockingQueue<String> consumedMessages = Queues.newLinkedBlockingQueue();
-    Cancellable cancelConsumer = kafkaClient.getConsumer()
+    kafkaClient.getConsumer()
       .prepare().addFromBeginning("test", 0).consume(new KafkaConsumer.MessageCallback() {
       @Override
-      public void onReceived(Iterator<FetchedMessage> messages) {
+      public long onReceived(Iterator<FetchedMessage> messages) {
+        long nextOffset = -1L;
         while (messages.hasNext()) {
-          consumedMessages.add(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
+          FetchedMessage message = messages.next();
+          nextOffset = message.getNextOffset();
+          consumedMessages.add(Charsets.UTF_8.decode(message.getPayload()).toString());
         }
+        return nextOffset;
       }
 
       @Override
@@ -286,6 +341,7 @@ public class KafkaTest {
     prop.setProperty("log.flush.interval.messages", "10000");
     prop.setProperty("log.flush.interval.ms", "1000");
     prop.setProperty("log.segment.bytes", "536870912");
+    prop.setProperty("message.send.max.retries", "10");
     prop.setProperty("zookeeper.connect", zkConnectStr);
     prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
     prop.setProperty("default.replication.factor", "1");