You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Julian (Jira)" <ji...@apache.org> on 2021/02/16 12:38:00 UTC

[jira] [Commented] (KAFKA-5649) Producer is being closed generating ssl exception

    [ https://issues.apache.org/jira/browse/KAFKA-5649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17285170#comment-17285170 ] 

Julian commented on KAFKA-5649:
-------------------------------

We have now moved to HDP 3.1.5 having spark 2.3.2 and Kafka 2+ but we are forced to use kafka 0.10 libraries as no structured streaming build for these combined versions. On this new cluster, the issue is still happening in spark streaming from kafka. See https://issues.apache.org/jira/browse/SPARK-21453 which I also updated and linked back to this case. I also see https://issues.apache.org/jira/browse/KAFKA-3702 maybe related here.

Maybe kafka 2 is solving this, but unfortunately we have a long way to go until we get to spark 2.4,  kafka 2+ and the relevant structured streaming builds supporting these two.

> Producer is being closed generating ssl exception
> -------------------------------------------------
>
>                 Key: KAFKA-5649
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5649
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.10.2.1
>         Environment: Spark 2.2.0 and kafka 0.10.2.0
>            Reporter: Pablo Panero
>            Priority: Major
>
> On a streaming job using built-in kafka source and sink (over SSL), with I am getting the following exception:
> On a streaming job using built-in kafka source and sink (over SSL), with  I am getting the following exception:
> Config of the source:
> {code:java}
> val df = spark.readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", config.bootstrapServers)
>       .option("failOnDataLoss", value = false)
>       .option("kafka.connections.max.idle.ms", 3600000)
>       //SSL: this only applies to communication between Spark and Kafka brokers; you are still responsible for separately securing Spark inter-node communication.
>       .option("kafka.security.protocol", "SASL_SSL")
>       .option("kafka.sasl.mechanism", "GSSAPI")
>       .option("kafka.sasl.kerberos.service.name", "kafka")
>       .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>       .option("kafka.ssl.truststore.password", "changeit")
>       .option("subscribe", config.topicConfigList.keys.mkString(","))
>       .load()
> {code}
> Config of the sink:
> {code:java}
> .writeStream
>         .option("checkpointLocation", s"${config.checkpointDir}/${topicConfig._1}/")
>         .format("kafka")
>         .option("kafka.bootstrap.servers", config.bootstrapServers)
>         .option("kafka.connections.max.idle.ms", 3600000)
>         //SSL: this only applies to communication between Spark and Kafka brokers; you are still responsible for separately securing Spark inter-node communication.
>         .option("kafka.security.protocol", "SASL_SSL")
>         .option("kafka.sasl.mechanism", "GSSAPI")
>         .option("kafka.sasl.kerberos.service.name", "kafka")
>         .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>         .option("kafka.ssl.truststore.password", "changeit")
>         .start()
> {code}
> And in some cases it throws the exception making the spark job stuck in that step. Exception stack trace is the following:
> {code:java}
> 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message 
> java.io.IOException: Broken pipe
> 	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> 	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> 	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> 	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> 	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> 	at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
> 	at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
> 	at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
> 	at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
> 	at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
> 	at org.apache.kafka.common.network.Selector.close(Selector.java:531)
> 	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
> 	at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> 	at org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
> 	at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
> 	at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
> 	at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
> 	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
> 	at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
> 	at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
> 	at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
> 	at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> 	at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:47)
> 	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:91)
> 	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
> 	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> 	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:91)
> 	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
> 	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
> 	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:108)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)