Posted to user@spark.apache.org by "Rathore, Yashasvini" <ra...@optum.com.INVALID> on 2021/02/18 11:01:41 UTC

[Spark SQL] - Not able to consume Kafka topics

Hello,

Issues :

  *   My team and I are trying to consume some Kafka topics based on timestamps using the startingOffsetsByTimestamp option, and the code works fine when we run it in a Databricks notebook.
  *   We need to set up the whole process on a local system (IntelliJ), but the same code doesn't work there. We are following the official documentation page and using the exact same syntax and the same versions as mentioned there, but the code fails on the startingOffsetsByTimestamp line.
  *   The following versions are being used:

      *   Scala : 2.12.12
      *   spark-sql : 3.0.1
      *   spark-sql-kafka-0-10

  *   The code snippet is as follows; please suggest any changes or details that we can use to fix this:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("Automation").master("local[*]").getOrCreate()

    val df = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrap)
      .option("subscribe", topic)
      .option("kafka.security.protocol", "SSL")
      .option("kafka.ssl.truststore.location", truststore_location)
      .option("kafka.ssl.truststore.password", truststore_pwd)
      .option("kafka.ssl.keystore.location", keystore_location)
      .option("kafka.ssl.keystore.password", keystore_pwd)
      .option("kafka.ssl.key.password", key_pwd)
      .option("kafka.schema.registry.url", url)
      .option("kafka.request.timeout.ms", "60000")
      .option("kafka.session.timeout.ms", "60000")
      .option("maxOffsetsPerTrigger", 3000000)
      .option("failOnDataLoss", "false")
      .option("dropMalformed", "true")
      .option("startingOffsetsByTimestamp", """{​​​​​​​"topic": {​​​​​​​"0": ​​​​​1000}​​​​​​​}​​​​​​​""")
      .load()

    df.show()

Error produced :
Exception in thread "main" java.lang.IllegalArgumentException: Expected e.g. {"topicA": {"0": 123456789, "1": 123456789},
"topicB": {"0": 123456789, "1": 123456789}}
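The message above says the option value did not match the expected JSON shape. The option value in the snippet, as archived here, appears to contain invisible zero-width characters between the JSON tokens. It is not known whether they were in the original source file or were added by the mail client, so this is something to rule out, not a diagnosis. The following is a minimal sketch in plain Scala; the object `OffsetsJson` and its two helpers are hypothetical names and are not part of Spark. It builds the JSON string from a `Map` instead of a pasted literal, and lists any Unicode format characters (such as U+200B) found in a string. It assumes topic names need no JSON escaping.

```scala
object OffsetsJson {
  // Build a JSON string of the shape {"topic": {"partition": timestampMillis}}.
  // Topics and partitions are sorted so the output is deterministic.
  def byTimestamp(offsets: Map[String, Map[Int, Long]]): String =
    offsets.toSeq.sortBy(_._1).map { case (topic, parts) =>
      val inner = parts.toSeq.sortBy(_._1)
        .map { case (partition, ts) => s""""$partition": $ts""" }
        .mkString(", ")
      s""""$topic": {$inner}"""
    }.mkString("{", ", ", "}")

  // Return (index, code point) for every character in the Unicode
  // "format" category, which includes the zero-width space U+200B.
  def invisibleChars(s: String): Seq[(Int, String)] =
    s.zipWithIndex.collect {
      case (c, i) if Character.getType(c) == Character.FORMAT.toInt =>
        (i, f"U+${c.toInt}%04X")
    }
}
```

A possible use would be to pass `OffsetsJson.byTimestamp(Map(topic -> Map(0 -> 1000L)))` as the option value, or to print `OffsetsJson.invisibleChars(...)` for the existing literal and check that the result is empty.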


Expectations :

  *   I hope someone from the Kafka/Spark team can help me resolve this issue, so that I can proceed with my work.


--
Thanks & Regards,
Yashasvini Rathore
Assoc Software Engineer II, Hyderabad, India
(Desk) +91 403/968-5738
Our United Culture. The way forward.
■ Integrity ■ Compassion ■ Relationships ■ Innovation ■ Performance




This e-mail, including attachments, may include confidential and/or
proprietary information, and may be used only by the person or entity
to which it is addressed. If the reader of this e-mail is not the intended
recipient or his or her authorized agent, the reader is hereby notified
that any dissemination, distribution or copying of this e-mail is
prohibited. If you have received this e-mail in error, please notify the
sender by replying to this message and delete this e-mail immediately.

Re: [Spark SQL] - Not able to consume Kafka topics

Posted by Jungtaek Lim <ka...@gmail.com>.
(Dropping the Kafka user mailing list, as this is more likely a Spark issue.)

Do you have the full stack trace for the error message? It would help
clarify where the issue lies.
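
One way to capture the full stack trace, including any "Caused by:" sections, is sketched below in plain Scala. The object `StackTraces` and its method `full` are hypothetical names used only for illustration. The sketch relies only on `Throwable.printStackTrace`, which writes the exception and its chain of causes to the given writer.

```scala
import java.io.{PrintWriter, StringWriter}

object StackTraces {
  // Render a throwable and its whole cause chain as a single string,
  // in the same format the JVM prints for an uncaught exception.
  def full(t: Throwable): String = {
    val buffer = new StringWriter()
    t.printStackTrace(new PrintWriter(buffer, true))
    buffer.toString
  }
}
```

For example, the read could be wrapped as `try { df.show() } catch { case e: Exception => println(StackTraces.full(e)); throw e }` and the printed text pasted into the reply.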
