You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by "Shixiong(Ryan) Zhu" <sh...@databricks.com> on 2017/02/01 01:28:33 UTC

Re: Resource Leak in Spark Streaming

The KafkaProducerPool instance is created in the driver. Right? What's I
was saying is when a Spark job runs, it will serialize KafkaProducerPool
and create a new instance in the executor side.

You can use the singleton pattern to make sure one JVM process has only one
KafkaProducerPool instance.

On Tue, Jan 31, 2017 at 3:32 PM, Nipun Arora <ni...@gmail.com>
wrote:

> It's a producer pool, the borrow object takes an existing kafka producer
> object if it is free, or creates one if all are being used.
> Shouldn't we re-use kafka producer objects for writing to Kafka.
>
> @ryan- can you suggest a good solution for writing a dstream to kafka
> which can be used in production?
>
> I am attaching the Kafka producer pool class, where would one issue a call
> to close():
>
> public class KafkaProducerPool implements Serializable {
>
>    private static final long serialVersionUID = -1913028296093224674L;
>
>    private transient ConcurrentLinkedQueue<KafkaProducer<String, String>> pool;
>
>    private ScheduledExecutorService executorService;
>
>    private final Properties properties;
>
>    private final int minIdle;
>
>    /**
>     * Creates the pool.
>     *
>     * @param minIdle
>     *            minimum number of objects residing in the pool
>     */
>    public KafkaProducerPool(final int minIdle, final Properties properties) {
>       // initialize pool
>       this.properties = properties;
>       this.minIdle = minIdle;
>       initialize();
>
>    }
>
>    /**
>     * Creates the pool.
>     *
>     * @param minIdle
>     *            minimum number of objects residing in the pool
>     * @param maxIdle
>     *            maximum number of objects residing in the pool
>     * @param validationInterval
>     *            time in seconds for periodical checking of minIdle / maxIdle
>     *            conditions in a separate thread. When the number of objects is
>     *            less than minIdle, missing instances will be created. When the
>     *            number of objects is greater than maxIdle, too many instances
>     *            will be removed.
>     */
>    public KafkaProducerPool(final int minIdle, final int maxIdle,
>          final long validationInterval, final Properties properties) {
>       // initialize pool
>       this.properties = properties;
>       this.minIdle = minIdle;
>       initialize();
>
>       // check pool conditions in a separate thread
>       executorService = Executors.newSingleThreadScheduledExecutor();
>       executorService.scheduleWithFixedDelay(new Runnable() {
>          @Override
>          public void run() {
>             int size = pool.size();
>             if (size < minIdle) {
>                int sizeToBeAdded = minIdle - size;
>                for (int i = 0; i < sizeToBeAdded; i++) {
>                   pool.add(createProducer());
>                }
>             } else if (size > maxIdle) {
>                int sizeToBeRemoved = size - maxIdle;
>                for (int i = 0; i < sizeToBeRemoved; i++) {
>                   pool.poll();
>                }
>             }
>          }
>       }, validationInterval, validationInterval, TimeUnit.SECONDS);
>    }
>
>    /**
>     * Gets the next free object from the pool. If the pool doesn't contain any
>     * objects, a new object will be created and given to the caller of this
>     * method back.
>     *
>     * @return T borrowed object
>     */
>    public synchronized KafkaProducer<String, String> borrowProducer() {
>       if (pool == null)
>          initialize();
>       KafkaProducer<String, String> object;
>       if ((object = pool.poll()) == null) {
>          object = createProducer();
>       }
>
>       return object;
>    }
>
>    /**
>     * Returns object back to the pool.
>     *
>     *            object to be returned
>     */
>    public void returnProducer(KafkaProducer<String, String> producer) {
>       if (producer == null) {
>          return;
>       }
>       this.pool.offer(producer);
>    }
>
>    /**
>     * Shutdown this pool.
>     */
>    public void shutdown() {
>       if (executorService != null) {
>          KafkaProducer<String, String> producer;
>          while ((producer = pool.poll()) != null) {
>             producer.close();
>          }
>          executorService.shutdown();
>       }
>    }
>
>    /**
>     * Creates a new producer.
>     *
>     * @return T new object
>     */
>    private KafkaProducer<String, String> createProducer() {
>       KafkaProducer<String, String> producer = new KafkaProducer<String,String>(properties);
>       return producer;
>    }
>
>    private void initialize() {
>       pool = new ConcurrentLinkedQueue<KafkaProducer<String, String>>();
>
>       for (int i = 0; i < minIdle; i++) {
>          pool.add(createProducer());
>       }
>    }
>
>    public void closeAll() {
>       KafkaProducer<String, String> object;
>       while ((object = pool.poll()) != null) {
>          //object.flush();
>          object.close();
>       }
>    }
> }
>
> Thanks
> Nipun
>
> On Tue, Jan 31, 2017 at 6:09 PM Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
>> Looks like you create KafkaProducerPool in the driver. So when the task
>> is running in the executor, it will always see an new
>> empty KafkaProducerPool and create KafkaProducers. But nobody closes these
>> KafkaProducers.
>>
>> On Tue, Jan 31, 2017 at 3:02 PM, Nipun Arora <ni...@gmail.com>
>> wrote:
>>
>>
>> Sorry for not writing the patch number, it's spark 1.6.1.
>> The relevant code is here inline.
>>
>> Please have a look and let me know if there is a resource leak.
>> Please also let me know if you need any more details.
>>
>> Thanks
>> Nipun
>>
>>
>> The JavaRDDKafkaWriter code is here inline:
>>
>> import org.apache.spark.api.java.JavaRDD;
>> import org.apache.spark.api.java.function.VoidFunction;
>> import scala.Tuple2;
>>
>> import java.io.Serializable;
>> import java.util.Iterator;
>>
>> public class JavaRDDStringKafkaWriter implements Serializable, VoidFunction<JavaRDD<String>> {
>>
>>    private static final long serialVersionUID = -865193912367180261L;
>>    private final KafkaProducerPool pool;
>>    private final String topic;
>>    private final Boolean kafkaAsync;
>>
>>    public JavaRDDStringKafkaWriter(final KafkaProducerPool pool, String topic, Boolean kafkaAsync) {
>>       this.pool = pool;
>>       this.topic = topic;
>>       this.kafkaAsync = kafkaAsync;
>>    }
>>
>>    @Override
>>    public void call(JavaRDD<String> stringJavaRDD) throws Exception {
>>       stringJavaRDD.foreachPartition(new PartitionVoidFunction(
>>             new RDDKafkaWriter(pool,kafkaAsync), topic));
>>    }
>>
>>    private class PartitionVoidFunction implements
>>          VoidFunction<Iterator<String>> {
>>
>>       private static final long serialVersionUID = 8726871215617446598L;
>>       private final RDDKafkaWriter kafkaWriter;
>>       private final String topic;
>>
>>       public PartitionVoidFunction(RDDKafkaWriter kafkaWriter, String topic) {
>>          this.kafkaWriter = kafkaWriter;
>>          this.topic = topic;
>>       }
>>
>>       @Override
>>       public void call(Iterator<String> iterator) throws Exception {
>>          while (iterator.hasNext()) {
>>             kafkaWriter.writeToKafka(topic, iterator.next());
>>          }
>>       }
>>    }
>> }
>>
>>
>> The RDDKafkaWriter is here:
>>
>>
>> import java.io.Serializable;
>> import java.util.concurrent.ExecutionException;
>>
>> import org.apache.kafka.clients.producer.KafkaProducer;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>>
>> import scala.Tuple2;
>>
>> public class RDDKafkaWriter implements Serializable {
>>
>>    private static final long serialVersionUID = 7374381310562055607L;
>>    private final KafkaProducerPool pool;
>>    private final Boolean async;
>>
>>    public RDDKafkaWriter(final KafkaProducerPool pool, Boolean async) {
>>       this.pool = pool;
>>       this.async = async;
>>
>>    }
>>
>>    public void writeToKafka(String topic, Tuple2<String, String> message) {
>>       KafkaProducer<String, String> producer = pool.borrowProducer();
>>       ProducerRecord<String, String> record = new ProducerRecord<String, String>(
>>             topic, message._1(), message._2());
>>       if (async) {
>>          producer.send(record);
>>       } else {
>>          try {
>>             producer.send(record).get();
>>          } catch (Exception e) {
>>             e.printStackTrace();
>>          }
>>       }
>>       pool.returnProducer(producer);
>>    }
>>
>>     public void writeToKafka(String topic, String message) {
>>
>>         KafkaProducer<String, String> producer = pool.borrowProducer();
>>         ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
>>
>>         if (async) {
>>             producer.send(record);
>>         } else {
>>             try {
>>                 producer.send(record).get();
>>             } catch (Exception e) {
>>                 e.printStackTrace();
>>             }
>>         }
>>         pool.returnProducer(producer);
>>     }
>>
>>
>> }
>>
>>
>>
>>
>>
>> On Tue, Jan 31, 2017 at 5:20 PM Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>> Please also include the patch version, such as 1.6.0, 1.6.1. Could you
>> also post the JAVARDDKafkaWriter codes. It's also possible that it leaks
>> resources.
>>
>> On Tue, Jan 31, 2017 at 2:12 PM, Nipun Arora <ni...@gmail.com>
>> wrote:
>>
>> It is spark 1.6
>>
>> Thanks
>> Nipun
>>
>> On Tue, Jan 31, 2017 at 1:45 PM Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>> Could you provide your Spark version please?
>>
>> On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora <ni...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> I get a resource leak, where the number of file descriptors in spark
>> streaming keeps increasing. We end up with a "too many file open" error
>> eventually through an exception caused in:
>>
>> JAVARDDKafkaWriter, which is writing a spark JavaDStream<String>
>>
>> The exception is attached inline. Any help will be greatly appreciated.
>>
>> Thanks
>> Nipun
>>
>> -------------------------------------------
>> Time: 1485762530000 ms
>> -------------------------------------------
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent
>> failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost):
>> java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-
>> 42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
>> (too many open files)
>> at java.io.FileOutputStream.open(Native Method)
>> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>> at org.apache.spark.storage.DiskBlockObjectWriter.open(
>> DiskBlockObjectWriter.scala:88)
>> at org.apache.spark.storage.DiskBlockObjectWriter.write(
>> DiskBlockObjectWriter.scala:181)
>> at org.apache.spark.util.collection.WritablePartitionedPairCollect
>> ion$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
>> at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(
>> ExternalSorter.scala:659)
>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(
>> SortShuffleWriter.scala:72)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
>> scheduler$DAGScheduler$$failJobAndIndependentStages(
>> DAGScheduler.scala:1431)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
>> DAGScheduler.scala:1419)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
>> DAGScheduler.scala:1418)
>> at scala.collection.mutable.ResizableArray$class.foreach(
>> ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(
>> DAGScheduler.scala:1418)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
>> handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
>> handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at scala.Option.foreach(Option.scala:236)
>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
>> DAGScheduler.scala:799)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
>> doOnReceive(DAGScheduler.scala:1640)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
>> onReceive(DAGScheduler.scala:1599)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
>> onReceive(DAGScheduler.scala:1588)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
>> apply(RDD.scala:920)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
>> apply(RDD.scala:918)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(
>> RDDOperationScope.scala:150)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(
>> RDDOperationScope.scala:111)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
>> at org.apache.spark.api.java.JavaRDDLike$class.
>> foreachPartition(JavaRDDLike.scala:225)
>> at org.apache.spark.api.java.AbstractJavaRDDLike.
>> foreachPartition(JavaRDDLike.scala:46)
>> at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(
>> JavaRDDStringKafkaWriter.java:25)
>> at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(
>> JavaRDDStringKafkaWriter.java:10)
>> at org.apache.spark.streaming.api.java.JavaDStreamLike$$
>> anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
>> at org.apache.spark.streaming.api.java.JavaDStreamLike$$
>> anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$
>> foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$
>> foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$
>> anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$
>> anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$
>> anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>> at org.apache.spark.streaming.dstream.DStream.
>> createRDDWithLocalProperties(DStream.scala:426)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$
>> anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(
>> ForEachDStream.scala:49)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(
>> ForEachDStream.scala:49)
>> at scala.util.Try$.apply(Try.scala:161)
>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>> at org.apache.spark.streaming.scheduler.JobScheduler$
>> JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:229)
>> at org.apache.spark.streaming.scheduler.JobScheduler$
>> JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
>> at org.apache.spark.streaming.scheduler.JobScheduler$
>> JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at org.apache.spark.streaming.scheduler.JobScheduler$
>> JobHandler.run(JobScheduler.scala:228)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-
>> 42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
>> (too many open files)
>> at java.io.FileOutputStream.open(Native Method)
>> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>> at org.apache.spark.storage.DiskBlockObjectWriter.open(
>> DiskBlockObjectWriter.scala:88)
>> at org.apache.spark.storage.DiskBlockObjectWriter.write(
>> DiskBlockObjectWriter.scala:181)
>> at org.apache.spark.util.collection.WritablePartitionedPairCollect
>> ion$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
>> at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(
>> ExternalSorter.scala:659)
>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(
>> SortShuffleWriter.scala:72)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> ... 3 more
>>
>>
>>
>>
>>

Re: Resource Leak in Spark Streaming

Posted by Nipun Arora <ni...@gmail.com>.
I located the issue:

Having the following seems to be necessary in the pool object to make it
serialized:

*private transient *ConcurrentLinkedQueue<KafkaProducer<String, String>>
*pool*;

However this means open connections cannot be re-used in subsequent
micro-batches, as transient objects are not persistent. How can we go
around this problem?


Thanks,

Nipun

On Tue, Feb 7, 2017 at 6:35 PM, Nipun Arora <ni...@gmail.com>
wrote:

> Ryan,
>
> Apologies for coming back so late, I created a github repo to resolve
> this problem. On trying your solution for making the pool a Singleton,
> I get a null pointer exception in the worker.
> Do you have any other suggestions, or a simpler mechanism for handling
> this?
>
> I have put all the current code which was forked from an existing git repo
> here:
> https://github.com/nipunarora/Spark-Kafka-Writer
>
> There does seem to be duplicate creation of Kafka Writers in every
> micro-batch.
>
> Thanks
> Nipun
>
> P.S the version I shared before was writing JavaDStream<String>, the
> one in the github project writes JavaDStream<Tuple2<>>
>

Re: Resource Leak in Spark Streaming

Posted by Nipun Arora <ni...@gmail.com>.
Ryan,

Apologies for coming back so late, I created a github repo to resolve
this problem. On trying your solution for making the pool a Singleton,
I get a null pointer exception in the worker.
Do you have any other suggestions, or a simpler mechanism for handling this?

I have put all the current code which was forked from an existing git repo here:
https://github.com/nipunarora/Spark-Kafka-Writer

There does seem to be duplicate creation of Kafka Writers in every micro-batch.

Thanks
Nipun

P.S the version I shared before was writing JavaDStream<String>, the
one in the github project writes JavaDStream<Tuple2<>>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Resource Leak in Spark Streaming

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Let me try to explain it.

1. KafkaProducerPool A is created in the driver.
2. KafkaProducerPool A is **serialized** and sent to the executor.
3. KafkaProducerPool B is **deserialized** from the binary in the executor
(a new instance), which contains no objects right now.
4. KafkaProducerPool B's borrowProducer and returnProducer is called in the
executor. Hence, KafkaProducerPool B contains one KafkaProducer. (This is
the resource leak because this KafkaProducer is never closed)
5. The Spark job finishes.
6. Now we come back to the driver. It creates a new batch and go to step 2.

What you might miss is deserializing PartitionVoidFunction in the executor
will create a new KafkaProducerPool.


On Tue, Jan 31, 2017 at 7:24 PM, Nipun Arora <ni...@gmail.com>
wrote:

> Just to be clear the pool object creation happens in the driver code, and
> not in any anonymous function which should be executed in the executor.
>
> On Tue, Jan 31, 2017 at 10:21 PM Nipun Arora <ni...@gmail.com>
> wrote:
>
>> Thanks for the suggestion Ryan, I will convert it to singleton and see if
>> it solves the problem.
>> If a code/object is created in the driver (in this case a connection
>> object is passed from a pool of objects created in the driver) and is
>> passed to executors or workers, why would a new object be created in each
>> executor?
>> Also would this new object be created every micro-batch?
>>
>> I'm sorry I might not understand what is going on properly so wanted to
>> ask.
>>
>> Thanks
>> Nipun
>>
>>
>>
>> On Tue, Jan 31, 2017 at 8:28 PM Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>> The KafkaProducerPool instance is created in the driver. Right? What's I
>> was saying is when a Spark job runs, it will serialize KafkaProducerPool
>> and create a new instance in the executor side.
>>
>> You can use the singleton pattern to make sure one JVM process has only
>> one KafkaProducerPool instance.
>>
>> On Tue, Jan 31, 2017 at 3:32 PM, Nipun Arora <ni...@gmail.com>
>> wrote:
>>
>> It's a producer pool, the borrow object takes an existing kafka producer
>> object if it is free, or creates one if all are being used.
>> Shouldn't we re-use kafka producer objects for writing to Kafka.
>>
>> @ryan- can you suggest a good solution for writing a dstream to kafka
>> which can be used in production?
>>
>> I am attaching the Kafka producer pool class, where would one issue a
>> call to close():
>>
>> public class KafkaProducerPool implements Serializable {
>>
>>    private static final long serialVersionUID = -1913028296093224674L;
>>
>>    private transient ConcurrentLinkedQueue<KafkaProducer<String, String>> pool;
>>
>>    private ScheduledExecutorService executorService;
>>
>>    private final Properties properties;
>>
>>    private final int minIdle;
>>
>>    /**
>>     * Creates the pool.
>>     *
>>     * @param minIdle
>>     *            minimum number of objects residing in the pool
>>     */
>>    public KafkaProducerPool(final int minIdle, final Properties properties) {
>>       // initialize pool
>>       this.properties = properties;
>>       this.minIdle = minIdle;
>>       initialize();
>>
>>    }
>>
>>    /**
>>     * Creates the pool.
>>     *
>>     * @param minIdle
>>     *            minimum number of objects residing in the pool
>>     * @param maxIdle
>>     *            maximum number of objects residing in the pool
>>     * @param validationInterval
>>     *            time in seconds for periodical checking of minIdle / maxIdle
>>     *            conditions in a separate thread. When the number of objects is
>>     *            less than minIdle, missing instances will be created. When the
>>     *            number of objects is greater than maxIdle, too many instances
>>     *            will be removed.
>>     */
>>    public KafkaProducerPool(final int minIdle, final int maxIdle,
>>          final long validationInterval, final Properties properties) {
>>       // initialize pool
>>       this.properties = properties;
>>       this.minIdle = minIdle;
>>       initialize();
>>
>>       // check pool conditions in a separate thread
>>       executorService = Executors.newSingleThreadScheduledExecutor();
>>       executorService.scheduleWithFixedDelay(new Runnable() {
>>          @Override
>>          public void run() {
>>             int size = pool.size();
>>             if (size < minIdle) {
>>                int sizeToBeAdded = minIdle - size;
>>                for (int i = 0; i < sizeToBeAdded; i++) {
>>                   pool.add(createProducer());
>>                }
>>             } else if (size > maxIdle) {
>>                int sizeToBeRemoved = size - maxIdle;
>>                for (int i = 0; i < sizeToBeRemoved; i++) {
>>                   pool.poll();
>>                }
>>             }
>>          }
>>       }, validationInterval, validationInterval, TimeUnit.SECONDS);
>>    }
>>
>>    /**
>>     * Gets the next free object from the pool. If the pool doesn't contain any
>>     * objects, a new object will be created and given to the caller of this
>>     * method back.
>>     *
>>     * @return T borrowed object
>>     */
>>    public synchronized KafkaProducer<String, String> borrowProducer() {
>>       if (pool == null)
>>          initialize();
>>       KafkaProducer<String, String> object;
>>       if ((object = pool.poll()) == null) {
>>          object = createProducer();
>>       }
>>
>>       return object;
>>    }
>>
>>    /**
>>     * Returns object back to the pool.
>>     *
>>     *            object to be returned
>>     */
>>    public void returnProducer(KafkaProducer<String, String> producer) {
>>       if (producer == null) {
>>          return;
>>       }
>>       this.pool.offer(producer);
>>    }
>>
>>    /**
>>     * Shutdown this pool.
>>     */
>>    public void shutdown() {
>>       if (executorService != null) {
>>          KafkaProducer<String, String> producer;
>>          while ((producer = pool.poll()) != null) {
>>             producer.close();
>>          }
>>          executorService.shutdown();
>>       }
>>    }
>>
>>    /**
>>     * Creates a new producer.
>>     *
>>     * @return T new object
>>     */
>>    private KafkaProducer<String, String> createProducer() {
>>       KafkaProducer<String, String> producer = new KafkaProducer<String,String>(properties);
>>       return producer;
>>    }
>>
>>    private void initialize() {
>>       pool = new ConcurrentLinkedQueue<KafkaProducer<String, String>>();
>>
>>       for (int i = 0; i < minIdle; i++) {
>>          pool.add(createProducer());
>>       }
>>    }
>>
>>    public void closeAll() {
>>       KafkaProducer<String, String> object;
>>       while ((object = pool.poll()) != null) {
>>          //object.flush();
>>          object.close();
>>       }
>>    }
>> }
>>
>> Thanks
>> Nipun
>>
>> On Tue, Jan 31, 2017 at 6:09 PM Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>> Looks like you create KafkaProducerPool in the driver. So when the task
>> is running in the executor, it will always see an new
>> empty KafkaProducerPool and create KafkaProducers. But nobody closes these
>> KafkaProducers.
>>
>> On Tue, Jan 31, 2017 at 3:02 PM, Nipun Arora <ni...@gmail.com>
>> wrote:
>>
>>
>> Sorry for not writing the patch number, it's spark 1.6.1.
>> The relevant code is here inline.
>>
>> Please have a look and let me know if there is a resource leak.
>> Please also let me know if you need any more details.
>>
>> Thanks
>> Nipun
>>
>>
>> The JavaRDDKafkaWriter code is here inline:
>>
>> import org.apache.spark.api.java.JavaRDD;
>> import org.apache.spark.api.java.function.VoidFunction;
>> import scala.Tuple2;
>>
>> import java.io.Serializable;
>> import java.util.Iterator;
>>
>> public class JavaRDDStringKafkaWriter implements Serializable, VoidFunction<JavaRDD<String>> {
>>
>>    private static final long serialVersionUID = -865193912367180261L;
>>    private final KafkaProducerPool pool;
>>    private final String topic;
>>    private final Boolean kafkaAsync;
>>
>>    public JavaRDDStringKafkaWriter(final KafkaProducerPool pool, String topic, Boolean kafkaAsync) {
>>       this.pool = pool;
>>       this.topic = topic;
>>       this.kafkaAsync = kafkaAsync;
>>    }
>>
>>    @Override
>>    public void call(JavaRDD<String> stringJavaRDD) throws Exception {
>>       stringJavaRDD.foreachPartition(new PartitionVoidFunction(
>>             new RDDKafkaWriter(pool,kafkaAsync), topic));
>>    }
>>
>>    private class PartitionVoidFunction implements
>>          VoidFunction<Iterator<String>> {
>>
>>       private static final long serialVersionUID = 8726871215617446598L;
>>       private final RDDKafkaWriter kafkaWriter;
>>       private final String topic;
>>
>>       public PartitionVoidFunction(RDDKafkaWriter kafkaWriter, String topic) {
>>          this.kafkaWriter = kafkaWriter;
>>          this.topic = topic;
>>       }
>>
>>       @Override
>>       public void call(Iterator<String> iterator) throws Exception {
>>          while (iterator.hasNext()) {
>>             kafkaWriter.writeToKafka(topic, iterator.next());
>>          }
>>       }
>>    }
>> }
>>
>>
>> The RDDKafkaWriter is here:
>>
>>
>> import java.io.Serializable;
>> import java.util.concurrent.ExecutionException;
>>
>> import org.apache.kafka.clients.producer.KafkaProducer;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>>
>> import scala.Tuple2;
>>
>> public class RDDKafkaWriter implements Serializable {
>>
>>    private static final long serialVersionUID = 7374381310562055607L;
>>    private final KafkaProducerPool pool;
>>    private final Boolean async;
>>
>>    public RDDKafkaWriter(final KafkaProducerPool pool, Boolean async) {
>>       this.pool = pool;
>>       this.async = async;
>>
>>    }
>>
>>    public void writeToKafka(String topic, Tuple2<String, String> message) {
>>       KafkaProducer<String, String> producer = pool.borrowProducer();
>>       ProducerRecord<String, String> record = new ProducerRecord<String, String>(
>>             topic, message._1(), message._2());
>>       if (async) {
>>          producer.send(record);
>>       } else {
>>          try {
>>             producer.send(record).get();
>>          } catch (Exception e) {
>>             e.printStackTrace();
>>          }
>>       }
>>       pool.returnProducer(producer);
>>    }
>>
>>     public void writeToKafka(String topic, String message) {
>>
>>         KafkaProducer<String, String> producer = pool.borrowProducer();
>>         ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
>>
>>         if (async) {
>>             producer.send(record);
>>         } else {
>>             try {
>>                 producer.send(record).get();
>>             } catch (Exception e) {
>>                 e.printStackTrace();
>>             }
>>         }
>>         pool.returnProducer(producer);
>>     }
>>
>>
>> }
>>
>>
>>
>>
>>
>> On Tue, Jan 31, 2017 at 5:20 PM Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>> Please also include the patch version, such as 1.6.0, 1.6.1. Could you
>> also post the JAVARDDKafkaWriter codes. It's also possible that it leaks
>> resources.
>>
>> On Tue, Jan 31, 2017 at 2:12 PM, Nipun Arora <ni...@gmail.com>
>> wrote:
>>
>> It is spark 1.6
>>
>> Thanks
>> Nipun
>>
>> On Tue, Jan 31, 2017 at 1:45 PM Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>> Could you provide your Spark version please?
>>
>> On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora <ni...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> I get a resource leak, where the number of file descriptors in spark
>> streaming keeps increasing. We end up with a "too many file open" error
>> eventually through an exception caused in:
>>
>> JAVARDDKafkaWriter, which is writing a spark JavaDStream<String>
>>
>> The exception is attached inline. Any help will be greatly appreciated.
>>
>> Thanks
>> Nipun
>>
>> -------------------------------------------
>> Time: 1485762530000 ms
>> -------------------------------------------
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent
>> failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost):
>> java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-
>> 42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
>> (too many open files)
>> at java.io.FileOutputStream.open(Native Method)
>> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>> at org.apache.spark.storage.DiskBlockObjectWriter.open(
>> DiskBlockObjectWriter.scala:88)
>> at org.apache.spark.storage.DiskBlockObjectWriter.write(
>> DiskBlockObjectWriter.scala:181)
>> at org.apache.spark.util.collection.WritablePartitionedPairCollect
>> ion$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
>> at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(
>> ExternalSorter.scala:659)
>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(
>> SortShuffleWriter.scala:72)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
>> scheduler$DAGScheduler$$failJobAndIndependentStages(
>> DAGScheduler.scala:1431)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
>> DAGScheduler.scala:1419)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
>> DAGScheduler.scala:1418)
>> at scala.collection.mutable.ResizableArray$class.foreach(
>> ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(
>> DAGScheduler.scala:1418)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
>> handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
>> handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at scala.Option.foreach(Option.scala:236)
>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
>> DAGScheduler.scala:799)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
>> doOnReceive(DAGScheduler.scala:1640)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
>> onReceive(DAGScheduler.scala:1599)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
>> onReceive(DAGScheduler.scala:1588)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
>> apply(RDD.scala:920)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
>> apply(RDD.scala:918)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(
>> RDDOperationScope.scala:150)
>>
>>

Re: Resource Leak in Spark Streaming

Posted by Nipun Arora <ni...@gmail.com>.
Just to be clear the pool object creation happens in the driver code, and
not in any anonymous function which should be executed in the executor.

On Tue, Jan 31, 2017 at 10:21 PM Nipun Arora <ni...@gmail.com>
wrote:

> Thanks for the suggestion Ryan, I will convert it to singleton and see if
> it solves the problem.
> If a code/object is created in the driver (in this case a connection
> object is passed from a pool of objects created in the driver) and is
> passed to executors or workers, why would a new object be created in each
> executor?
> Also would this new object be created every micro-batch?
>
> I'm sorry I might not understand what is going on properly so wanted to
> ask.
>
> Thanks
> Nipun
>
>
>
> On Tue, Jan 31, 2017 at 8:28 PM Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
> The KafkaProducerPool instance is created in the driver. Right? What's I
> was saying is when a Spark job runs, it will serialize KafkaProducerPool
> and create a new instance in the executor side.
>
> You can use the singleton pattern to make sure one JVM process has only
> one KafkaProducerPool instance.
>
> On Tue, Jan 31, 2017 at 3:32 PM, Nipun Arora <ni...@gmail.com>
> wrote:
>
> It's a producer pool, the borrow object takes an existing kafka producer
> object if it is free, or creates one if all are being used.
> Shouldn't we re-use kafka producer objects for writing to Kafka.
>
> @ryan- can you suggest a good solution for writing a dstream to kafka
> which can be used in production?
>
> I am attaching the Kafka producer pool class, where would one issue a call
> to close():
>
> public class KafkaProducerPool implements Serializable {
>
>    private static final long serialVersionUID = -1913028296093224674L;
>
>    private transient ConcurrentLinkedQueue<KafkaProducer<String, String>> pool;
>
>    private ScheduledExecutorService executorService;
>
>    private final Properties properties;
>
>    private final int minIdle;
>
>    /**
>     * Creates the pool.
>     *
>     * @param minIdle
>     *            minimum number of objects residing in the pool
>     */
>    public KafkaProducerPool(final int minIdle, final Properties properties) {
>       // initialize pool
>       this.properties = properties;
>       this.minIdle = minIdle;
>       initialize();
>
>    }
>
>    /**
>     * Creates the pool.
>     *
>     * @param minIdle
>     *            minimum number of objects residing in the pool
>     * @param maxIdle
>     *            maximum number of objects residing in the pool
>     * @param validationInterval
>     *            time in seconds for periodical checking of minIdle / maxIdle
>     *            conditions in a separate thread. When the number of objects is
>     *            less than minIdle, missing instances will be created. When the
>     *            number of objects is greater than maxIdle, too many instances
>     *            will be removed.
>     */
>    public KafkaProducerPool(final int minIdle, final int maxIdle,
>          final long validationInterval, final Properties properties) {
>       // initialize pool
>       this.properties = properties;
>       this.minIdle = minIdle;
>       initialize();
>
>       // check pool conditions in a separate thread
>       executorService = Executors.newSingleThreadScheduledExecutor();
>       executorService.scheduleWithFixedDelay(new Runnable() {
>          @Override
>          public void run() {
>             int size = pool.size();
>             if (size < minIdle) {
>                int sizeToBeAdded = minIdle - size;
>                for (int i = 0; i < sizeToBeAdded; i++) {
>                   pool.add(createProducer());
>                }
>             } else if (size > maxIdle) {
>                int sizeToBeRemoved = size - maxIdle;
>                for (int i = 0; i < sizeToBeRemoved; i++) {
>                   pool.poll();
>                }
>             }
>          }
>       }, validationInterval, validationInterval, TimeUnit.SECONDS);
>    }
>
>    /**
>     * Gets the next free object from the pool. If the pool doesn't contain any
>     * objects, a new object will be created and given to the caller of this
>     * method back.
>     *
>     * @return T borrowed object
>     */
>    public synchronized KafkaProducer<String, String> borrowProducer() {
>       if (pool == null)
>          initialize();
>       KafkaProducer<String, String> object;
>       if ((object = pool.poll()) == null) {
>          object = createProducer();
>       }
>
>       return object;
>    }
>
>    /**
>     * Returns object back to the pool.
>     *
>     *            object to be returned
>     */
>    public void returnProducer(KafkaProducer<String, String> producer) {
>       if (producer == null) {
>          return;
>       }
>       this.pool.offer(producer);
>    }
>
>    /**
>     * Shutdown this pool.
>     */
>    public void shutdown() {
>       if (executorService != null) {
>          KafkaProducer<String, String> producer;
>          while ((producer = pool.poll()) != null) {
>             producer.close();
>          }
>          executorService.shutdown();
>       }
>    }
>
>    /**
>     * Creates a new producer.
>     *
>     * @return T new object
>     */
>    private KafkaProducer<String, String> createProducer() {
>       KafkaProducer<String, String> producer = new KafkaProducer<String,String>(properties);
>       return producer;
>    }
>
>    private void initialize() {
>       pool = new ConcurrentLinkedQueue<KafkaProducer<String, String>>();
>
>       for (int i = 0; i < minIdle; i++) {
>          pool.add(createProducer());
>       }
>    }
>
>    public void closeAll() {
>       KafkaProducer<String, String> object;
>       while ((object = pool.poll()) != null) {
>          //object.flush();
>          object.close();
>       }
>    }
> }
>
> Thanks
> Nipun
>
> On Tue, Jan 31, 2017 at 6:09 PM Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
> Looks like you create KafkaProducerPool in the driver. So when the task is
> running in the executor, it will always see an new empty KafkaProducerPool
> and create KafkaProducers. But nobody closes these KafkaProducers.
>
> On Tue, Jan 31, 2017 at 3:02 PM, Nipun Arora <ni...@gmail.com>
> wrote:
>
>
> Sorry for not writing the patch number, it's spark 1.6.1.
> The relevant code is here inline.
>
> Please have a look and let me know if there is a resource leak.
> Please also let me know if you need any more details.
>
> Thanks
> Nipun
>
>
> The JavaRDDKafkaWriter code is here inline:
>
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.VoidFunction;
> import scala.Tuple2;
>
> import java.io.Serializable;
> import java.util.Iterator;
>
> public class JavaRDDStringKafkaWriter implements Serializable, VoidFunction<JavaRDD<String>> {
>
>    private static final long serialVersionUID = -865193912367180261L;
>    private final KafkaProducerPool pool;
>    private final String topic;
>    private final Boolean kafkaAsync;
>
>    public JavaRDDStringKafkaWriter(final KafkaProducerPool pool, String topic, Boolean kafkaAsync) {
>       this.pool = pool;
>       this.topic = topic;
>       this.kafkaAsync = kafkaAsync;
>    }
>
>    @Override
>    public void call(JavaRDD<String> stringJavaRDD) throws Exception {
>       stringJavaRDD.foreachPartition(new PartitionVoidFunction(
>             new RDDKafkaWriter(pool,kafkaAsync), topic));
>    }
>
>    private class PartitionVoidFunction implements
>          VoidFunction<Iterator<String>> {
>
>       private static final long serialVersionUID = 8726871215617446598L;
>       private final RDDKafkaWriter kafkaWriter;
>       private final String topic;
>
>       public PartitionVoidFunction(RDDKafkaWriter kafkaWriter, String topic) {
>          this.kafkaWriter = kafkaWriter;
>          this.topic = topic;
>       }
>
>       @Override
>       public void call(Iterator<String> iterator) throws Exception {
>          while (iterator.hasNext()) {
>             kafkaWriter.writeToKafka(topic, iterator.next());
>          }
>       }
>    }
> }
>
>
> The RDDKafkaWriter is here:
>
>
> import java.io.Serializable;
> import java.util.concurrent.ExecutionException;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> import scala.Tuple2;
>
> public class RDDKafkaWriter implements Serializable {
>
>    private static final long serialVersionUID = 7374381310562055607L;
>    private final KafkaProducerPool pool;
>    private final Boolean async;
>
>    public RDDKafkaWriter(final KafkaProducerPool pool, Boolean async) {
>       this.pool = pool;
>       this.async = async;
>
>    }
>
>    public void writeToKafka(String topic, Tuple2<String, String> message) {
>       KafkaProducer<String, String> producer = pool.borrowProducer();
>       ProducerRecord<String, String> record = new ProducerRecord<String, String>(
>             topic, message._1(), message._2());
>       if (async) {
>          producer.send(record);
>       } else {
>          try {
>             producer.send(record).get();
>          } catch (Exception e) {
>             e.printStackTrace();
>          }
>       }
>       pool.returnProducer(producer);
>    }
>
>     public void writeToKafka(String topic, String message) {
>
>         KafkaProducer<String, String> producer = pool.borrowProducer();
>         ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
>
>         if (async) {
>             producer.send(record);
>         } else {
>             try {
>                 producer.send(record).get();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         }
>         pool.returnProducer(producer);
>     }
>
>
> }
>
>
>
>
>
> On Tue, Jan 31, 2017 at 5:20 PM Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
> Please also include the patch version, such as 1.6.0, 1.6.1. Could you
> also post the JAVARDDKafkaWriter codes. It's also possible that it leaks
> resources.
>
> On Tue, Jan 31, 2017 at 2:12 PM, Nipun Arora <ni...@gmail.com>
> wrote:
>
> It is spark 1.6
>
> Thanks
> Nipun
>
> On Tue, Jan 31, 2017 at 1:45 PM Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
> Could you provide your Spark version please?
>
> On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora <ni...@gmail.com>
> wrote:
>
> Hi,
>
> I get a resource leak, where the number of file descriptors in spark
> streaming keeps increasing. We end up with a "too many file open" error
> eventually through an exception caused in:
>
> JAVARDDKafkaWriter, which is writing a spark JavaDStream<String>
>
> The exception is attached inline. Any help will be greatly appreciated.
>
> Thanks
> Nipun
>
> -------------------------------------------
> Time: 1485762530000 ms
> -------------------------------------------
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent
> failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost):
> java.io.FileNotFoundException:
> /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
> (too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:181)
> at
> org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
> at
> org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>
>

Re: Resource Leak in Spark Streaming

Posted by Nipun Arora <ni...@gmail.com>.
Thanks for the suggestion Ryan, I will convert it to singleton and see if
it solves the problem.
If a code/object is created in the driver (in this case a connection object
is passed from a pool of objects created in the driver) and is passed to
executors or workers, why would a new object be created in each executor?
Also would this new object be created every micro-batch?

I'm sorry I might not understand what is going on properly so wanted to ask.

Thanks
Nipun



On Tue, Jan 31, 2017 at 8:28 PM Shixiong(Ryan) Zhu <sh...@databricks.com>
wrote:

> The KafkaProducerPool instance is created in the driver. Right? What's I
> was saying is when a Spark job runs, it will serialize KafkaProducerPool
> and create a new instance in the executor side.
>
> You can use the singleton pattern to make sure one JVM process has only
> one KafkaProducerPool instance.
>
> On Tue, Jan 31, 2017 at 3:32 PM, Nipun Arora <ni...@gmail.com>
> wrote:
>
> It's a producer pool, the borrow object takes an existing kafka producer
> object if it is free, or creates one if all are being used.
> Shouldn't we re-use kafka producer objects for writing to Kafka.
>
> @ryan- can you suggest a good solution for writing a dstream to kafka
> which can be used in production?
>
> I am attaching the Kafka producer pool class, where would one issue a call
> to close():
>
> public class KafkaProducerPool implements Serializable {
>
>    private static final long serialVersionUID = -1913028296093224674L;
>
>    private transient ConcurrentLinkedQueue<KafkaProducer<String, String>> pool;
>
>    private ScheduledExecutorService executorService;
>
>    private final Properties properties;
>
>    private final int minIdle;
>
>    /**
>     * Creates the pool.
>     *
>     * @param minIdle
>     *            minimum number of objects residing in the pool
>     */
>    public KafkaProducerPool(final int minIdle, final Properties properties) {
>       // initialize pool
>       this.properties = properties;
>       this.minIdle = minIdle;
>       initialize();
>
>    }
>
>    /**
>     * Creates the pool.
>     *
>     * @param minIdle
>     *            minimum number of objects residing in the pool
>     * @param maxIdle
>     *            maximum number of objects residing in the pool
>     * @param validationInterval
>     *            time in seconds for periodical checking of minIdle / maxIdle
>     *            conditions in a separate thread. When the number of objects is
>     *            less than minIdle, missing instances will be created. When the
>     *            number of objects is greater than maxIdle, too many instances
>     *            will be removed.
>     */
>    public KafkaProducerPool(final int minIdle, final int maxIdle,
>          final long validationInterval, final Properties properties) {
>       // initialize pool
>       this.properties = properties;
>       this.minIdle = minIdle;
>       initialize();
>
>       // check pool conditions in a separate thread
>       executorService = Executors.newSingleThreadScheduledExecutor();
>       executorService.scheduleWithFixedDelay(new Runnable() {
>          @Override
>          public void run() {
>             int size = pool.size();
>             if (size < minIdle) {
>                int sizeToBeAdded = minIdle - size;
>                for (int i = 0; i < sizeToBeAdded; i++) {
>                   pool.add(createProducer());
>                }
>             } else if (size > maxIdle) {
>                int sizeToBeRemoved = size - maxIdle;
>                for (int i = 0; i < sizeToBeRemoved; i++) {
>                   pool.poll();
>                }
>             }
>          }
>       }, validationInterval, validationInterval, TimeUnit.SECONDS);
>    }
>
>    /**
>     * Gets the next free object from the pool. If the pool doesn't contain any
>     * objects, a new object will be created and given to the caller of this
>     * method back.
>     *
>     * @return T borrowed object
>     */
>    public synchronized KafkaProducer<String, String> borrowProducer() {
>       if (pool == null)
>          initialize();
>       KafkaProducer<String, String> object;
>       if ((object = pool.poll()) == null) {
>          object = createProducer();
>       }
>
>       return object;
>    }
>
>    /**
>     * Returns object back to the pool.
>     *
>     *            object to be returned
>     */
>    public void returnProducer(KafkaProducer<String, String> producer) {
>       if (producer == null) {
>          return;
>       }
>       this.pool.offer(producer);
>    }
>
>    /**
>     * Shutdown this pool.
>     */
>    public void shutdown() {
>       if (executorService != null) {
>          KafkaProducer<String, String> producer;
>          while ((producer = pool.poll()) != null) {
>             producer.close();
>          }
>          executorService.shutdown();
>       }
>    }
>
>    /**
>     * Creates a new producer.
>     *
>     * @return T new object
>     */
>    private KafkaProducer<String, String> createProducer() {
>       KafkaProducer<String, String> producer = new KafkaProducer<String,String>(properties);
>       return producer;
>    }
>
>    private void initialize() {
>       pool = new ConcurrentLinkedQueue<KafkaProducer<String, String>>();
>
>       for (int i = 0; i < minIdle; i++) {
>          pool.add(createProducer());
>       }
>    }
>
>    public void closeAll() {
>       KafkaProducer<String, String> object;
>       while ((object = pool.poll()) != null) {
>          //object.flush();
>          object.close();
>       }
>    }
> }
>
> Thanks
> Nipun
>
> On Tue, Jan 31, 2017 at 6:09 PM Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
> Looks like you create KafkaProducerPool in the driver. So when the task is
> running in the executor, it will always see an new empty KafkaProducerPool
> and create KafkaProducers. But nobody closes these KafkaProducers.
>
> On Tue, Jan 31, 2017 at 3:02 PM, Nipun Arora <ni...@gmail.com>
> wrote:
>
>
> Sorry for not writing the patch number, it's spark 1.6.1.
> The relevant code is here inline.
>
> Please have a look and let me know if there is a resource leak.
> Please also let me know if you need any more details.
>
> Thanks
> Nipun
>
>
> The JavaRDDKafkaWriter code is here inline:
>
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.VoidFunction;
> import scala.Tuple2;
>
> import java.io.Serializable;
> import java.util.Iterator;
>
> public class JavaRDDStringKafkaWriter implements Serializable, VoidFunction<JavaRDD<String>> {
>
>    private static final long serialVersionUID = -865193912367180261L;
>    private final KafkaProducerPool pool;
>    private final String topic;
>    private final Boolean kafkaAsync;
>
>    public JavaRDDStringKafkaWriter(final KafkaProducerPool pool, String topic, Boolean kafkaAsync) {
>       this.pool = pool;
>       this.topic = topic;
>       this.kafkaAsync = kafkaAsync;
>    }
>
>    @Override
>    public void call(JavaRDD<String> stringJavaRDD) throws Exception {
>       stringJavaRDD.foreachPartition(new PartitionVoidFunction(
>             new RDDKafkaWriter(pool,kafkaAsync), topic));
>    }
>
>    private class PartitionVoidFunction implements
>          VoidFunction<Iterator<String>> {
>
>       private static final long serialVersionUID = 8726871215617446598L;
>       private final RDDKafkaWriter kafkaWriter;
>       private final String topic;
>
>       public PartitionVoidFunction(RDDKafkaWriter kafkaWriter, String topic) {
>          this.kafkaWriter = kafkaWriter;
>          this.topic = topic;
>       }
>
>       @Override
>       public void call(Iterator<String> iterator) throws Exception {
>          while (iterator.hasNext()) {
>             kafkaWriter.writeToKafka(topic, iterator.next());
>          }
>       }
>    }
> }
>
>
> The RDDKafkaWriter is here:
>
>
> import java.io.Serializable;
> import java.util.concurrent.ExecutionException;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> import scala.Tuple2;
>
> public class RDDKafkaWriter implements Serializable {
>
>    private static final long serialVersionUID = 7374381310562055607L;
>    private final KafkaProducerPool pool;
>    private final Boolean async;
>
>    public RDDKafkaWriter(final KafkaProducerPool pool, Boolean async) {
>       this.pool = pool;
>       this.async = async;
>
>    }
>
>    public void writeToKafka(String topic, Tuple2<String, String> message) {
>       KafkaProducer<String, String> producer = pool.borrowProducer();
>       ProducerRecord<String, String> record = new ProducerRecord<String, String>(
>             topic, message._1(), message._2());
>       if (async) {
>          producer.send(record);
>       } else {
>          try {
>             producer.send(record).get();
>          } catch (Exception e) {
>             e.printStackTrace();
>          }
>       }
>       pool.returnProducer(producer);
>    }
>
>     public void writeToKafka(String topic, String message) {
>
>         KafkaProducer<String, String> producer = pool.borrowProducer();
>         ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
>
>         if (async) {
>             producer.send(record);
>         } else {
>             try {
>                 producer.send(record).get();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         }
>         pool.returnProducer(producer);
>     }
>
>
> }
>
>
>
>
>
> On Tue, Jan 31, 2017 at 5:20 PM Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
> Please also include the patch version, such as 1.6.0, 1.6.1. Could you
> also post the JAVARDDKafkaWriter codes. It's also possible that it leaks
> resources.
>
> On Tue, Jan 31, 2017 at 2:12 PM, Nipun Arora <ni...@gmail.com>
> wrote:
>
> It is spark 1.6
>
> Thanks
> Nipun
>
> On Tue, Jan 31, 2017 at 1:45 PM Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
> Could you provide your Spark version please?
>
> On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora <ni...@gmail.com>
> wrote:
>
> Hi,
>
> I get a resource leak, where the number of file descriptors in spark
> streaming keeps increasing. We end up with a "too many file open" error
> eventually through an exception caused in:
>
> JAVARDDKafkaWriter, which is writing a spark JavaDStream<String>
>
> The exception is attached inline. Any help will be greatly appreciated.
>
> Thanks
> Nipun
>
> -------------------------------------------
> Time: 1485762530000 ms
> -------------------------------------------
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent
> failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost):
> java.io.FileNotFoundException:
> /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
> (too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:181)
> at
> org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
> at
> org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
> at
> org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:225)
> at
> org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:46)
> at
> org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:25)
> at
> org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:10)
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:229)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:228)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException:
> /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
> (too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
>
>