Posted to users@kafka.apache.org by Joe San <co...@gmail.com> on 2016/02/10 23:30:31 UTC

Kafka 0.8.2 Consumer Poll Mechanism

I tried to mimic the poll method that we have with the new consumer API in
the 0.9.0.0 version. Here is what I have:

def readFromKafka() = {
  val streams = consumerStreamsMap.get(consumerConfig.topic)

  @tailrec
  def poll(pollConfig: PollConfig, messages: Seq[String]): Seq[String] = {
    val iterator = streams.iterator()
    val isTimeLapsed = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) > pollConfig.pollTimeout
    if (iterator.hasNext && isTimeLapsed) {
      val newMessages = iterator.next.asScala.toSeq.map {
        case msg => new String(msg.message())
      }
      poll(pollConfig, messages ++ newMessages)
    } else {
      messages
    }
  }

  val nowSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())
  val messages = poll(PollConfig(nowSeconds + 4), Seq.empty[String])

  // TODO: re-work after the entire code works!
  consumer.commitOffsets()
  toTsDataPointSeq(messages).flatten
}


Is this good enough? Am I doing it right? Am I committing the right offset?

Re: Kafka 0.8.2 Consumer Poll Mechanism

Posted by Joe San <co...@gmail.com>.
Any takers for my question? I will try it out today anyway!

Re: Kafka 0.8.2 Consumer Poll Mechanism

Posted by Joe San <co...@gmail.com>.
So basically what I'm doing is the following:

1. I check whether the time allotted for reading the stream has elapsed. If it
has, I exit the recursion.
2. If not, I read the stream by getting the next element from the iterator.

Which offset will be committed when I call
consumer.commitOffsets()?
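
The two steps above can be sketched without any Kafka dependency, which makes the deadline logic easy to check in isolation. Note that the condition as posted (`iterator.hasNext && isTimeLapsed`) appears to keep reading only once the time has lapsed, the opposite of step 1, so the sketch negates it. The names `BoundedPoll`, `pollUntil`, `deadlineMillis` and `now` are hypothetical and not part of any Kafka API. The sketch also assumes that `hasNext` returns promptly; with the 0.8 high-level consumer the stream iterator blocks until a message arrives unless `consumer.timeout.ms` is set, in which case it throws a `ConsumerTimeoutException` that the caller would have to catch.

```scala
import scala.annotation.tailrec

object BoundedPoll {
  /** Collects elements from `source` until it is exhausted or the deadline passes.
    * `now` is injectable (hypothetical parameter) so the logic can be tested without sleeping.
    */
  def pollUntil[A](source: Iterator[A],
                   deadlineMillis: Long,
                   now: () => Long = () => System.currentTimeMillis()): Vector[A] = {
    @tailrec
    def loop(acc: Vector[A]): Vector[A] = {
      val timeLapsed = now() >= deadlineMillis
      // Step 1: leave the recursion once the deadline has passed.
      // Step 2: otherwise take the next element, if there is one.
      if (!timeLapsed && source.hasNext) loop(acc :+ source.next())
      else acc
    }
    loop(Vector.empty)
  }
}
```

A call such as `BoundedPoll.pollUntil(messages, System.currentTimeMillis() + 4000)` would mirror the four-second window in the original code, with `messages` standing in for an iterator over a single stream's messages.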
