Posted to issues@spark.apache.org by "Yuval Itzchakov (JIRA)" <ji...@apache.org> on 2018/08/01 10:17:00 UTC

[jira] [Updated] (SPARK-24987) Kafka Cached Consumer Leaking Consumers

     [ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuval Itzchakov updated SPARK-24987:
------------------------------------
    Environment: 
Spark 2.3.1

Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
 Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)

Spark graph:
{code:java}
kafkaStream
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .flatMap {...}
  .groupByKey(...)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
  .foreach(...)
  .outputMode(OutputMode.Update)
  .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
  .start()
  .awaitTermination()
{code}

  was:
Spark 2.3.1

Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)

Spark graph:

```scala

kafkaStream
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .flatMap {...}
  .groupByKey(...)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
  .foreach(...)
  .outputMode(OutputMode.Update)
  .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
  .start()
  .awaitTermination()

```

    Description: 
Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) via KafkaDataConsumer.acquire.

It seems that there are situations (I've been trying to debug this but haven't found the root cause yet) where cached consumers remain "in use" throughout the lifetime of the task. Perhaps the callback registered on the task context is never called (just a theory, no hard evidence):
{code:java}
context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
{code}
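To make the theory above concrete, here is a minimal toy model of the suspected failure mode. It is a sketch under stated assumptions, not Spark's implementation: ToyTaskContext, ToyConsumer, ToyConsumerCache and LeakSketch are hypothetical names invented for illustration, and the only behaviour modelled is "a consumer is marked free again solely by a completion callback". If that callback never fires, every later task finds the cached entry busy and opens a new consumer.

```scala
import scala.collection.mutable

// Toy stand-in for a task context (hypothetical, not Spark's TaskContext):
// it only stores callbacks and runs them when the task completes.
final class ToyTaskContext {
  private val listeners = mutable.ArrayBuffer.empty[() => Unit]
  def addCompletionListener(f: () => Unit): Unit = { listeners += f }
  def complete(): Unit = listeners.foreach(f => f())
}

// Toy stand-in for a consumer that would hold OS resources while open.
final class ToyConsumer {
  var inUse = false
}

// Toy cache: one reusable consumer per key; a busy entry forces a new consumer.
final class ToyConsumerCache {
  private val cached = mutable.Map.empty[String, ToyConsumer]
  var opened = 0 // how many consumers were ever created

  def acquire(key: String, ctx: ToyTaskContext): ToyConsumer = {
    val consumer = cached.get(key).filterNot(_.inUse).getOrElse {
      opened += 1
      val fresh = new ToyConsumer
      cached(key) = fresh // the previous busy entry is dropped without being closed
      fresh
    }
    consumer.inUse = true
    // Release depends entirely on the completion callback firing.
    ctx.addCompletionListener(() => { consumer.inUse = false })
    consumer
  }
}

object LeakSketch {
  // Runs `tasks` sequential tasks against one key and
  // returns the number of consumers that had to be opened.
  def run(tasks: Int, callbacksFire: Boolean): Int = {
    val cache = new ToyConsumerCache
    for (_ <- 1 to tasks) {
      val ctx = new ToyTaskContext
      cache.acquire("topic-0", ctx)
      if (callbacksFire) ctx.complete()
    }
    cache.opened
  }

  def main(args: Array[String]): Unit = {
    println(s"callbacks fire:    ${run(5, callbacksFire = true)} consumer(s) opened")
    println(s"callbacks skipped: ${run(5, callbacksFire = false)} consumer(s) opened")
  }
}
```

In this toy model five sequential tasks open one consumer when the callbacks fire and five when they do not, which is the growth pattern one would expect to see if the theory holds. It says nothing about why a callback might be skipped in the real code.
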
I tracked this leak down using file leak detector, attached to the running Executor JVM process. I dumped the list of open file descriptors, which [you can find here](https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d), and you can see that the majority of them are epoll FDs used by Kafka consumers, indicating that the consumers aren't being closed.
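
For anyone who wants a quick cross-check without attaching an agent, the following is a minimal sketch that counts open descriptors by kind from /proc/<pid>/fd. It assumes a Linux host, where an epoll descriptor's link target reads anon_inode:[eventpoll], and it assumes the caller may read the target process's fd directory (same user or root). FdCensus and its method names are hypothetical and are not part of Spark or of file leak detector.

```scala
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._
import scala.util.Try

object FdCensus {
  // Maps a /proc/<pid>/fd link target to a coarse descriptor kind.
  def classify(target: String): String =
    if (target.contains("eventpoll")) "epoll"
    else if (target.startsWith("socket:")) "socket"
    else if (target.startsWith("pipe:")) "pipe"
    else "other"

  // Counts the open descriptors of `pid` ("self" = this JVM) by kind. Linux only.
  def census(pid: String = "self"): Map[String, Int] = {
    val stream = Files.list(Paths.get("/proc", pid, "fd"))
    try {
      stream.iterator().asScala
        // A descriptor may close between listing and reading; skip those.
        .flatMap(p => Try(Files.readSymbolicLink(p).toString).toOption)
        .toList
        .groupBy(classify)
        .map { case (kind, fds) => kind -> fds.size }
    } finally stream.close()
  }

  def main(args: Array[String]): Unit = {
    val pid = args.headOption.getOrElse("self")
    census(pid).toSeq.sortBy(-_._2).foreach { case (kind, n) =>
      println(f"$kind%-8s $n%d")
    }
  }
}
```

Running it periodically against the executor's PID and watching whether the epoll count keeps climbing gives a rough signal only. It cannot tell which code path opened a descriptor, which is what the leak detector output in the gist provides.
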

 

  was:
Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) via KafkaDataConsumer.acquire.

It seems that there are situations (I've been trying to debug this but haven't found the root cause yet) where cached consumers remain "in use" throughout the lifetime of the task. Perhaps the callback registered on the task context is never called (just a theory, no hard evidence):

```scala

context.addTaskCompletionListener { _ =>
underlying.closeIfNeeded()
}

```

I tracked this leak down using file leak detector, attached to the running Executor JVM process. I dumped the list of open file descriptors, which [you can find here](https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d), and you can see that the majority of them are epoll FDs used by Kafka consumers, indicating that the consumers aren't being closed.

 

The number of open FDs does not jump immediately, but it clearly grows over time. This is a snapshot after running the load test for about 5 hours:

 

!image-2018-08-01-13-13-16-339.png!


> Kafka Cached Consumer Leaking Consumers
> ---------------------------------------
>
>                 Key: SPARK-24987
>                 URL: https://issues.apache.org/jira/browse/SPARK-24987
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0, 2.3.1
>         Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
>  Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
> Spark graph:
> {code:java}
> kafkaStream
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>   .flatMap {...}
>   .groupByKey(...)
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
>   .foreach(...)
>   .outputMode(OutputMode.Update)
>   .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
>   .start()
>   .awaitTermination()
> {code}
>            Reporter: Yuval Itzchakov
>            Priority: Critical
>
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) via KafkaDataConsumer.acquire.
> It seems that there are situations (I've been trying to debug this but haven't found the root cause yet) where cached consumers remain "in use" throughout the lifetime of the task. Perhaps the callback registered on the task context is never called (just a theory, no hard evidence):
> {code:java}
> context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
> {code}
> I tracked this leak down using file leak detector, attached to the running Executor JVM process. I dumped the list of open file descriptors, which [you can find here](https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d), and you can see that the majority of them are epoll FDs used by Kafka consumers, indicating that the consumers aren't being closed.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
