You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Alexey Romanchuk (JIRA)" <ji...@apache.org> on 2016/07/17 02:05:20 UTC
[jira] [Updated] (KAFKA-3960) Committed offset not set after first
assign
[ https://issues.apache.org/jira/browse/KAFKA-3960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexey Romanchuk updated KAFKA-3960:
------------------------------------
Description:
Committed offset did not set after first assign. Here it is minimal example (scala):
{code}
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("client.id", "client1")
props.put("group.id", "client1")
props.put("enable.auto.commit", "false")
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
import scala.collection.JavaConversions._
def dumpPositionAndCommitted() = {
consumer.assignment().foreach { tp =>
println(tp)
println(s"Position - ${consumer.position(tp)}")
println(s"Committed - ${consumer.committed(tp)}")
}
println("-----------")
}
consumer.assign(Collections.singleton(new TopicPartition("topic", 0)))
dumpPositionAndCommitted()
Thread.sleep(3000)
val ps = Collections.singleton(new TopicPartition("topic", 1)) ++ consumer.assignment()
consumer.assign(ps)
dumpPositionAndCommitted()
Thread.sleep(3000)
dumpPositionAndCommitted()
{code}
and the result is
{noformat}
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
-----------
topic-1
Position - 1262864347
Committed - null
topic-0
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
-----------
topic-1
Position - 1262864347
Committed - null
topic-0
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
-----------
{noformat}
Pay attention to
{noformat}
topic-1
Position - 1262864347
Committed - null
{noformat}
There is no committed offset fetched from broker, but it is. Looks like we should set {{needsFetchCommittedOffsets}} to {{true}} during assign the partition
was:
Committed offset did not set after first assign. Here it is minimal example (scala):
{code}
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("client.id", "client1")
props.put("group.id", "client1")
props.put("enable.auto.commit", "false")
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
import scala.collection.JavaConversions._
def dumpPositionAndCommitted() = {
consumer.assignment().foreach { tp =>
println(tp)
println(s"Position - ${consumer.position(tp)}")
println(s"Committed - ${consumer.committed(tp)}")
}
println("-----------")
}
consumer.assign(Collections.singleton(new TopicPartition("topic", 0)))
dumpPositionAndCommitted()
Thread.sleep(3000)
val ps = Collections.singleton(new TopicPartition("topic", 1)) ++ consumer.assignment()
consumer.assign(ps)
dumpPositionAndCommitted()
Thread.sleep(3000)
dumpPositionAndCommitted()
{code}
and the result is
{noformat}
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
-----------
proto7_fraud-1
Position - 1262864347
Committed - null
topic-0
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
-----------
topic-1
Position - 1262864347
Committed - null
topic-0
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
-----------
{noformat}
Pay attention to
{noformat}
topic-1
Position - 1262864347
Committed - null
{noformat}
There is no committed offset fetched from broker, but it is. Looks like we should set {{needsFetchCommittedOffsets}} to {{true}} during assign the partition
> Committed offset not set after first assign
> -------------------------------------------
>
> Key: KAFKA-3960
> URL: https://issues.apache.org/jira/browse/KAFKA-3960
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.10.0.0
> Reporter: Alexey Romanchuk
> Priority: Blocker
>
> Committed offset did not set after first assign. Here it is minimal example (scala):
> {code}
> val props = new Properties()
> props.put("bootstrap.servers", "localhost:9092")
> props.put("client.id", "client1")
> props.put("group.id", "client1")
> props.put("enable.auto.commit", "false")
> props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
> import scala.collection.JavaConversions._
> def dumpPositionAndCommitted() = {
> consumer.assignment().foreach { tp =>
> println(tp)
> println(s"Position - ${consumer.position(tp)}")
> println(s"Committed - ${consumer.committed(tp)}")
> }
> println("-----------")
> }
> consumer.assign(Collections.singleton(new TopicPartition("topic", 0)))
> dumpPositionAndCommitted()
> Thread.sleep(3000)
> val ps = Collections.singleton(new TopicPartition("topic", 1)) ++ consumer.assignment()
> consumer.assign(ps)
> dumpPositionAndCommitted()
> Thread.sleep(3000)
> dumpPositionAndCommitted()
> {code}
> and the result is
> {noformat}
> Position - 1211046445
> Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
> -----------
> topic-1
> Position - 1262864347
> Committed - null
> topic-0
> Position - 1211046445
> Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
> -----------
> topic-1
> Position - 1262864347
> Committed - null
> topic-0
> Position - 1211046445
> Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
> -----------
> {noformat}
> Pay attention to
> {noformat}
> topic-1
> Position - 1262864347
> Committed - null
> {noformat}
> There is no committed offset fetched from broker, but it is. Looks like we should set {{needsFetchCommittedOffsets}} to {{true}} during assign the partition
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)