Posted to dev@twill.apache.org by maochf <gi...@git.apache.org> on 2016/11/22 03:25:24 UTC

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

GitHub user maochf opened a pull request:

    https://github.com/apache/twill/pull/16

    [TWILL-199] Handle offset error and return next offset in KafkaConsumer.MessageCallback

    https://issues.apache.org/jira/browse/TWILL-199

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maochf/twill feature/find-correct-offset

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/twill/pull/16.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16
    
----
commit 6e5dc84793a4eb35a1a0b10a36ed5a00b5275414
Author: Chengfeng <ma...@cask.co>
Date:   2016-11-22T03:04:52Z

    add KafkaOffsetProvider interface and return long in MessageCallback#onReceived

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94521381
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
           final AtomicBoolean stopped = new AtomicBoolean();
           return new MessageCallback() {
             @Override
    -        public void onReceived(final Iterator<FetchedMessage> messages) {
    +        public long onReceived(final Iterator<FetchedMessage> messages) {
               if (stopped.get()) {
    -            return;
    +            return Long.MIN_VALUE;
               }
    -          Futures.getUnchecked(executor.submit(new Runnable() {
    +          return Futures.getUnchecked(executor.submit(new Callable<Long>() {
    +            long nextOffset = Long.MIN_VALUE;
                 @Override
    -            public void run() {
    +            public Long call() {
                   if (stopped.get()) {
    -                return;
    +                return nextOffset;
                   }
    -              callback.onReceived(messages);
    +              nextOffset = callback.onReceived(messages);
    --- End diff --
    
    Just `return callback.onReceived(messages)`. No need to store it in a local variable.


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94688950
  
    --- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
    @@ -170,11 +172,57 @@ public void testKafkaClient() throws Exception {
         Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
           .MessageCallback() {
           @Override
    -      public void onReceived(Iterator<FetchedMessage> messages) {
    +      public long onReceived(Iterator<FetchedMessage> messages, long startOffset) {
    +        long nextOffset = startOffset;
             while (messages.hasNext()) {
    -          LOG.info(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
    +          FetchedMessage message = messages.next();
    +          LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
               latch.countDown();
             }
    +        return nextOffset;
    +      }
    +
    +      @Override
    +      public void finished() {
    +        stopLatch.countDown();
    +      }
    +    });
    +
    +    Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
    +    cancel.cancel();
    +    Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
    +  }
    +
    +  @Test
    +  public void testKafkaClientSkipNext() throws Exception {
    +    String topic = "testClient";
    +    // Publish 30 messages with indecies the same as offsets within the range 0 - 29
    +    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
    +    t1.start();
    +    t1.join();
    +    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
    +    t2.start();
    +    t2.join();
    +    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
    +    t3.start();
    +    t3.join();
    +
    +    // 15 messages will be counted since onReceived returns `message.getNextOffset() + 1` as next offset to read
    --- End diff --
    
    I don't think the test is correct. You published 30 messages in three message sets, so the `onReceived` method will be called three times: the first time with messages 0-9, where you return 11; the second with 11-19, where you return 21; and the last with 21-29. In total, more than 15 messages will be received.
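
The arithmetic in this comment can be checked with a small in-memory simulation. The sketch below is not Twill or Kafka code: the `Msg` class, the `simulate` method, and the assumption that a fetch delivers the remainder of the ten-message set containing the requested offset are all hypothetical stand-ins. When the callback consumes every message it is handed and returns `getNextOffset() + 1` for the last one, the three calls see offsets 0-9, 11-19, and 21-29 (returning 11, 21, and 31), so 28 messages are consumed rather than 15. When the callback reads only the first message of each call, as a later revision of the test does, exactly 15 are consumed.

```java
import java.util.ArrayList;
import java.util.List;

class SkipNextSimulation {

    /** Hypothetical stand-in for a fetched message; only the offsets matter here. */
    static final class Msg {
        final long offset;

        Msg(long offset) {
            this.offset = offset;
        }

        long getNextOffset() {
            return offset + 1;
        }
    }

    /**
     * Simulates consuming offsets 0..29 stored as three message sets of ten.
     * Assumption: each fetch delivers the rest of the set that contains the
     * requested offset, and the callback's return value is the next fetch offset.
     * Returns {number of callback invocations, number of messages consumed}.
     */
    static int[] simulate(boolean consumeWholeSet) {
        int calls = 0;
        int consumed = 0;
        long fetchOffset = 0;
        while (fetchOffset < 30) {
            long setEnd = (fetchOffset / 10 + 1) * 10; // exclusive end of the current set
            List<Msg> batch = new ArrayList<>();
            for (long o = fetchOffset; o < setEnd; o++) {
                batch.add(new Msg(o));
            }
            calls++;
            long next = fetchOffset;
            for (Msg m : batch) {
                consumed++;
                next = m.getNextOffset() + 1; // skips the message that follows m
                if (!consumeWholeSet) {
                    break; // read only the first message of this call
                }
            }
            fetchOffset = next;
        }
        return new int[] {calls, consumed};
    }

    public static void main(String[] args) {
        int[] whole = simulate(true);
        int[] single = simulate(false);
        System.out.println("whole set per call: calls=" + whole[0] + ", consumed=" + whole[1]);
        System.out.println("one message per call: calls=" + single[0] + ", consumed=" + single[1]);
    }
}
```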


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94561337
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
           final AtomicBoolean stopped = new AtomicBoolean();
           return new MessageCallback() {
             @Override
    -        public void onReceived(final Iterator<FetchedMessage> messages) {
    +        public long onReceived(final Iterator<FetchedMessage> messages) {
               if (stopped.get()) {
    -            return;
    +            return Long.MIN_VALUE;
    --- End diff --
    
    Using `AtomicLong` as a mutable parameter is not good. In general, immutability gives a cleaner API.
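
To make the contrast concrete, here is a minimal sketch of the two API shapes under discussion. Both interfaces are hypothetical illustrations, not the Twill `KafkaConsumer.MessageCallback` interface, and messages are reduced to their offsets (`Long`) to keep the example self-contained. With the mutable holder, the caller has to create the `AtomicLong` and read it back afterwards; with the returning shape, the start offset goes in and the next offset comes out as a plain value.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

class OffsetApiShapes {

    /** Hypothetical shape with an in/out parameter that the callee mutates. */
    interface MutableOffsetCallback {
        void onReceived(Iterator<Long> offsets, AtomicLong offset);
    }

    /** Hypothetical shape that takes the start offset and returns the next one. */
    interface ReturningCallback {
        long onReceived(long startOffset, Iterator<Long> offsets);
    }

    /** Caller side for the mutable shape: allocate a holder, then read it back. */
    static long drainMutable(Iterator<Long> offsets, long start) {
        AtomicLong holder = new AtomicLong(start);
        MutableOffsetCallback cb = (it, off) -> {
            while (it.hasNext()) {
                off.set(it.next() + 1);
            }
        };
        cb.onReceived(offsets, holder);
        return holder.get();
    }

    /** Caller side for the returning shape: no shared mutable state is needed. */
    static long drainReturning(Iterator<Long> offsets, long start) {
        ReturningCallback cb = (s, it) -> {
            long next = s;
            while (it.hasNext()) {
                next = it.next() + 1;
            }
            return next;
        };
        return cb.onReceived(start, offsets);
    }

    public static void main(String[] args) {
        System.out.println(drainMutable(Arrays.asList(4L, 5L, 6L).iterator(), 4L));
        System.out.println(drainReturning(Arrays.asList(4L, 5L, 6L).iterator(), 4L));
    }
}
```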


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94521288
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -68,7 +69,7 @@
     /**
      * A {@link KafkaConsumer} implementation using the scala kafka api.
      */
    -final class SimpleKafkaConsumer implements KafkaConsumer {
    +public final class SimpleKafkaConsumer implements KafkaConsumer {
    --- End diff --
    
    Why make this public? This is an internal implementation of `KafkaConsumer`, which shouldn't be public.


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94558645
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -68,7 +69,7 @@
     /**
      * A {@link KafkaConsumer} implementation using the scala kafka api.
      */
    -final class SimpleKafkaConsumer implements KafkaConsumer {
    +public final class SimpleKafkaConsumer implements KafkaConsumer {
    --- End diff --
    
    Yes, it's an implementation detail. Just wondering about future work.


---

[GitHub] twill issue #16: [TWILL-199] Return the offset to read next message in Kafka...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on the issue:

    https://github.com/apache/twill/pull/16
  
    LGTM. Please squash the commits to one.


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94555234
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -68,7 +69,7 @@
     /**
      * A {@link KafkaConsumer} implementation using the scala kafka api.
      */
    -final class SimpleKafkaConsumer implements KafkaConsumer {
    +public final class SimpleKafkaConsumer implements KafkaConsumer {
    --- End diff --
    
    Right, this can be done in the unit test. To phrase my question better, what can be a better way to perform the same logic as `getLastOffset` outside of `SimpleKafkaConsumer`? Maybe expose the parameter passed to `SimpleKafkaConsumer` constructor in `ZKKafkaClientService`?


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94551913
  
    --- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
    @@ -170,11 +174,128 @@ public void testKafkaClient() throws Exception {
         Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
           .MessageCallback() {
           @Override
    -      public void onReceived(Iterator<FetchedMessage> messages) {
    +      public long onReceived(Iterator<FetchedMessage> messages) {
    +        long nextOffset = Long.MIN_VALUE;
    +        while (messages.hasNext()) {
    +          FetchedMessage message = messages.next();
    +          LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
    +          latch.countDown();
    +        }
    +        return nextOffset;
    +      }
    +
    +      @Override
    +      public void finished() {
    +        stopLatch.countDown();
    +      }
    +    });
    +
    +    Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
    +    cancel.cancel();
    +    Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
    +  }
    +
    +  @Test
    +  public void testKafkaClientReadFromIdx() throws Exception {
    +    String topic = "testClient";
    +
    +    // Publish 30 messages with indecies the same as offsets within the range 0 - 29
    +    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
    +    t1.start();
    +    t1.join();
    +    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
    +    t2.start();
    +    t2.join();
    +    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
    +    t3.start();
    +    t3.join();
    +
    +    final int startIdx = 15;
    +    final CountDownLatch latch = new CountDownLatch(30 - startIdx);
    +    final CountDownLatch stopLatch = new CountDownLatch(1);
    +    final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
    +    // Creater a consumer
    +    final SimpleKafkaConsumer consumer = (SimpleKafkaConsumer) kafkaClient.getConsumer();
    +    Cancellable initCancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
    +      .MessageCallback() {
    +      long minOffset = -2; // earliest msg
    +      long maxOffset = -1; // latest msg
    +      @Override
    +      // Use binary search to find the offset of the message with the index matching startIdx. Returns the next offset
    --- End diff --
    
    Seems like this unit-test is unnecessarily complicated. All you want to test is that the offset being returned from the `onReceived` method is being honored, right?


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94817038
  
    --- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
    @@ -189,6 +194,49 @@ public void finished() {
       }
     
       @Test
    +  public void testKafkaClientSkipNext() throws Exception {
    +    String topic = "testClient";
    +    // Publish 30 messages with indecies the same as offsets within the range 0 - 29
    +    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
    +    t1.start();
    +    t1.join();
    +    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
    +    t2.start();
    +    t2.join();
    +    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
    +    t3.start();
    +    t3.join();
    +
    +    final CountDownLatch stopLatch = new CountDownLatch(1);
    +    final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
    +    Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(
    +      new KafkaConsumer.MessageCallback() {
    +      @Override
    +      public long onReceived(long startOffset, Iterator<FetchedMessage> messages) {
    +        if (messages.hasNext()) {
    +          offsetQueue.offer(startOffset);
    +          FetchedMessage message = messages.next();
    +          LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
    +          return message.getNextOffset() + 1;
    +        }
    +        return startOffset;
    +      }
    +
    +      @Override
    +      public void finished() {
    +        stopLatch.countDown();
    +      }
    +    });
    +    // 15 messages should be in the queue since onReceived returns `message.getNextOffset() + 1` as next offset to read
    +    for (int i = 0; i < 30; i += 2) {
    +      Assert.assertTrue(i == offsetQueue.poll(60, TimeUnit.SECONDS));
    +    }
    +    Assert.assertEquals(0, offsetQueue.size());
    --- End diff --
    
    Should do an `Assert.assertNull(offsetQueue.poll(2, TimeUnit.SECONDS))` instead.
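
A plausible reason for this suggestion is that `size()` only inspects the queue at one instant, while a timed `poll` waits long enough to catch a callback that fires late. The sketch below illustrates the difference with plain JDK classes; the 200 ms delay, the offset value 30, and the `LateArrivalCheck` class are made up for the demonstration. The size check reports an empty queue even though an unwanted element is about to arrive, whereas the timed poll sees it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class LateArrivalCheck {

    /**
     * Returns {what size() == 0 reported, what a timed poll reported} when an
     * unexpected element arrives shortly after the check starts.
     */
    static boolean[] run() {
        BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
        ScheduledExecutorService late = Executors.newSingleThreadScheduledExecutor();
        try {
            // Simulate a callback invocation that happens after the assertion begins.
            late.schedule(() -> { offsetQueue.offer(30L); }, 200, TimeUnit.MILLISECONDS);

            boolean sizeSaysEmpty = offsetQueue.size() == 0;
            boolean pollSaysEmpty = offsetQueue.poll(2, TimeUnit.SECONDS) == null;
            return new boolean[] {sizeSaysEmpty, pollSaysEmpty};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            late.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("size() == 0 reported empty: " + result[0]);
        System.out.println("timed poll reported empty:  " + result[1]);
    }
}
```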


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94918807
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
     
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --
    
    Wrap it with a peeking iterator from Guava.
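
One reading of this suggestion, given the other comments on the `startOffset` Javadoc, is that peeking at the first fetched message lets the consumer report the offset of the message actually delivered instead of the offset it asked for, without consuming that message. The sketch below uses a small hand-written `Peeking` class as a stand-in for Guava's `PeekingIterator` so that it runs on the JDK alone; the class, the `actualStartOffset` helper, and the use of bare `Long` offsets as messages are assumptions for illustration, not code from the pull request.

```java
import java.util.Arrays;
import java.util.Iterator;

class PeekingSketch {

    /** Minimal stand-in for a peeking iterator: look at the next element without consuming it. */
    static final class Peeking<T> implements Iterator<T> {
        private final Iterator<T> delegate;
        private T peeked;
        private boolean hasPeeked;

        Peeking(Iterator<T> delegate) {
            this.delegate = delegate;
        }

        T peek() {
            if (!hasPeeked) {
                peeked = delegate.next(); // throws NoSuchElementException when exhausted
                hasPeeked = true;
            }
            return peeked;
        }

        @Override
        public boolean hasNext() {
            return hasPeeked || delegate.hasNext();
        }

        @Override
        public T next() {
            if (!hasPeeked) {
                return delegate.next();
            }
            T result = peeked;
            peeked = null;
            hasPeeked = false;
            return result;
        }
    }

    /** Offset of the first message actually fetched, or the requested offset if nothing was fetched. */
    static long actualStartOffset(Peeking<Long> fetchedOffsets, long requestedOffset) {
        return fetchedOffsets.hasNext() ? fetchedOffsets.peek() : requestedOffset;
    }

    public static void main(String[] args) {
        // Offset 0 was requested, but the oldest message still available is at offset 5.
        Peeking<Long> fetched = new Peeking<>(Arrays.asList(5L, 6L, 7L).iterator());
        System.out.println("start offset to report: " + actualStartOffset(fetched, 0L));
        System.out.println("first message still delivered: " + fetched.next());
    }
}
```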


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r89195623
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java ---
    @@ -0,0 +1,36 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.twill.kafka.client;
    +
    +/**
    + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
    + * not met, the method will return the next offset to continue searching for the message meeting this condition.
    + */
    +public interface KafkaOffsetProvider {
    +
    +  /**
    +   * Check whether a message meets a given condition. If the condition is not met, return the next offset to
    --- End diff --
    
    Right, how about changing this to an abstract class instead?


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94558457
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
           final AtomicBoolean stopped = new AtomicBoolean();
           return new MessageCallback() {
             @Override
    -        public void onReceived(final Iterator<FetchedMessage> messages) {
    +        public long onReceived(final Iterator<FetchedMessage> messages) {
               if (stopped.get()) {
    -            return;
    +            return Long.MIN_VALUE;
    --- End diff --
    
    Maybe pass an `AtomicLong` to this method that contains the initial offset and can be set to the next offset? Just realized this is probably an implementation detail in `SimpleKafkaConsumer` and is not general enough to be done in the interface.


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94521832
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
           final AtomicBoolean stopped = new AtomicBoolean();
           return new MessageCallback() {
             @Override
    -        public void onReceived(final Iterator<FetchedMessage> messages) {
    +        public long onReceived(final Iterator<FetchedMessage> messages) {
               if (stopped.get()) {
    -            return;
    +            return Long.MIN_VALUE;
               }
    -          Futures.getUnchecked(executor.submit(new Runnable() {
    +          return Futures.getUnchecked(executor.submit(new Callable<Long>() {
    +            long nextOffset = Long.MIN_VALUE;
                 @Override
    -            public void run() {
    +            public Long call() {
                   if (stopped.get()) {
    -                return;
    +                return nextOffset;
                   }
    -              callback.onReceived(messages);
    +              nextOffset = callback.onReceived(messages);
    --- End diff --
    
    Because in line 286, the stored `nextOffset` should be returned


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94521210
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java ---
    @@ -206,9 +206,12 @@ private LogMessageCallback(Iterable<LogHandler> logHandlers) {
         }
     
         @Override
    -    public void onReceived(Iterator<FetchedMessage> messages) {
    +    public long onReceived(Iterator<FetchedMessage> messages) {
    +      long nextOffset = Long.MIN_VALUE;
    --- End diff --
    
    Why is it the min value?


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94874821
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
     
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --
    
    If one specified `-1L` or `-2L`, but no message is consumed in the method, then `-1L` or `-2L` still has to be returned


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94688058
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
           final AtomicBoolean stopped = new AtomicBoolean();
           return new MessageCallback() {
             @Override
    -        public void onReceived(final Iterator<FetchedMessage> messages) {
    +        public long onReceived(final Iterator<FetchedMessage> messages, final long startOffset) {
               if (stopped.get()) {
    -            return;
    +            return startOffset;
               }
    -          Futures.getUnchecked(executor.submit(new Runnable() {
    +          return Futures.getUnchecked(executor.submit(new Callable<Long>() {
    +            long nextOffset = startOffset;
    --- End diff --
    
    You don't need this local variable. If the consumer is stopped, it always returns the `startOffset` inside the callable. Otherwise, the callable returns whatever is returned by the `callback` delegate.
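
The simplification described here could look roughly like the following. This is a sketch under stated assumptions, not the code that was merged: `MessageCallback` is a cut-down hypothetical interface with `String` messages, the `wrap` method takes the executor and stop flag as parameters, and the JDK's `Future.get()` is used in place of Guava's `Futures.getUnchecked` so the example has no external dependencies. The callable has no field of its own; it returns `startOffset` when stopped and otherwise returns the delegate's result directly.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

class WrapCallbackSketch {

    /** Hypothetical, cut-down version of the callback discussed in the review. */
    interface MessageCallback {
        long onReceived(Iterator<String> messages, long startOffset);
    }

    static MessageCallback wrap(final MessageCallback callback,
                                final ExecutorService executor,
                                final AtomicBoolean stopped) {
        return new MessageCallback() {
            @Override
            public long onReceived(final Iterator<String> messages, final long startOffset) {
                if (stopped.get()) {
                    return startOffset; // nothing consumed, so resume from the same offset
                }
                Future<Long> result = executor.submit(new Callable<Long>() {
                    @Override
                    public Long call() {
                        if (stopped.get()) {
                            return startOffset;
                        }
                        // No intermediate variable: hand back the delegate's answer as is.
                        return callback.onReceived(messages, startOffset);
                    }
                });
                try {
                    return result.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e.getCause());
                }
            }
        };
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean stopped = new AtomicBoolean();
        MessageCallback wrapped = wrap((messages, start) -> start + 3, executor, stopped);
        System.out.println(wrapped.onReceived(Collections.<String>emptyIterator(), 7L));
        stopped.set(true);
        System.out.println(wrapped.onReceived(Collections.<String>emptyIterator(), 7L));
        executor.shutdown();
    }
}
```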


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94914909
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
     
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --
    
    Not exactly. E.g., let's say the offset submitted to Kafka for fetching is "0", but the message with offset "0" is already gone, so the first message actually fetched has a larger offset.


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94551352
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -68,7 +69,7 @@
     /**
      * A {@link KafkaConsumer} implementation using the scala kafka api.
      */
    -final class SimpleKafkaConsumer implements KafkaConsumer {
    +public final class SimpleKafkaConsumer implements KafkaConsumer {
    --- End diff --
    
    That's not a good reason to make this class public. Why does the unit test have to use this method? Can't the unit test just return some offset that it knows of?


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94914331
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
     
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --
    
    Yes, the current implementation follows this at line 454. Is it just the caller's responsibility to honor this?


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94874547
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
     
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --
    
    Or change the code so that this statement is correct.


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94522256
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -68,7 +69,7 @@
     /**
      * A {@link KafkaConsumer} implementation using the scala kafka api.
      */
    -final class SimpleKafkaConsumer implements KafkaConsumer {
    +public final class SimpleKafkaConsumer implements KafkaConsumer {
    --- End diff --
    
    I wanted to use the `getLastOffset` method in the test. I haven't figured out a way to extract the logic in `getLastOffset` elsewhere without exposing the `SimpleKafkaConsumer` class, because `getLastOffset` uses many private members of `SimpleKafkaConsumer`.


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by albertshau <gi...@git.apache.org>.
Github user albertshau commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r90149679
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java ---
    @@ -0,0 +1,36 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.twill.kafka.client;
    +
    +/**
    + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
    + * not met, the method will return the next offset to continue searching for the message meeting this condition.
    + */
    +public interface KafkaOffsetProvider {
    --- End diff --
    
    This interface doesn't look like it belongs in Twill.


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94691443
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
     
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the current message to be consumed.
    --- End diff --
    
    This is not exactly clear. The `startOffset` is the offset used to fetch the messages, right?


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94521320
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
           final AtomicBoolean stopped = new AtomicBoolean();
           return new MessageCallback() {
             @Override
    -        public void onReceived(final Iterator<FetchedMessage> messages) {
    +        public long onReceived(final Iterator<FetchedMessage> messages) {
               if (stopped.get()) {
    -            return;
    +            return Long.MIN_VALUE;
    --- End diff --
    
    Why min value?


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r95000005
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
     
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --
    
    Then `startOffset` is not necessary here? `onReceived` can just keep the first message's offset by itself and the caller should guarantee that no empty iterator is passed to `onReceived` as in `SimpleKafkaConsumer`. Or should we still keep `startOffset` to allow empty iterator to be passed in?


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r89185930
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java ---
    @@ -0,0 +1,36 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.twill.kafka.client;
    +
    +/**
    + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
    + * not met, the method will return the next offset to continue searching for the message meeting this condition.
    + */
    +public interface KafkaOffsetProvider {
    +
    +  /**
    +   * Check whether a message meets a given condition. If the condition is not met, return the next offset to
    --- End diff --
    
    It seems like this is not a clean contract. This is an interface; why do we have to assume something is passed in the constructor of the implementation class? It seems very confusing.


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94557551
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -68,7 +69,7 @@
     /**
      * A {@link KafkaConsumer} implementation using the scala kafka api.
      */
    -final class SimpleKafkaConsumer implements KafkaConsumer {
    +public final class SimpleKafkaConsumer implements KafkaConsumer {
    --- End diff --
    
    It's independent of this PR, isn't it?


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94551691
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
           final AtomicBoolean stopped = new AtomicBoolean();
           return new MessageCallback() {
             @Override
    -        public void onReceived(final Iterator<FetchedMessage> messages) {
    +        public long onReceived(final Iterator<FetchedMessage> messages) {
               if (stopped.get()) {
    -            return;
    +            return Long.MIN_VALUE;
               }
    -          Futures.getUnchecked(executor.submit(new Runnable() {
    +          return Futures.getUnchecked(executor.submit(new Callable<Long>() {
    +            long nextOffset = Long.MIN_VALUE;
                 @Override
    -            public void run() {
    +            public Long call() {
                   if (stopped.get()) {
    -                return;
    +                return nextOffset;
                   }
    -              callback.onReceived(messages);
    +              nextOffset = callback.onReceived(messages);
    --- End diff --
    
    From the current code logic, line 286 always returns `Long.MIN_VALUE`.
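
A minimal sketch of the point above, using hypothetical stand-in names (`WrapSketch`, `Callback`) rather than the actual Twill classes. It assumes, as the diff suggests, that a new `Callable` is created on every invocation. Under that assumption the `nextOffset` field has just been initialized to `Long.MIN_VALUE` whenever the inner `stopped` check runs, so that branch cannot return anything else.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for illustration; not the actual Twill classes.
final class WrapSketch {

  interface Callback {
    long onReceived();
  }

  // Mirrors the shape of the diff: a fresh Callable is built on every call.
  static long invoke(final Callback callback, final AtomicBoolean stopped) {
    Callable<Long> task = new Callable<Long>() {
      long nextOffset = Long.MIN_VALUE;   // re-initialized for every new Callable

      @Override
      public Long call() {
        if (stopped.get()) {
          // Nothing has assigned nextOffset yet, so this is always Long.MIN_VALUE.
          return nextOffset;
        }
        nextOffset = callback.onReceived();
        return nextOffset;
      }
    };
    try {
      return task.call();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
```

If the field adds nothing, returning the constant directly in the `stopped` branch would express the same behavior more plainly.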


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94557480
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
           final AtomicBoolean stopped = new AtomicBoolean();
           return new MessageCallback() {
             @Override
    -        public void onReceived(final Iterator<FetchedMessage> messages) {
    +        public long onReceived(final Iterator<FetchedMessage> messages) {
               if (stopped.get()) {
    -            return;
    +            return Long.MIN_VALUE;
    --- End diff --
    
    Why? The offset passed to this method is the offset used for fetching, and hence for creating the iterator. The offset returned, on the other hand, governs the offset to use for the next fetch.


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94858965
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
     
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --
    
    Is this true? If someone specified `-1L` or `-2L` as the offset, would the `startOffset` here be `-1L` or `-2L` or be the actual offset (which won't be negative) of the first message in the iterator?
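
To make the question concrete, here is a small sketch of one way the special values could be resolved before the callback is invoked, so that `startOffset` is always a real, non-negative offset. The class `OffsetSketch`, the method `resolve`, and the `earliest`/`latest` parameters are hypothetical; the only assumption taken from the thread is that `-1L` and `-2L` stand for the latest and earliest offsets respectively.

```java
// Hypothetical helper for illustration; not part of the Twill API.
final class OffsetSketch {

  static final long LATEST = -1L;    // special value: start from the latest offset
  static final long EARLIEST = -2L;  // special value: start from the earliest offset

  // Maps a requested offset, which may be a special value, to an actual offset.
  static long resolve(long requested, long earliest, long latest) {
    if (requested == EARLIEST) {
      return earliest;
    }
    if (requested == LATEST) {
      return latest;
    }
    return requested;
  }
}
```

With this kind of resolution done by the caller, the javadoc claim that `startOffset` is the offset of the first message in the iterator would hold even when the consumer was started with `-1L` or `-2L`.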


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94687750
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -35,12 +35,14 @@
          * Invoked when new messages is available.
          * @param messages Iterator of new messages. The {@link FetchedMessage} instance maybe reused in the Iterator
          *                 and across different invocation.
    +     * @param startOffset Offset of the current message to be consumed.
    +     * @return The offset of the message to be fetched next.
          */
    -    void onReceived(Iterator<FetchedMessage> messages);
    +    long onReceived(Iterator<FetchedMessage> messages, long startOffset);
    --- End diff --
    
    Can you reverse the ordering of the parameters? Logically, it fetches from the `startOffset` to produce the `Iterator`.


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/twill/pull/16


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r89042093
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java ---
    @@ -0,0 +1,36 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.twill.kafka.client;
    +
    +/**
    + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
    + * not met, the method will return the next offset to continue searching for the message meeting this condition.
    + */
    +public interface KafkaOffsetProvider {
    +
    +  /**
    +   * Check whether a message meets a given condition. If the condition is not met, return the next offset to
    +   * continue searching for the message meeting this condition.
    +   * @param message {@link FetchedMessage} to check.
    +   * @return A {@code long} larger than zero as the next offset to continue searching for the message meeting the
    +   *         given condition if the current message doesn't meet the condition. Return {code 0} if the current
    +   *         message meets the given condition. Return the earliest offset {@code -2} if no message meeting the
    +   *         condition can be found.
    +   */
    +  public long getCandidateOffset(FetchedMessage message);
    --- End diff --
    
    I meant that the returned offset points to the next message to be checked, but that message is not guaranteed to satisfy the given condition.


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r95203552
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
     
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --
    
    I think the `FetchedMessage` doesn't carry the "current" offset, but the next message's offset, right? Meaning one cannot keep consuming the same message (e.g. when some failure requires retrying the same message).
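
The retry concern can be sketched as follows. The `Message` type and `process` method are hypothetical stand-ins (not the Twill `FetchedMessage`), and the sketch assumes that `startOffset` is the offset of the first message in the iterator and that each message exposes only its next offset. The parameter order follows the signature shown in the diff. The callback tracks the current offset itself and returns it on failure, so the same message would be fetched again.

```java
import java.util.Iterator;

// Hypothetical types for illustration; not the actual Twill classes.
final class RetrySketch {

  // A message that, as discussed above, exposes only the offset of the next message.
  static final class Message {
    final String payload;
    final long nextOffset;

    Message(String payload, long nextOffset) {
      this.payload = payload;
      this.nextOffset = nextOffset;
    }
  }

  // Returns the offset to fetch from next.
  static long onReceived(Iterator<Message> messages, long startOffset) {
    long current = startOffset;      // offset of the message about to be processed
    while (messages.hasNext()) {
      Message message = messages.next();
      if (!process(message.payload)) {
        return current;              // failure: ask for the same message again
      }
      current = message.nextOffset;  // success: move on to the following message
    }
    return current;
  }

  // Placeholder processing step: payloads starting with "bad" are treated as failures.
  static boolean process(String payload) {
    return !payload.startsWith("bad");
  }
}
```

Without a reliable `startOffset`, the callback would have no way to name the offset of the first message when that message is the one that fails.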


---

[GitHub] twill issue #16: [TWILL-199] Return the offset to read next message in Kafka...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on the issue:

    https://github.com/apache/twill/pull/16
  
    Thank you! Just squashed.


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94915705
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
     
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --
    
    How can I get the offset of the first message in this case? `message.getNextOffset()--`?


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94521420
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -450,7 +453,10 @@ private boolean sleepIfEmpty(ByteBufferMessageSet messages) {
         private void invokeCallback(ByteBufferMessageSet messages, AtomicLong offset) {
           long savedOffset = offset.get();
           try {
    -        callback.onReceived(createFetchedMessages(messages, offset));
    +        Long nextOffset = callback.onReceived(createFetchedMessages(messages, offset));
    --- End diff --
    
    Why use `Long`? Shouldn't it be `long`?


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94658662
  
    --- Diff: twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java ---
    @@ -32,7 +32,7 @@
       Cancellable announce(String serviceName, int port);
     
       /**
    -   * Registers an endpoint that could be discovered by external party with a payload
    +   * Registers an endpoint that could be discovered by external party with a payload.
    --- End diff --
    
    checkstyle fix


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r89042464
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java ---
    @@ -0,0 +1,36 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.twill.kafka.client;
    +
    +/**
    + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
    + * not met, the method will return the next offset to continue searching for the message meeting this condition.
    + */
    +public interface KafkaOffsetProvider {
    --- End diff --
    
    It will be implemented outside of Twill. The objects will be passed to the constructor of `MessageCallback`


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94556442
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
           final AtomicBoolean stopped = new AtomicBoolean();
           return new MessageCallback() {
             @Override
    -        public void onReceived(final Iterator<FetchedMessage> messages) {
    +        public long onReceived(final Iterator<FetchedMessage> messages) {
               if (stopped.get()) {
    -            return;
    +            return Long.MIN_VALUE;
    --- End diff --
    
    If the offset is passed to `onReceived`, it seems that there's no need to return an offset? Because now the returned offset is used to set the offset outside of the `onReceived` method.


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r89038561
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java ---
    @@ -0,0 +1,36 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.twill.kafka.client;
    +
    +/**
    + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
    + * not met, the method will return the next offset to continue searching for the message meeting this condition.
    + */
    +public interface KafkaOffsetProvider {
    --- End diff --
    
    How is this interface being used? I don't see any usage of this class.


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94913560
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
     
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --
    
    I understand the behavior, but is it a good API? As you see, it is quite difficult to explain, and hence to document correctly. I feel it's much easier to explain and to use if we say the `startOffset` is the offset of the first message in the given iterator.


---

[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94913855
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
     
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --
    
    I see. So keep the original description of `startOffset`?


---

[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94561460
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
           final AtomicBoolean stopped = new AtomicBoolean();
           return new MessageCallback() {
             @Override
    -        public void onReceived(final Iterator<FetchedMessage> messages) {
    +        public long onReceived(final Iterator<FetchedMessage> messages) {
               if (stopped.get()) {
    -            return;
    +            return Long.MIN_VALUE;
               }
    -          Futures.getUnchecked(executor.submit(new Runnable() {
    +          return Futures.getUnchecked(executor.submit(new Callable<Long>() {
    +            long nextOffset = Long.MIN_VALUE;
                 @Override
    -            public void run() {
    +            public Long call() {
                   if (stopped.get()) {
    -                return;
    +                return nextOffset;
                   }
    -              callback.onReceived(messages);
    +              nextOffset = callback.onReceived(messages);
    --- End diff --
    
    I was assuming that this `onReceived` can be called multiple times. Therefore, at line 286 when stopped is set to true, messages processed in previous calls of `onReceived` should be skipped.



[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by albertshau <gi...@git.apache.org>.
Github user albertshau commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r90152039
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -35,8 +35,10 @@
          * Invoked when new messages is available.
          * @param messages Iterator of new messages. The {@link FetchedMessage} instance maybe reused in the Iterator
          *                 and across different invocation.
    +     * @return A long larger than zero as the offset to restart fetching messages when offset error is caught,
    --- End diff --
    
    What happens if the returned offset is out of bounds? The expected behavior in that case should be documented. Looking at the implementation, it seems like we read from the earliest offset.
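
A minimal sketch of the fallback the comment above infers (an out-of-bounds offset restarts from the earliest one). `OffsetBounds`, `orEarliest`, and the `earliest`/`latest` parameters are hypothetical names for illustration, not Twill API, and the fallback is only the behavior the comment reads from the implementation, not something confirmed in this thread.

```java
/** Hypothetical helper for illustration; not part of Twill. */
final class OffsetBounds {
  private OffsetBounds() { }

  /**
   * Returns {@code requested} when it lies within [earliest, latest]; otherwise
   * falls back to {@code earliest}. Assumption: {@code latest} is the next offset
   * to be written, so it counts as a valid position.
   */
  static long orEarliest(long requested, long earliest, long latest) {
    boolean inRange = requested >= earliest && requested <= latest;
    return inRange ? requested : earliest;
  }
}
```

Whatever rule is chosen, stating it in the `@return` Javadoc would answer the question without having to read the implementation.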



[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by albertshau <gi...@git.apache.org>.
Github user albertshau commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r90151909
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -35,8 +35,10 @@
          * Invoked when new messages is available.
          * @param messages Iterator of new messages. The {@link FetchedMessage} instance maybe reused in the Iterator
          *                 and across different invocation.
    +     * @return A long larger than zero as the offset to restart fetching messages when offset error is caught,
    +     *         {@code -1} if the error cannot be resolved. Returns {@code 0} if no need to restart fetching.
          */
    -    void onReceived(Iterator<FetchedMessage> messages);
    +    long onReceived(Iterator<FetchedMessage> messages);
    --- End diff --
    
    `-1` for latest and `-2` for earliest seem to make sense. `0` is a valid Kafka offset, though I suppose practically speaking, any use of `0` could be replaced by `-2`.
    
    It is unclear what an 'error' means. Does it mean the consumer should stop consuming? I think we can just leave that out.
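
A minimal sketch of the convention suggested above, with `-1` meaning latest and `-2` meaning earliest (the same values Kafka's `OffsetRequest.LatestTime()` and `OffsetRequest.EarliestTime()` return). `OffsetSentinels`, `resolve`, and the `earliest`/`latest` parameters are hypothetical names, not Twill API.

```java
/** Hypothetical helper for illustration; not part of Twill. */
final class OffsetSentinels {
  static final long LATEST = -1L;   // restart from the latest offset
  static final long EARLIEST = -2L; // restart from the earliest offset

  private OffsetSentinels() { }

  /** Maps the two sentinel values to concrete offsets; any other value is used as-is. */
  static long resolve(long requested, long earliest, long latest) {
    if (requested == EARLIEST) {
      return earliest;
    }
    if (requested == LATEST) {
      return latest;
    }
    return requested;
  }
}
```

Because both sentinels are negative, `0` stays available as an ordinary offset, which is the point raised in the comment.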



[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r89038384
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -35,8 +35,10 @@
          * Invoked when new messages is available.
          * @param messages Iterator of new messages. The {@link FetchedMessage} instance maybe reused in the Iterator
          *                 and across different invocation.
    +     * @return A long larger than zero as the offset to restart fetching messages when offset error is caught,
    +     *         {@code -1} if the error cannot be resolved. Returns {@code 0} if no need to restart fetching.
          */
    -    void onReceived(Iterator<FetchedMessage> messages);
    +    long onReceived(Iterator<FetchedMessage> messages);
    --- End diff --
    
    How does the callback indicate that fetching should restart from the earliest or the latest offset?



[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94861374
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
     
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --
    
    How about "The default offset to return as the offset of the next message to fetch if no message is consumed in this method"?



[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94522513
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
           final AtomicBoolean stopped = new AtomicBoolean();
           return new MessageCallback() {
             @Override
    -        public void onReceived(final Iterator<FetchedMessage> messages) {
    +        public long onReceived(final Iterator<FetchedMessage> messages) {
               if (stopped.get()) {
    -            return;
    +            return Long.MIN_VALUE;
    --- End diff --
    
    In this case, no message is processed, so the offset should remain unchanged to start reading from the current offset again next time.



[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94911854
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
     
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --
    
    Is it? Or should it return the offset of the first message being read? Otherwise, for example, if `-1L` was specified and the latest message was read but not consumed by the `onReceived` method, returning `-1L` means that particular message won't get fetched again if a newer message is available. Is that the intention?



[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94551508
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
           final AtomicBoolean stopped = new AtomicBoolean();
           return new MessageCallback() {
             @Override
    -        public void onReceived(final Iterator<FetchedMessage> messages) {
    +        public long onReceived(final Iterator<FetchedMessage> messages) {
               if (stopped.get()) {
    -            return;
    +            return Long.MIN_VALUE;
    --- End diff --
    
    Using a special value is an anti-pattern. Since you are already changing the `MessageCallback` API, it is probably better to pass the offset used for the fetch call to the `onReceived` method.
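
A minimal sketch of the suggestion above: the fetch offset is passed into `onReceived`, so a callback that consumes nothing returns that offset instead of a sentinel such as `Long.MIN_VALUE`. `Msg`, `OffsetCallback`, `StoppableCallback`, and `CountingCallback` are hypothetical stand-ins for illustration, and the parameter order is an assumption. Only `onReceived`, `startOffset`, and the notion of a message's next offset come from the thread.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for FetchedMessage: a payload plus the offset that follows it. */
final class Msg {
  final long nextOffset;
  final String payload;

  Msg(long nextOffset, String payload) {
    this.nextOffset = nextOffset;
    this.payload = payload;
  }
}

/** Hypothetical callback shape: receives the offset the fetch started from. */
interface OffsetCallback {
  /** Returns the offset to fetch from next. */
  long onReceived(Iterator<Msg> messages, long startOffset);
}

/** Wrapper that stops delivering messages once stop() has been called. */
final class StoppableCallback implements OffsetCallback {
  private final OffsetCallback delegate;
  private final AtomicBoolean stopped = new AtomicBoolean();

  StoppableCallback(OffsetCallback delegate) {
    this.delegate = delegate;
  }

  void stop() {
    stopped.set(true);
  }

  @Override
  public long onReceived(Iterator<Msg> messages, long startOffset) {
    if (stopped.get()) {
      // Nothing was consumed, so the next fetch starts from the same offset.
      return startOffset;
    }
    return delegate.onReceived(messages, startOffset);
  }
}

/** Consumes every message and returns the offset after the last one consumed. */
final class CountingCallback implements OffsetCallback {
  int consumed;

  @Override
  public long onReceived(Iterator<Msg> messages, long startOffset) {
    long next = startOffset;
    while (messages.hasNext()) {
      next = messages.next().nextOffset;
      consumed++;
    }
    return next;
  }
}
```

With this shape, every value a callback returns is a real offset, so the consumer does not need a special case for "unchanged".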



[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94551779
  
    --- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
    @@ -170,11 +174,128 @@ public void testKafkaClient() throws Exception {
         Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
           .MessageCallback() {
           @Override
    -      public void onReceived(Iterator<FetchedMessage> messages) {
    +      public long onReceived(Iterator<FetchedMessage> messages) {
    +        long nextOffset = Long.MIN_VALUE;
    +        while (messages.hasNext()) {
    +          FetchedMessage message = messages.next();
    +          LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
    +          latch.countDown();
    +        }
    +        return nextOffset;
    +      }
    +
    +      @Override
    +      public void finished() {
    +        stopLatch.countDown();
    +      }
    +    });
    +
    +    Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
    +    cancel.cancel();
    +    Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
    +  }
    +
    +  @Test
    +  public void testKafkaClientReadFromIdx() throws Exception {
    +    String topic = "testClient";
    +
    +    // Publish 30 messages with indecies the same as offsets within the range 0 - 29
    +    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
    +    t1.start();
    +    t1.join();
    +    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
    +    t2.start();
    +    t2.join();
    +    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
    +    t3.start();
    +    t3.join();
    +
    +    final int startIdx = 15;
    +    final CountDownLatch latch = new CountDownLatch(30 - startIdx);
    +    final CountDownLatch stopLatch = new CountDownLatch(1);
    +    final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
    +    // Creater a consumer
    +    final SimpleKafkaConsumer consumer = (SimpleKafkaConsumer) kafkaClient.getConsumer();
    +    Cancellable initCancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
    +      .MessageCallback() {
    +      long minOffset = -2; // earliest msg
    +      long maxOffset = -1; // latest msg
    +      @Override
    +      // Use binary search to find the offset of the message with the index matching startIdx. Returns the next offset
    +      // to fetch message until the matching message is found.
    +      public long onReceived(Iterator<FetchedMessage> messages) {
    +        while (messages.hasNext()) {
    +          FetchedMessage currentMsg = messages.next();
    +          long currentOffset = currentMsg.getNextOffset() - 1;
    +          String decodedMsg = Charsets.UTF_8.decode(currentMsg.getPayload()).toString();
    +          LOG.info(decodedMsg);
    +          int currentIdx = Integer.valueOf(decodedMsg.split(" ")[0]);
    +          LOG.info("Current offset = {}, currentIdx = {}. minOffset = {}", currentOffset, currentIdx, minOffset);
    +          if (currentIdx == startIdx) {
    +            if (offsetQueue.size() == 0) {
    +              offsetQueue.offer(currentOffset);
    +              LOG.info("currentOffset = {} matches startIdx {}", currentOffset, startIdx);
    +            }
    +            return currentOffset;
    +          }
    +          // If minOffset and maxOffset still have their initial values, set the minOffset to currentOffset and return
    +          // the offset of the last received message
    +          if (minOffset == -2 && maxOffset == -1) {
    +            minOffset = currentOffset;
    +            LOG.info("minOffset = {}, return maxOffset = {}", minOffset, maxOffset);
    +            // Returns the offset of the last received messages. Cannot return -1 because -1 will be translated as
    +            // the next offset after the last received message
    +            return consumer.getLastOffset(currentMsg.getTopicPartition(), -1) - 1;
    +          }
    +          if (maxOffset == -1) {
    +            maxOffset = currentOffset;
    +          }
    +          LOG.info("minOffset = {}, maxOffset = {}", minOffset, maxOffset);
    +          // If minOffset > maxOffset, the startIdx cannot be found in the current range of offset.
    +          // Restore the initial values of minOffset and maxOffset and read from the beginning again
    +          if (minOffset > maxOffset) {
    +            minOffset = -2;
    +            maxOffset = -1;
    +            LOG.info("minOffset > maxOffset, return minOffset = {}", minOffset);
    +            return minOffset;
    +          }
    +          if (currentIdx > startIdx) {
    +            maxOffset = currentOffset - 1;
    +            long newOffset = minOffset + (maxOffset - minOffset)/2;
    +            LOG.info("currentIdx > startIdx, return newOffset {}", newOffset);
    +            return newOffset;
    +          }
    +          if (currentIdx < startIdx) {
    +            minOffset = currentOffset + 1;
    +            long newOffset = minOffset + (maxOffset - minOffset)/2;
    +            LOG.info("currentIdx < startIdx, return newOffset {}", newOffset);
    +            return newOffset;
    +          }
    +        }
    +        return Long.MIN_VALUE;
    +      }
    +
    +      @Override
    +      public void finished() {
    +        //no-op
    +      }
    +    });
    +
    +    long startOffset = offsetQueue.poll(360, TimeUnit.SECONDS);
    +    initCancel.cancel();
    +
    +    Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, startOffset).consume(new KafkaConsumer
    +      .MessageCallback() {
    --- End diff --
    
    This is an awkward line break.



[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r89038468
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java ---
    @@ -0,0 +1,36 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.twill.kafka.client;
    +
    +/**
    + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
    + * not met, the method will return the next offset to continue searching for the message meeting this condition.
    + */
    +public interface KafkaOffsetProvider {
    +
    +  /**
    +   * Check whether a message meets a given condition. If the condition is not met, return the next offset to
    --- End diff --
    
    I don't see any condition. How is the condition provided?



[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r89042335
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java ---
    @@ -0,0 +1,36 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.twill.kafka.client;
    +
    +/**
    + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
    + * not met, the method will return the next offset to continue searching for the message meeting this condition.
    + */
    +public interface KafkaOffsetProvider {
    +
    +  /**
    +   * Check whether a message meets a given condition. If the condition is not met, return the next offset to
    --- End diff --
    
    It will be provided in the constructor. For instance, the timestamp will be passed as a constructor parameter. Other parameters can be passed instead, and they can be used in the `getCandidateOffset` method.
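
A minimal sketch of what the reply above describes: the condition (here a target timestamp) is supplied through the constructor and used inside `getCandidateOffset`. The thread does not show the real `KafkaOffsetProvider` method signature, so `OffsetProviderSketch`, its parameters, `TimestampOffsetProvider`, and the linear "try the next offset" rule are all assumptions for illustration.

```java
/** Hypothetical interface shape; the actual signature is not shown in this thread. */
interface OffsetProviderSketch {
  /**
   * Returns {@code currentOffset} if the message at that offset meets the condition,
   * otherwise the next offset to examine.
   */
  long getCandidateOffset(long currentOffset, long messageTimestamp);
}

/** Condition: the message timestamp is at or after a target fixed at construction time. */
final class TimestampOffsetProvider implements OffsetProviderSketch {
  private final long targetTimestamp;

  TimestampOffsetProvider(long targetTimestamp) {
    this.targetTimestamp = targetTimestamp;
  }

  @Override
  public long getCandidateOffset(long currentOffset, long messageTimestamp) {
    // Condition met: stay on this offset. Otherwise move on to the following offset.
    return messageTimestamp >= targetTimestamp ? currentOffset : currentOffset + 1;
  }
}
```

Stating the condition in the implementing class's Javadoc, as done here, would address the reviewer's question about where the condition comes from.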



[GitHub] twill issue #16: [TWILL-199] Handle offset error and return next offset in K...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on the issue:

    https://github.com/apache/twill/pull/16
  
    There are checkstyle failures; please fix them as well.



[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94914034
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
     
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --
    
    The description, yes, but I think you need to change the implementation to honor it.



[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94559673
  
    --- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
    @@ -170,11 +174,128 @@ public void testKafkaClient() throws Exception {
         Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
           .MessageCallback() {
           @Override
    -      public void onReceived(Iterator<FetchedMessage> messages) {
    +      public long onReceived(Iterator<FetchedMessage> messages) {
    +        long nextOffset = Long.MIN_VALUE;
    +        while (messages.hasNext()) {
    +          FetchedMessage message = messages.next();
    +          LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
    +          latch.countDown();
    +        }
    +        return nextOffset;
    +      }
    +
    +      @Override
    +      public void finished() {
    +        stopLatch.countDown();
    +      }
    +    });
    +
    +    Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
    +    cancel.cancel();
    +    Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
    +  }
    +
    +  @Test
    +  public void testKafkaClientReadFromIdx() throws Exception {
    +    String topic = "testClient";
    +
    +    // Publish 30 messages with indecies the same as offsets within the range 0 - 29
    +    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
    +    t1.start();
    +    t1.join();
    +    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
    +    t2.start();
    +    t2.join();
    +    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
    +    t3.start();
    +    t3.join();
    +
    +    final int startIdx = 15;
    +    final CountDownLatch latch = new CountDownLatch(30 - startIdx);
    +    final CountDownLatch stopLatch = new CountDownLatch(1);
    +    final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
    +    // Creater a consumer
    +    final SimpleKafkaConsumer consumer = (SimpleKafkaConsumer) kafkaClient.getConsumer();
    +    Cancellable initCancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
    +      .MessageCallback() {
    +      long minOffset = -2; // earliest msg
    +      long maxOffset = -1; // latest msg
    +      @Override
    +      // Use binary search to find the offset of the message with the index matching startIdx. Returns the next offset
    --- End diff --
    
    Right, since the offsets and indices of the messages are known, the test can jump directly to the desired index without a binary search.
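
A minimal sketch of the direct jump mentioned above, assuming (as in the quoted test) that message indices increase by exactly one per offset. `DirectSeek` and `nextOffset` are hypothetical names. `currentOffset`, `currentIdx`, and `startIdx` mirror the variables in the test.

```java
/** Hypothetical helper for illustration; not part of Twill or of the test in the diff. */
final class DirectSeek {
  private DirectSeek() { }

  /**
   * Returns the offset of the message whose index is {@code startIdx}, given one observed
   * message. Assumption: indices and offsets both advance by one per message.
   */
  static long nextOffset(long currentOffset, int currentIdx, int startIdx) {
    return currentOffset + (startIdx - currentIdx);
  }
}
```

With the test's 30 messages at offsets 0 to 29, a first message at offset 0 carrying index 0 gives a next offset of 15 in a single step.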



[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

Posted by maochf <gi...@git.apache.org>.
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94912977
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
     
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --
    
    Yes, I think one should be able to use `-1L` to fetch an upcoming new message, which also aligns with the usage of `-1L` in Kafka's `SimpleConsumer`. If the latest message is read, and it's the first message in the iterator, one can just specify `startOffset` as the latest message offset if one doesn't want to skip it. If the latest message is not the first message in the iterator and its previous message was consumed, `onReceived` will still return the latest message offset.



[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94521515
  
    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -35,8 +35,10 @@
          * Invoked when new messages is available.
          * @param messages Iterator of new messages. The {@link FetchedMessage} instance maybe reused in the Iterator
          *                 and across different invocation.
    +     * @return The offset of the message to be fetched next. Returns {@code Long.MIN_VALUE} to keep the current offset
    --- End diff --
    
    In what case is the offset left unchanged?

