You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2014/04/17 20:13:16 UTC

[jira] [Updated] (SAMZA-101) Samza task leaking file descriptors on Kafka exceptions

     [ https://issues.apache.org/jira/browse/SAMZA-101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini updated SAMZA-101:
----------------------------------

    Assignee:     (was: Chris Riccomini)

> Samza task leaking file descriptors on Kafka exceptions
> -------------------------------------------------------
>
>                 Key: SAMZA-101
>                 URL: https://issues.apache.org/jira/browse/SAMZA-101
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Alan Li
>
> Initially, my samza task began seeing many UnresolvedAddressExceptions, likely because the kafka cluster went down and the samza task is retrying:
> {noformat}
> 2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [INFO] Creating new SimpleConsumer for host kafka-host-12345:10251 for system kafka
> 2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Recreating simple consumer and retrying connection
> 2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Stack trace for fetchMessages exception.
> java.nio.channels.UnresolvedAddressException
> at sun.nio.ch.Net.checkAddress(Net.java:30)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:480)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:143)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:50)
> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:43)
> at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:134)
> at org.apache.samza.system.kafka.BrokerProxy$$anon$2.run(BrokerProxy.scala:110)
> at java.lang.Thread.run(Thread.java:662)
> {noformat}
> Eventually, I began seeing these, which the samza task will never recover from:
> {noformat}
> 2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Recreating simple consumer and retrying connection
> 2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [INFO] Creating new SimpleConsumer for host kafka-host-12345:10251 for system kafka
> 2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Stack trace for fetchMessages exception.
> java.net.SocketException: Too many open files
> at sun.nio.ch.Net.socket0(Native Method)
> at sun.nio.ch.Net.socket(Net.java:97)
> at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:84)
> at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:37)
> at java.nio.channels.SocketChannel.open(SocketChannel.java:105)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:143)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:50)
> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:43)
> at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:134)
> at org.apache.samza.system.kafka.BrokerProxy$$anon$2.run(BrokerProxy.scala:110)
> at java.lang.Thread.run(Thread.java:662)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.2#6252)