Posted to issues@spark.apache.org by "Karan Alang (Jira)" <ji...@apache.org> on 2022/10/19 00:29:00 UTC

[jira] [Commented] (SPARK-40837) Structured Streaming on Dataproc - suddenly giving error while writing to Kafka topic

    [ https://issues.apache.org/jira/browse/SPARK-40837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17619901#comment-17619901 ] 

Karan Alang commented on SPARK-40837:
-------------------------------------

Please note: I'm able to read from and write to the topic using a Python Kafka consumer/producer, but the Structured Streaming code is failing.
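
For reference, a minimal standalone connectivity check along these lines, sketched with kafka-python, can confirm the brokers and topic are reachable outside Spark. The broker address and certificate paths are placeholders, not taken from the job config (kafka-python reads PEM files rather than PKCS12 stores, so the .p12 files would need converting first):

```
from kafka import KafkaProducer, KafkaConsumer

BROKERS = "broker.example.internal:9093"   # placeholder broker address
TOPIC = "syslog.ueba-us4.v1.versa.demo4"
SSL_ARGS = dict(
    security_protocol="SSL",
    ssl_cafile="ca.pem",        # placeholder PEM paths, converted from the .p12 stores
    ssl_certfile="client.pem",
    ssl_keyfile="client.key",
)

# Produce one test record and force it out to the broker.
producer = KafkaProducer(bootstrap_servers=BROKERS, **SSL_ARGS)
producer.send(TOPIC, value=b"connectivity check")
producer.flush()

# Read it back; give up after 10 seconds of silence.
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=BROKERS,
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,
    **SSL_ARGS,
)
for record in consumer:
    print(record.value)
    break
```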

> Structured Streaming on Dataproc - suddenly giving error while writing to Kafka topic
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-40837
>                 URL: https://issues.apache.org/jira/browse/SPARK-40837
>             Project: Spark
>          Issue Type: Bug
>          Components: Kubernetes, Structured Streaming
>    Affects Versions: 3.1.3
>         Environment: Structured Streaming job - Dataproc
> Source Kafka topic - VM on GCP
> Target Kafka Topic - GKE (kubernetes)
> Kafka on GKE - Strimzi
>            Reporter: Karan Alang
>            Priority: Major
>
> I have a Structured Streaming job which reads data from a Kafka topic (on a VM) and writes to another Kafka topic on GKE (I should be using MirrorMaker for this, but have not implemented it yet). It suddenly stopped working after running fine for many months, giving the following error:
> ```
> 22/10/18 19:02:35 WARN org.apache.spark.sql.streaming.StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
> 22/10/18 19:03:42 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (stream2kafka2-w-1.c.versa-sml-googl.internal executor 2): org.apache.kafka.common.errors.TimeoutException: Topic syslog.ueba-us4.v1.versa.demo4 not present in metadata after 60000 ms.
> 22/10/18 19:03:42 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) (stream2kafka2-w-1.c.versa-sml-googl.internal executor 2): org.apache.spark.sql.execution.streaming.continuous.ContinuousTaskRetryException: Continuous execution does not support task retry
>     at org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD.compute(ContinuousDataSourceRDD.scala:76)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.$anonfun$compute$1(ContinuousWriteRDD.scala:53)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
>     at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.compute(ContinuousWriteRDD.scala:84)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:131)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)
>     Suppressed: java.lang.NullPointerException
>         at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.$anonfun$compute$7(ContinuousWriteRDD.scala:84)
>         at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1495)
>         ... 11 more
> ```
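> The first error is thrown by the Kafka producer when it cannot fetch metadata for the target topic within max.block.ms (60000 ms by default): typically the topic does not exist, the advertised listener address is not reachable from the Spark executors, or the SSL handshake fails. As a hypothetical check from any machine with broker access (kafka-python again, with placeholder broker address and PEM paths), one can ask the broker which topics it advertises:
> ```
> from kafka import KafkaConsumer
>
> # Connect over SSL and list the topics the broker advertises.
> consumer = KafkaConsumer(
>     bootstrap_servers="broker.example.internal:9093",  # placeholder
>     security_protocol="SSL",
>     ssl_cafile="ca.pem",        # placeholder PEM paths
>     ssl_certfile="client.pem",
>     ssl_keyfile="client.key",
> )
> print("syslog.ueba-us4.v1.versa.demo4" in consumer.topics())
> ```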
> The code is very simple and has been working for many months:
> ```
> class ReadFromKafka:
>     def readAndWrite(self):
>         df = spark \
>             .readStream \
>             .format('kafka') \
>             .option("kafka.bootstrap.servers", kafkaBrokersSrc) \
>             .option("subscribe", srcTopic) \
>             .option("startingOffsets", "latest") \
>             .option("failOnDataLoss", "false") \
>             .load()
>         query = df.selectExpr("CAST(value AS STRING)", "cast(key AS String)") \
>             .writeStream \
>             .format("kafka") \
>             .option("checkpointLocation", checkpoint) \
>             .option("outputMode", "append") \
>             .option("truncate", "false") \
>             .option("kafka.security.protocol", security_protocol) \
>             .option("kafka.ssl.truststore.location", ssl_truststore_location) \
>             .option("kafka.ssl.truststore.password", ssl_truststore_password) \
>             .option("kafka.ssl.keystore.location", ssl_keystore_location) \
>             .option("kafka.ssl.keystore.password", ssl_keystore_password) \
>             .option("kafka.bootstrap.servers", kafkaBrokersTgt) \
>             .option("topic", tgtTopic) \
>             .option("kafka.ssl.keystore.type", "PKCS12") \
>             .option("kafka.ssl.truststore.type", "PKCS12") \
>             .trigger(continuous='5 seconds') \
>             .start()
>         query.awaitTermination()
> ```
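> The ContinuousTaskRetryException in the trace follows from trigger(continuous='5 seconds'): continuous processing cannot retry failed tasks, so after the initial TimeoutException the retried task fails immediately. If continuous mode is not a hard requirement, a sketch of the same sink with a micro-batch trigger (all variables as in the code above) would at least let failed tasks retry:
> ```
> # Micro-batch variant of the writeStream above; df, checkpoint,
> # kafkaBrokersTgt, tgtTopic and the SSL settings are unchanged.
> query = df.selectExpr("CAST(value AS STRING)", "CAST(key AS STRING)") \
>     .writeStream \
>     .format("kafka") \
>     .option("checkpointLocation", checkpoint) \
>     .option("kafka.bootstrap.servers", kafkaBrokersTgt) \
>     .option("topic", tgtTopic) \
>     .option("kafka.security.protocol", security_protocol) \
>     .option("kafka.ssl.truststore.location", ssl_truststore_location) \
>     .option("kafka.ssl.truststore.password", ssl_truststore_password) \
>     .option("kafka.ssl.truststore.type", "PKCS12") \
>     .option("kafka.ssl.keystore.location", ssl_keystore_location) \
>     .option("kafka.ssl.keystore.password", ssl_keystore_password) \
>     .option("kafka.ssl.keystore.type", "PKCS12") \
>     .trigger(processingTime="5 seconds") \
>     .start()
> query.awaitTermination()
> ```
> This does not address the underlying metadata timeout, only the retry failure that follows it.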
> I'm running this on Google Dataproc:
> ```
> gcloud dataproc jobs submit pyspark /Users/karanalang/PycharmProjects/Kafka/versa-movedata2kafka/StructuredStreaming-readFromKafka-versa-sml-googl-v1.py  --cluster stream2kafka  --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,spark.dynamicAllocation.enabled=true,spark.shuffle.service.enabled=true  --files gs://kafka-certs/versa-kafka-gke-ca.p12,gs://kafka-certs/syslog-vani-noacl.p12 --region us-east1
> ```
> Can you please help check and resolve this?
> Thanks in advance!



