Posted to issues@spark.apache.org by "Jungtaek Lim (JIRA)" <ji...@apache.org> on 2019/02/19 08:35:00 UTC

[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

    [ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16771709#comment-16771709 ] 

Jungtaek Lim commented on SPARK-21453:
--------------------------------------

[~ppanero] [~Julescs0]
I guess it's too late, but I missed this issue when I crafted my patch on consumer pool renewal. The patch relies on a pool implementation that is considered stable (Apache Commons Pool) rather than Spark maintaining its own pool implementation.
https://github.com/apache/spark/pull/22138

We haven't tracked down the root cause, so I can't say that my patch would mitigate the issue. However, I found suspicious cases where the current implementation could leak (never release) idle consumers, so it may be worth a try if you're still in pain and haven't resolved the issue.
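
To illustrate the kind of leak meant here, below is a minimal sketch of a keyed pool with idle eviction. It is only an illustration under assumptions: `SimpleKeyedPool` and all of its methods are hypothetical names, and it is neither Spark's `CachedKafkaConsumer` cache nor the Apache Commons Pool API. The point it shows is that an evictor can only see objects that were returned to the pool, so an object that is borrowed and never released stays outside its reach.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical keyed pool; not Spark's cache and not the Commons Pool API. */
final class SimpleKeyedPool<K, V> {
    /** An idle object together with the time it was returned to the pool. */
    private static final class Idle<V> {
        final V value;
        final long idleSinceMs;
        Idle(V value, long idleSinceMs) {
            this.value = value;
            this.idleSinceMs = idleSinceMs;
        }
    }

    private final Map<K, Deque<Idle<V>>> idle = new HashMap<>();
    private final Function<K, V> factory;
    private int borrowed = 0;

    SimpleKeyedPool(Function<K, V> factory) {
        this.factory = factory;
    }

    /** Hands out an idle object for the key, or creates a new one. */
    synchronized V borrow(K key) {
        borrowed++;
        Deque<Idle<V>> queue = idle.get(key);
        if (queue != null && !queue.isEmpty()) {
            return queue.pop().value;
        }
        return factory.apply(key);
    }

    /** Returns an object to the pool and records when it became idle. */
    synchronized void release(K key, V value, long nowMs) {
        borrowed--;
        idle.computeIfAbsent(key, k -> new ArrayDeque<>()).push(new Idle<>(value, nowMs));
    }

    /**
     * Drops objects that have been idle for at least maxIdleMs and returns
     * how many were dropped. A real pool would also close them. Borrowed
     * objects are not tracked in the idle map, so they are never evicted.
     */
    synchronized int evictIdle(long nowMs, long maxIdleMs) {
        int evicted = 0;
        for (Deque<Idle<V>> queue : idle.values()) {
            Iterator<Idle<V>> it = queue.iterator();
            while (it.hasNext()) {
                if (nowMs - it.next().idleSinceMs >= maxIdleMs) {
                    it.remove();
                    evicted++;
                }
            }
        }
        return evicted;
    }

    synchronized int borrowedCount() {
        return borrowed;
    }

    synchronized int idleCount() {
        int n = 0;
        for (Deque<Idle<V>> queue : idle.values()) {
            n += queue.size();
        }
        return n;
    }
}
```

In this sketch, if a caller borrows two objects for the same key and releases only one, a later `evictIdle` call removes the released one while `borrowedCount()` stays at 1 indefinitely. Whether the current Spark implementation hits exactly this pattern is not confirmed here.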

If the patch does resolve this issue, that would be a good rationale for getting it reviewed and merged soon.

I would need some information:

1) Which Spark version are you having difficulty with? I need this to backport the patch to that version (for now the patch is against the master branch). Note that Spark 2.2.x looks EOL in the Apache community (and 2.3.x will be too later this year), so you may not get support for 2.2.x.

2) Is it easily reproducible? You may be hesitant to apply a not-yet-merged patch to production, so it would be even better if we could reproduce it easily in a staging environment, or even locally.

Thanks in advance!

> Cached Kafka consumer may be closed too early
> ---------------------------------------------
>
>                 Key: SPARK-21453
>                 URL: https://issues.apache.org/jira/browse/SPARK-21453
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: Spark 2.2.0 and kafka 0.10.2.0
>            Reporter: Pablo Panero
>            Priority: Minor
>
> On a streaming job using the built-in Kafka source and sink (over SSL), I am getting the following exception:
> Config of the source:
> {code:java}
> val df = spark.readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", config.bootstrapServers)
>       .option("failOnDataLoss", value = false)
>       .option("kafka.connections.max.idle.ms", 3600000)
>       //SSL: this only applies to communication between Spark and Kafka brokers; you are still responsible for separately securing Spark inter-node communication.
>       .option("kafka.security.protocol", "SASL_SSL")
>       .option("kafka.sasl.mechanism", "GSSAPI")
>       .option("kafka.sasl.kerberos.service.name", "kafka")
>       .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>       .option("kafka.ssl.truststore.password", "changeit")
>       .option("subscribe", config.topicConfigList.keys.mkString(","))
>       .load()
> {code}
> Config of the sink:
> {code:java}
> .writeStream
>         .option("checkpointLocation", s"${config.checkpointDir}/${topicConfig._1}/")
>         .format("kafka")
>         .option("kafka.bootstrap.servers", config.bootstrapServers)
>         .option("kafka.connections.max.idle.ms", 3600000)
>         //SSL: this only applies to communication between Spark and Kafka brokers; you are still responsible for separately securing Spark inter-node communication.
>         .option("kafka.security.protocol", "SASL_SSL")
>         .option("kafka.sasl.mechanism", "GSSAPI")
>         .option("kafka.sasl.kerberos.service.name", "kafka")
>         .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>         .option("kafka.ssl.truststore.password", "changeit")
>         .start()
> {code}
> {code:java}
> 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message 
> java.io.IOException: Broken pipe
> 	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> 	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> 	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> 	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> 	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> 	at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
> 	at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
> 	at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
> 	at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
> 	at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
> 	at org.apache.kafka.common.network.Selector.close(Selector.java:531)
> 	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
> 	at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> 	at org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
> 	at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
> 	at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
> 	at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
> 	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
> 	at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
> 	at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
> 	at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
> 	at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> 	at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:47)
> 	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:91)
> 	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
> 	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> 	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:91)
> 	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
> 	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
> 	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:108)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}


