You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Joe San <co...@gmail.com> on 2016/02/10 23:30:31 UTC
Kafka 0.8.2 Consumer Poll Mechanism
I tried to mimic the poll method that we have with the new consumer API in
the 0.9.0.0 version. Here is what I have:
def readFromKafka() = {
val streams = consumerStreamsMap.get(consumerConfig.topic)
@tailrec
def poll(pollConfig: PollConfig, messages: Seq[String]): Seq[String] = {
val iterator = streams.iterator()
val isTimeLapsed =
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) >
pollConfig.pollTimeout
if (iterator.hasNext && isTimeLapsed) {
val newMessages = iterator.next.asScala.toSeq.map {
case msg => new String(msg.message())
}
poll(pollConfig, messages ++ newMessages)
} else {
messages
}
}
val nowSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())
val messages = poll(PollConfig(nowSeconds + 4), Seq.empty[String])
// TODO: re-work after the entire code works!
consumer.commitOffsets()
toTsDataPointSeq(messages).flatten
}
Is this good enough? Am I doing it right? Am I committing the right Offset?
Re: Kafka 0.8.2 Consumer Poll Mechanism
Posted by Joe San <co...@gmail.com>.
any takers for my question? I will anyways try it out today!
On Wed, Feb 10, 2016 at 11:32 PM, Joe San <co...@gmail.com> wrote:
> So basically what I'm doing is the following:
>
> 1. I'm checking if the time to read the stream has lapsed. If yes, then I
> come out of the recursion.
> 2. If not, I read the stream by getting the next element in the iterator
>
> What will be the offset that will be committed when I do
> consumer.commitOffsets()?
>
> On Wed, Feb 10, 2016 at 11:30 PM, Joe San <co...@gmail.com> wrote:
>
>> I tried to mimic the poll method that we have with the new consumer API
>> in the 0.9.0.0 version. Here is what I have:
>>
>> def readFromKafka() = {
>> val streams = consumerStreamsMap.get(consumerConfig.topic)
>>
>> @tailrec
>> def poll(pollConfig: PollConfig, messages: Seq[String]): Seq[String] = {
>> val iterator = streams.iterator()
>> val isTimeLapsed = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) > pollConfig.pollTimeout
>> if (iterator.hasNext && isTimeLapsed) {
>> val newMessages = iterator.next.asScala.toSeq.map {
>> case msg => new String(msg.message())
>> }
>> poll(pollConfig, messages ++ newMessages)
>> } else {
>> messages
>> }
>> }
>>
>> val nowSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())
>> val messages = poll(PollConfig(nowSeconds + 4), Seq.empty[String])
>>
>> // TODO: re-work after the entire code works!
>> consumer.commitOffsets()
>> toTsDataPointSeq(messages).flatten
>> }
>>
>>
>> Is this good enough? Am I doing it right? Am I committing the right
>> Offset?
>>
>
>
Re: Kafka 0.8.2 Consumer Poll Mechanism
Posted by Joe San <co...@gmail.com>.
So basically what I'm doing is the following:
1. I'm checking if the time to read the stream has lapsed. If yes, then I
come out of the recursion.
2. If not, I read the stream by getting the next element in the iterator
What will be the offset that will be committed when I do
consumer.commitOffsets()?
On Wed, Feb 10, 2016 at 11:30 PM, Joe San <co...@gmail.com> wrote:
> I tried to mimic the poll method that we have with the new consumer API in
> the 0.9.0.0 version. Here is what I have:
>
> def readFromKafka() = {
> val streams = consumerStreamsMap.get(consumerConfig.topic)
>
> @tailrec
> def poll(pollConfig: PollConfig, messages: Seq[String]): Seq[String] = {
> val iterator = streams.iterator()
> val isTimeLapsed = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) > pollConfig.pollTimeout
> if (iterator.hasNext && isTimeLapsed) {
> val newMessages = iterator.next.asScala.toSeq.map {
> case msg => new String(msg.message())
> }
> poll(pollConfig, messages ++ newMessages)
> } else {
> messages
> }
> }
>
> val nowSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())
> val messages = poll(PollConfig(nowSeconds + 4), Seq.empty[String])
>
> // TODO: re-work after the entire code works!
> consumer.commitOffsets()
> toTsDataPointSeq(messages).flatten
> }
>
>
> Is this good enough? Am I doing it right? Am I committing the right Offset?
>