Posted to issues@spark.apache.org by "Karan Alang (Jira)" <ji...@apache.org> on 2022/10/18 22:06:00 UTC

[jira] [Created] (SPARK-40837) Structured Streaming on Dataproc - suddenly giving error while writing to Kafka topic

Karan Alang created SPARK-40837:
-----------------------------------

             Summary: Structured Streaming on Dataproc - suddenly giving error while writing to Kafka topic
                 Key: SPARK-40837
                 URL: https://issues.apache.org/jira/browse/SPARK-40837
             Project: Spark
          Issue Type: Bug
          Components: Kubernetes, Structured Streaming
    Affects Versions: 3.1.3
         Environment: Structured Streaming job runs on Dataproc

Source Kafka topic - VM on GCP

Target Kafka topic - GKE (Kubernetes)
            Reporter: Karan Alang


I have a Structured Streaming job that reads data from a Kafka topic (on a VM) and writes to another Kafka topic on GKE (I should be using MirrorMaker for this, but have not implemented it yet). It has been working fine for many months, and suddenly started giving the following error:

```
22/10/18 19:02:35 WARN org.apache.spark.sql.streaming.StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
22/10/18 19:03:42 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (stream2kafka2-w-1.c.versa-sml-googl.internal executor 2): org.apache.kafka.common.errors.TimeoutException: Topic syslog.ueba-us4.v1.versa.demo4 not present in metadata after 60000 ms.
22/10/18 19:03:42 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) (stream2kafka2-w-1.c.versa-sml-googl.internal executor 2): org.apache.spark.sql.execution.streaming.continuous.ContinuousTaskRetryException: Continuous execution does not support task retry
    at org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD.compute(ContinuousDataSourceRDD.scala:76)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.$anonfun$compute$1(ContinuousWriteRDD.scala:53)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
    at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.compute(ContinuousWriteRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
    Suppressed: java.lang.NullPointerException
        at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.$anonfun$compute$7(ContinuousWriteRDD.scala:84)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1495)
        ... 11 more

```
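The first failure is a Kafka metadata timeout on the target topic, which suggests the GKE brokers are unreachable from the Dataproc workers, or the topic no longer exists. Not part of the original report, but a minimal connectivity check along these lines (a sketch using kafka-python, and assuming PEM exports of the certs, since kafka-python does not read PKCS12 keystores directly) can confirm whether the topic is still visible from a Dataproc node:

```
# Hypothetical diagnostic sketch, not from the original report: check whether
# the target topic appears in cluster metadata when queried from a Dataproc
# node. Assumes PEM exports of the CA / client cert / client key, since
# kafka-python does not read PKCS12 keystores directly.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=kafkaBrokersTgt,   # same target brokers as the job
    security_protocol="SSL",
    ssl_cafile="ca.pem",                 # PEM export of the truststore
    ssl_certfile="client-cert.pem",      # PEM export of the keystore cert
    ssl_keyfile="client-key.pem",        # PEM export of the keystore key
)

# topics() forces a metadata fetch, the same step that times out in the job.
print("syslog.ueba-us4.v1.versa.demo4" in consumer.topics())
```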

The code is very simple and has been working for many months now:

 

```
# Note: the SparkSession (spark), broker lists, topic names, checkpoint path,
# and SSL locations/passwords are defined elsewhere in the reporter's script.
class ReadFromKafka:
    def readAndWrite(self):
        df = spark \
            .readStream \
            .format('kafka') \
            .option("kafka.bootstrap.servers", kafkaBrokersSrc) \
            .option("subscribe", srcTopic) \
            .option("startingOffsets", "latest") \
            .option("failOnDataLoss", "false") \
            .load()

        query = df.selectExpr("CAST(value AS STRING)", "cast(key AS String)") \
            .writeStream \
            .format("kafka") \
            .option("checkpointLocation", checkpoint) \
            .option("outputMode", "append") \
            .option("truncate", "false") \
            .option("kafka.security.protocol", security_protocol) \
            .option("kafka.ssl.truststore.location", ssl_truststore_location) \
            .option("kafka.ssl.truststore.password", ssl_truststore_password) \
            .option("kafka.ssl.keystore.location", ssl_keystore_location) \
            .option("kafka.ssl.keystore.password", ssl_keystore_password) \
            .option("kafka.bootstrap.servers", kafkaBrokersTgt) \
            .option("topic", tgtTopic) \
            .option("kafka.ssl.keystore.type", "PKCS12") \
            .option("kafka.ssl.truststore.type", "PKCS12") \
            .trigger(continuous='5 seconds') \
            .start()
        query.awaitTermination()

```
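The second error is telling: with `.trigger(continuous='5 seconds')` the query runs in continuous processing mode, which does not support task retries, so a single transient Kafka timeout fails the whole query instead of being retried. As a sketch (this works around the retry behaviour, not the underlying metadata timeout), the same sink in micro-batch mode would retry failed tasks:

```
# Sketch only: the same write path with a micro-batch trigger instead of the
# experimental continuous trigger; micro-batch tasks can be retried.
# Assumes the same variables as the job above; SSL options omitted for
# brevity. checkpoint_mb is a hypothetical fresh checkpoint location, since
# a trigger-mode change likely cannot reuse the continuous checkpoint.
query = df.selectExpr("CAST(value AS STRING)", "CAST(key AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("checkpointLocation", checkpoint_mb) \
    .option("kafka.bootstrap.servers", kafkaBrokersTgt) \
    .option("topic", tgtTopic) \
    .trigger(processingTime='5 seconds') \
    .start()
```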

I'm running this on Google Dataproc:

```
gcloud dataproc jobs submit pyspark \
  /Users/karanalang/PycharmProjects/Kafka/versa-movedata2kafka/StructuredStreaming-readFromKafka-versa-sml-googl-v1.py \
  --cluster stream2kafka \
  --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,spark.dynamicAllocation.enabled=true,spark.shuffle.service.enabled=true \
  --files gs://kafka-certs/versa-kafka-gke-ca.p12,gs://kafka-certs/syslog-vani-noacl.p12 \
  --region us-east1

```
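One thing worth double-checking (an assumption on my part; the variable definitions are not shown in the report): `--files` stages the two `gs://` certs into each executor's working directory, so the keystore/truststore options need to point at the bare filenames, for example:

```
# Hypothetical values consistent with the --files list above; the actual
# definitions of these variables are not shown in the report.
ssl_truststore_location = "versa-kafka-gke-ca.p12"  # staged by --files
ssl_keystore_location = "syslog-vani-noacl.p12"     # staged by --files
```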

Can you please help check and resolve this?

Thanks in advance!


