Posted to issues@spark.apache.org by "Jungtaek Lim (Jira)" <ji...@apache.org> on 2022/03/28 06:36:00 UTC

[jira] [Commented] (SPARK-38672) Kafka, spark data frame read fails when a message does not exist for the timestamp in all partitions

    [ https://issues.apache.org/jira/browse/SPARK-38672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17513175#comment-17513175 ] 

Jungtaek Lim commented on SPARK-38672:
--------------------------------------

Could you please set the option "startingOffsetsByTimestampStrategy" to "latest" in the Kafka data source options? See [SPARK-35611|https://issues.apache.org/jira/browse/SPARK-35611].

I'll close this for now. Please reopen if the option does not resolve the issue.
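For reference, a minimal sketch of the suggested workaround (broker address, topic name, and timestamps are placeholders taken from the report, not a tested configuration):

```scala
// Sketch: startingOffsetsByTimestampStrategy (added in Spark 3.2, SPARK-35611)
// controls what happens when a partition has no record at/after the requested
// timestamp. With "latest", such partitions fall back to their latest offset
// instead of failing the query with "No offset matched from request ...".
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "XXX:9092") // placeholder broker
  .option("subscribe", "source_JSON_sgattu_primittive")
  .option("startingOffsetsByTimestamp",
    """{"source_JSON_sgattu_primittive":{"0":1648013731709,"1":1648013731709,"2":1648013731709,"3":1648013731709,"4":1648013731709,"5":1648013731709}}""")
  .option("startingOffsetsByTimestampStrategy", "latest")
  .load()
```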

> Kafka, spark data frame read fails when a message does not exist for the timestamp in all partitions
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-38672
>                 URL: https://issues.apache.org/jira/browse/SPARK-38672
>             Project: Spark
>          Issue Type: Bug
>          Components: Build
>    Affects Versions: 3.2.1
>            Reporter: gattu shivakumar
>            Priority: Major
>
> *Issue:*
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at org.apache.spark.util.UninterruptibleThreadRunner$$anon$1.run(UninterruptibleThreadRunner.scala:33)
> Caused by: java.lang.AssertionError: *No offset matched from request of topic-partition source_JSON_sgattu_primittive-5 and timestamp 1648013731709.*
> at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$readTimestampOffsets$1(KafkaOffsetReaderConsumer.scala:280)
> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400)
> at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
> at scala.collection.TraversableLike.map(TraversableLike.scala:286)
> To reproduce the issue, produce a few messages to Kafka and then try to fetch them using the program below (run in the Scala shell):
> bin/kafka-console-producer.sh --bootstrap-server pkc-epwny.eastus.azure.confluent.cloud:9092 --topic source_JSON_sgattu_primittive --producer.config kafkabroker.properties
> {quote}{"ordertime":1497014222380,"orderid":18,"itemid":"Item_184","BinaryField":true}
> {"ordertime":1497014222399,"orderid":19,"itemid":"Item_185","BinaryField":false}
> {quote}
> import org.apache.spark.sql.functions._
> import spark.implicits._
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> val kafkaServer: String = "XXX:9092"
> val topicSampleName: String = "t2"
> val columns = Array("id", "value", "last", "year", "binary", "intbinary")
> val df = sc.parallelize(Seq(
> (1, "John", "Doe", 1986, true, 1),
> (2, "Ive", "Fish", 1990, false, 0),
> (4, "John", "Wayne", 1995, false, 1)
> )).toDF(columns: _*)
> spark.read.format("kafka").option("kafka.bootstrap.servers", kafkaServer).option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='XXX' password='XXX';")
> .option("kafka.sasl.mechanism", "PLAIN")
> .option("kafka.security.protocol", "SASL_SSL")
> .option("kafka.bootstrap.servers", "XXX:9092")
> .option("subscribe", "kafkaTopic")
> .option("startingOffsetsByTimestamp", """{"source_JSON_sgattu_primittive":{"0":1648013731709,"5":1648013731709,"4":1648013731709,"1":1648013731709,"2":1648013731709,"3":1648013731709}}""")
> .load()



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org