Posted to user@spark.apache.org by "Bowden, Chris" <ch...@hpe.com> on 2017/03/07 21:52:49 UTC
Structured Streaming - Kafka
Potential bug when using startingOffsets = SpecificOffsets with Kafka topics containing uppercase characters?
KafkaSourceProvider#L80/86:
val startingOffsets =
  caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    case Some("latest") => LatestOffsets
    case Some("earliest") => EarliestOffsets
    case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
    case None => LatestOffsets
  }
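Because the whole option value is lowercased before matching, topic names inside the JSON are lowercased too. The partitions specified no longer match the partitions the consumer is actually assigned, and the assertion in KafkaSource#L326 triggers: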
private def fetchSpecificStartingOffsets(
    partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  val result = withRetriesWithoutInterrupt {
    // Poll to get the latest assigned partitions
    consumer.poll(0)
    val partitions = consumer.assignment()
    consumer.pause(partitions)
    assert(partitions.asScala == partitionOffsets.keySet,
      "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
      "Use -1 for latest, -2 for earliest, if you don't care.\n" +
      s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
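
One possible shape for a fix, as a minimal sketch rather than the actual Spark patch: lowercase the value only when comparing it against the "latest" / "earliest" keywords, and pass the trimmed original through for the JSON case so topic names keep their case. The types below are hypothetical stand-ins for Spark's internal ones, and SpecificOffsets here simply holds the raw JSON string instead of calling JsonUtils.partitionOffsets, so that the snippet stays self-contained.

```scala
import java.util.Locale

// Hypothetical stand-ins for Spark's internal offset types (names mirror the thread).
sealed trait StartingOffsets
case object LatestOffsets extends StartingOffsets
case object EarliestOffsets extends StartingOffsets
// Assumption: holds the raw JSON; real code would parse it into Map[TopicPartition, Long].
final case class SpecificOffsets(json: String) extends StartingOffsets

object StartingOffsetsSketch {
  // Lowercase only for the keyword comparison; never lowercase the JSON itself.
  def parse(option: Option[String]): StartingOffsets =
    option.map(_.trim) match {
      case Some(s) if s.toLowerCase(Locale.ROOT) == "latest" => LatestOffsets
      case Some(s) if s.toLowerCase(Locale.ROOT) == "earliest" => EarliestOffsets
      case Some(json) => SpecificOffsets(json) // topic names keep their original case
      case None => LatestOffsets
    }
}
```

With this shape, a value such as " LATEST " still resolves to LatestOffsets, while a JSON value that names a mixed-case topic reaches the specific-offsets branch unchanged.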
Re: Structured Streaming - Kafka
Posted by "Bowden, Chris" <ch...@hpe.com>.
Filed as https://issues.apache.org/jira/browse/SPARK-19853; PR by end of week.
________________________________
From: Shixiong(Ryan) Zhu <sh...@databricks.com>
Sent: Tuesday, March 7, 2017 2:04:45 PM
To: Bowden, Chris
Cc: user; Gudenkauf, Jack
Subject: Re: Structured Streaming - Kafka
Good catch. Could you create a ticket? You can also submit a PR to fix it if you have time :)
Re: Structured Streaming - Kafka
Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Good catch. Could you create a ticket? You can also submit a PR to fix it
if you have time :)