Posted to issues@spark.apache.org by "Karan Alang (Jira)" <ji...@apache.org> on 2022/10/19 00:29:00 UTC
[jira] [Commented] (SPARK-40837) Structured Streaming on Dataproc - suddenly giving error while writing to Kafka topic
[ https://issues.apache.org/jira/browse/SPARK-40837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17619901#comment-17619901 ]
Karan Alang commented on SPARK-40837:
-------------------------------------
Please note: I'm able to read from and write to the topic using a plain Python Kafka consumer/producer, but the Structured Streaming job is failing.
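For reference, a standalone check along those lines can be sketched as below. kafka-python is an assumption (the comment does not say which client library was used), and the broker/topic values are placeholders, not taken from the report:

```python
# Sketch of a standalone produce check, independent of Spark.
# kafka-python is an assumed dependency; broker and topic names are placeholders.

def producer_config(bootstrap_servers: str) -> dict:
    # max_block_ms is the producer-side wait for topic metadata; it is the
    # timeout behind "not present in metadata after 60000 ms" style errors.
    return {
        "bootstrap_servers": bootstrap_servers,
        "max_block_ms": 60_000,
        "acks": "all",
    }

def produce_one(topic: str, payload: bytes, bootstrap_servers: str) -> None:
    from kafka import KafkaProducer  # assumed client library, imported lazily
    producer = KafkaProducer(**producer_config(bootstrap_servers))
    # Block until the broker acknowledges the record, so any metadata or
    # connectivity problem surfaces as an exception here.
    producer.send(topic, payload).get(timeout=30)
    producer.close()
```

If the equivalent plain-client check succeeds from the same node where the Spark executors run, that points the investigation at the Spark job's configuration rather than the brokers.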
> Structured Streaming on Dataproc - suddenly giving error while writing to Kafka topic
> -------------------------------------------------------------------------------------
>
> Key: SPARK-40837
> URL: https://issues.apache.org/jira/browse/SPARK-40837
> Project: Spark
> Issue Type: Bug
> Components: Kubernetes, Structured Streaming
> Affects Versions: 3.1.3
> Environment: Structured Streaming job runs on Dataproc
> Source Kafka topic - VM on GCP
> Target Kafka Topic - GKE (kubernetes)
> Kafka on GKE - Strimzi
> Reporter: Karan Alang
> Priority: Major
>
> I have a Structured Streaming job that reads data from a Kafka topic (on a VM) and writes to another Kafka topic on GKE (I should be using MirrorMaker for this, but have not implemented it yet). It had been working fine for many months, then suddenly stopped with the following error:
> ```
>
> 22/10/18 19:02:35 WARN org.apache.spark.sql.streaming.StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
> 22/10/18 19:03:42 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (stream2kafka2-w-1.c.versa-sml-googl.internal executor 2): org.apache.kafka.common.errors.TimeoutException: Topic syslog.ueba-us4.v1.versa.demo4 not present in metadata after 60000 ms.
> 22/10/18 19:03:42 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) (stream2kafka2-w-1.c.versa-sml-googl.internal executor 2): org.apache.spark.sql.execution.streaming.continuous.ContinuousTaskRetryException: Continuous execution does not support task retry
> at org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD.compute(ContinuousDataSourceRDD.scala:76)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.$anonfun$compute$1(ContinuousWriteRDD.scala:53)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
> at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.compute(ContinuousWriteRDD.scala:84)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> Suppressed: java.lang.NullPointerException
> at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.$anonfun$compute$7(ContinuousWriteRDD.scala:84)
> at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1495)
> ... 11 more
> ```
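Two separate failures appear in that log. The TimeoutException means the producer could not fetch metadata for the target topic within the 60-second wait, which usually indicates the brokers advertised by the GKE cluster are not reachable from the executor nodes. A minimal connectivity probe (stdlib only; the host and port below are placeholders) that could be run from a Dataproc node:

```python
import socket

def broker_reachable(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to the broker endpoint succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example with a placeholder endpoint:
# broker_reachable("kafka-bootstrap.example.internal", 9094)
```

A successful TCP connect only shows the listener is reachable; it does not validate the TLS handshake or confirm that the brokers' advertised listeners resolve from the executors, both of which can also produce the same metadata timeout.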
> The code is very simple and has been working for many months:
>
> ```
>
> class ReadFromKafka:
>     def readAndWrite(self):
>         df = spark \
>             .readStream \
>             .format('kafka') \
>             .option("kafka.bootstrap.servers", kafkaBrokersSrc) \
>             .option("subscribe", srcTopic) \
>             .option("startingOffsets", "latest") \
>             .option("failOnDataLoss", "false") \
>             .load()
>
>         query = df.selectExpr("CAST(value AS STRING)", "CAST(key AS STRING)") \
>             .writeStream \
>             .format("kafka") \
>             .option("checkpointLocation", checkpoint) \
>             .option("outputMode", "append") \
>             .option("truncate", "false") \
>             .option("kafka.security.protocol", security_protocol) \
>             .option("kafka.ssl.truststore.location", ssl_truststore_location) \
>             .option("kafka.ssl.truststore.password", ssl_truststore_password) \
>             .option("kafka.ssl.keystore.location", ssl_keystore_location) \
>             .option("kafka.ssl.keystore.password", ssl_keystore_password) \
>             .option("kafka.bootstrap.servers", kafkaBrokersTgt) \
>             .option("topic", tgtTopic) \
>             .option("kafka.ssl.keystore.type", "PKCS12") \
>             .option("kafka.ssl.truststore.type", "PKCS12") \
>             .trigger(continuous='5 seconds') \
>             .start()
>
>         query.awaitTermination()
> ```
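The second failure in the log, ContinuousTaskRetryException, is worth noting here: the query uses `trigger(continuous='5 seconds')`, and continuous processing is an experimental mode that cannot retry a failed task, so the first transient Kafka error kills the query. A micro-batch trigger does support task retry. The sketch below shows only the trigger change; the `sink_options` dict is a stand-in for the bootstrap servers, topic, and SSL options in the code above, and this is a suggestion, not the reporter's code:

```python
def start_query(df, checkpoint, sink_options):
    # Same Kafka sink as in the report, but with a micro-batch trigger
    # instead of trigger(continuous='5 seconds'); micro-batch execution
    # retries failed tasks rather than aborting the whole query.
    return (
        df.selectExpr("CAST(value AS STRING)", "CAST(key AS STRING)")
        .writeStream
        .format("kafka")
        .option("checkpointLocation", checkpoint)
        .options(**sink_options)  # bootstrap servers, topic, SSL settings, etc.
        .trigger(processingTime="5 seconds")
        .start()
    )
```

Switching trigger modes would not fix an unreachable broker, but it would stop a single transient failure from terminating the stream.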
> I'm running this on Google Dataproc:
> ```
>
> gcloud dataproc jobs submit pyspark /Users/karanalang/PycharmProjects/Kafka/versa-movedata2kafka/StructuredStreaming-readFromKafka-versa-sml-googl-v1.py \
>     --cluster stream2kafka \
>     --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,spark.dynamicAllocation.enabled=true,spark.shuffle.service.enabled=true \
>     --files gs://kafka-certs/versa-kafka-gke-ca.p12,gs://kafka-certs/syslog-vani-noacl.p12 \
>     --region us-east1
> ```
> Can you please help check and resolve this?
> Thanks in advance!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)