You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Tony Wei <to...@gmail.com> on 2018/05/14 03:36:56 UTC

Question about the behavior of TM when it lost the zookeeper client session in HA mode

Hi all,

Recently, my flink job met a problem that caused the job failed and
restarted.

The log is list this screen snapshot



or this

```
2018-05-11 13:21:04,582 WARN
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client
session timed out, have not heard from server in 61054ms for sessionid
0x3054b165fe2006a
2018-05-11 13:21:04,583 INFO
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client
session timed out, have not heard from server in 61054ms for sessionid
0x3054b165fe2006a, closing socket connection and attempting reconnect
2018-05-11 13:21:04,683 INFO
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
- State change: SUSPENDED
2018-05-11 13:21:04,686 WARN
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
Connection to ZooKeeper suspended. Can no longer retrieve the leader from
ZooKeeper.
2018-05-11 13:21:04,689 INFO
org.apache.kafka.clients.producer.KafkaProducer               - Closing the
Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-05-11 13:21:04,694 INFO
org.apache.kafka.clients.producer.KafkaProducer               - Closing the
Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-05-11 13:21:04,698 INFO  org.apache.flink.runtime.taskmanager.Task
                 - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd
-> Sink: kafka-sink-cd) (4/32) (65a4044ac963e083f2635fe24e7f2403) switched
from RUNNING to FAILED.
java.lang.Exception: Failed to send data to Kafka: The server disconnected
before a response was received.
```

Logs showed *`org.apache.kafka.clients.producer.KafkaProducer - Closing the
Kafka producer with timeoutMillis = 9223372036854775807 ms.`* This timeout
value is *Long.MAX_VALUE*. It happened when someone called
*`producer.close()`*.

And I also saw the log said
*`org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn
- Client session timed out, have not heard from server in 61054ms for
sessionid 0x3054b165fe2006a, closing socket connection and attempting
reconnect`*
and *`org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
- Connection to ZooKeeper suspended. Can no longer retrieve the leader from
ZooKeeper.`*

I have checked zookeeper and kafka and there was no error during that
period.
I was wondering if TM will stop the tasks when it lost zookeeper client in
HA mode. Since I didn't see any document or mailing thread discuss this,
I'm not sure if this is the reason that made kafka producer closed.
Could someone who know HA well? Or someone know what happened in my job?

My flink cluster version is 1.4.0 with 2 masters and 10 slaves. My
zookeeper cluster version is 3.4.11 with 3 nodes.
The *`high-availability.zookeeper.client.session-timeout`* is default
value: 60000 ms.
The *`maxSessionTimeout`* in zoo.cfg is 40000ms.
I have already change the *maxSessionTimeout* to 120000ms this morning.

This problem happened many many times during the last weekend and made my
kafka log delay grew up. Please help me. Thank you very much!

Best Regards,
Tony Wei

Re: Question about the behavior of TM when it lost the zookeeper client session in HA mode

Posted by Ron Crocker <rc...@newrelic.com>.
I just stumbled on this same problem without any associated ZK issues. We had a Kafka broker fail that caused this issue:

2018-07-18 02:48:13,497 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Produce: <output_topic_name> (2/4) (7e7d61b286d90c51bbd20a15796633f2) switched from RUNNING to FAILED.
java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

This is the kind of error we should be robust to - the Kafka cluster will (reasonably quickly) recover and give a new broker for a particular partition (in this case, partition #2). Maybe retries should be the default configuration? I believe the client uses the Kafka defaults (acks=0, retries=0), but we typically run with acks=1 (or all) and retries=MAX_INT. Do I need to do anything more than that to get a more robust producer?

Ron

> On May 16, 2018, at 7:45 PM, Tony Wei <to...@gmail.com> wrote:
> 
> Hi Ufuk, Piotr
> 
> Thanks for all of your replies. I knew that jobs are cancelled if the JM looses the connection to ZK, but JM didn't loose connection in my case.
> My job failed because of the exception from KafkaProducer. However, it happened before and after that exception that TM lost ZK connection.
> So, as Piotr said, it looks like an error in Kafka producer and I will pay more attention on it to see if there is something unexpected happens again.
> 
> Best Regards,
> Tony Wei
> 
> 2018-05-15 19:56 GMT+08:00 Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>>:
> Hi,
> 
> It looks like there was an error in asynchronous job of sending the records to Kafka. Probably this is a collateral damage of loosing connection to zookeeper. 
> 
> Piotrek
> 
>> On 15 May 2018, at 13:33, Ufuk Celebi <uce@apache.org <ma...@apache.org>> wrote:
>> 
>> Hey Tony,
>> 
>> thanks for the detailed report.
>> 
>> - In Flink 1.4, jobs are cancelled if the JM looses the connection to ZK and recovered when the connection is re-established (and one JM becomes leader again).
>> 
>> - Regarding the KafkaProducer: I'm not sure from the log message whether Flink closes the KafkaProducer because the job is cancelled or because there is a connectivity issue to the Kafka cluster. Including Piotr (cc) in this thread who has worked on the KafkaProducer in the past. If it is a connectivity issue, it might also explain why you lost the connection to ZK.
>> 
>> Glad to hear that everything is back to normal. Keep us updated if something unexpected happens again.
>> 
>> – Ufuk
>> 
>> 
>> On Tue, May 15, 2018 at 6:28 AM, Tony Wei <tony19920430@gmail.com <ma...@gmail.com>> wrote:
>> Hi all,
>> 
>> I restarted the cluster and changed the log level to DEBUG, and raised the parallelism of my streaming job from 32 to 40.
>> However, the problem just disappeared and I don't know why.
>> I will remain these settings for a while. If the error happen again, I will bring more informations back for help. Thank you.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-05-14 14:24 GMT+08:00 Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>> Hi all,
>> 
>> After I changed the `high-availability.zookeeper.client.session-timeout` and `maxSessionTimeout` to 120000ms, the exception still occurred.
>> 
>> Here is the log snippet. It seems this is nothing to do with zookeeper client timeout, but I still don't know why kafka producer would be closed without any task state changed.
>> 
>> ```
>> 2018-05-14 05:18:53,468 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 82828ms for sessionid 0x305f957eb8d000a
>> 2018-05-14 05:18:53,468 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 82828ms for sessionid 0x305f957eb8d000a, closing socket connection and attempting reconnect
>> 2018-05-14 05:18:53,571 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED
>> 2018-05-14 05:18:53,574 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
>> 2018-05-14 05:18:53,850 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/mnt/jaas-466390940757021791.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
>> 2018-05-14 05:18:53,850 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server XXX.XXX.XXX.XXX:2181
>> 2018-05-14 05:18:53,852 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - Authentication failed
>> 2018-05-14 05:18:53,853 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to XXX.XXX.XXX.XXX:2181, initiating session
>> 2018-05-14 05:18:53,859 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server XXX.XXX.XXX.XXX:2181, sessionid = 0x305f957eb8d000a, negotiated timeout = 120000
>> 2018-05-14 05:18:53,860 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED
>> 2018-05-14 05:18:53,860 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
>> 2018-05-14 05:28:54,781 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-14 05:28:54,829 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-14 05:28:54,918 INFO  org.apache.flink.runtime.taskmanager.Task                     - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> Sink: kafka-sink-cd) (1/32) (e3462ff8bb565bb0cf4de49ffc2595fb) switched from RUNNING to FAILED.
>> java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
>> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
>> 	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
>> 	at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:62)
>> 	at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:34)
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>> 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> 	at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:39)
>> 	at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:38)
>> 	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>> 	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>> 	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>> 	at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
>> 	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>> 	at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>> 	at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:38)
>> 	at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:14)
>> 	at org.apache.flink.streaming.api.operators.co <http://api.operators.co/>.CoStreamFlatMap.processElement1(CoStreamFlatMap.java:53)
>> 	at org.apache.flink.streaming.runtime.io <http://runtime.io/>.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:243)
>> 	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> 	at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
>> ```
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-05-14 11:36 GMT+08:00 Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>> Hi all,
>> 
>> Recently, my flink job met a problem that caused the job failed and restarted.
>> 
>> The log is list this screen snapshot
>> 
>> <exception.png>
>> 
>> or this
>> 
>> ```
>> 2018-05-11 13:21:04,582 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a
>> 2018-05-11 13:21:04,583 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a, closing socket connection and attempting reconnect
>> 2018-05-11 13:21:04,683 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED
>> 2018-05-11 13:21:04,686 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
>> 2018-05-11 13:21:04,689 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-11 13:21:04,694 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-11 13:21:04,698 INFO  org.apache.flink.runtime.taskmanager.Task                     - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> Sink: kafka-sink-cd) (4/32) (65a4044ac963e083f2635fe24e7f2403) switched from RUNNING to FAILED.
>> java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
>> ```
>> 
>> Logs showed `org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.` This timeout value is Long.MAX_VALUE. It happened when someone called `producer.close()`.
>> 
>> And I also saw the log said `org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a, closing socket connection and attempting reconnect`
>> and `org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.`
>> 
>> I have checked zookeeper and kafka and there was no error during that period.
>> I was wondering if TM will stop the tasks when it lost zookeeper client in HA mode. Since I didn't see any document or mailing thread discuss this, I'm not sure if this is the reason that made kafka producer closed.
>> Could someone who know HA well? Or someone know what happened in my job?
>> 
>> My flink cluster version is 1.4.0 with 2 masters and 10 slaves. My zookeeper cluster version is 3.4.11 with 3 nodes.
>> The `high-availability.zookeeper.client.session-timeout` is default value: 60000 ms.
>> The `maxSessionTimeout` in zoo.cfg is 40000ms.
>> I have already change the maxSessionTimeout to 120000ms this morning.
>> 
>> This problem happened many many times during the last weekend and made my kafka log delay grew up. Please help me. Thank you very much!
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 
>> 
>> 
>> 
>> 
> 
> 


Re: Question about the behavior of TM when it lost the zookeeper client session in HA mode

Posted by Tony Wei <to...@gmail.com>.
Hi Ufuk, Piotr

Thanks for all of your replies. I knew that jobs are cancelled if the JM
looses the connection to ZK, but JM didn't loose connection in my case.
My job failed because of the exception from KafkaProducer. However, it
happened before and after that exception that TM lost ZK connection.
So, as Piotr said, it looks like an error in Kafka producer and I will pay
more attention on it to see if there is something unexpected happens again.

Best Regards,
Tony Wei

2018-05-15 19:56 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:

> Hi,
>
> It looks like there was an error in asynchronous job of sending the
> records to Kafka. Probably this is a collateral damage of loosing
> connection to zookeeper.
>
> Piotrek
>
> On 15 May 2018, at 13:33, Ufuk Celebi <uc...@apache.org> wrote:
>
> Hey Tony,
>
> thanks for the detailed report.
>
> - In Flink 1.4, jobs are cancelled if the JM looses the connection to ZK
> and recovered when the connection is re-established (and one JM becomes
> leader again).
>
> - Regarding the KafkaProducer: I'm not sure from the log message whether
> Flink closes the KafkaProducer because the job is cancelled or because
> there is a connectivity issue to the Kafka cluster. Including Piotr (cc) in
> this thread who has worked on the KafkaProducer in the past. If it is a
> connectivity issue, it might also explain why you lost the connection to ZK.
>
> Glad to hear that everything is back to normal. Keep us updated if
> something unexpected happens again.
>
> – Ufuk
>
>
> On Tue, May 15, 2018 at 6:28 AM, Tony Wei <to...@gmail.com> wrote:
>
>> Hi all,
>>
>> I restarted the cluster and changed the log level to DEBUG, and raised
>> the parallelism of my streaming job from 32 to 40.
>> However, the problem just disappeared and I don't know why.
>> I will remain these settings for a while. If the error happen again, I
>> will bring more informations back for help. Thank you.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-05-14 14:24 GMT+08:00 Tony Wei <to...@gmail.com>:
>>
>>> Hi all,
>>>
>>> After I changed the *`h*
>>> *igh-availability.zookeeper.client.session-timeout`* and
>>> *`maxSessionTimeout`* to 120000ms, the exception still occurred.
>>>
>>> Here is the log snippet. It seems this is nothing to do with zookeeper
>>> client timeout, but I still don't know why kafka producer would be closed
>>> without any task state changed.
>>>
>>> ```
>>> 2018-05-14 05:18:53,468 WARN  org.apache.flink.shaded.zookee
>>> per.org.apache.zookeeper.ClientCnxn  - Client session timed out, have
>>> not heard from server in 82828ms for sessionid 0x305f957eb8d000a
>>> 2018-05-14 05:18:53,468 INFO  org.apache.flink.shaded.zookee
>>> per.org.apache.zookeeper.ClientCnxn  - Client session timed out, have
>>> not heard from server in 82828ms for sessionid 0x305f957eb8d000a, closing
>>> socket connection and attempting reconnect
>>> 2018-05-14 05:18:53,571 INFO  org.apache.flink.shaded.curato
>>> r.org.apache.curator.framework.state.ConnectionStateManager  - State
>>> change: SUSPENDED
>>> 2018-05-14 05:18:53,574 WARN  org.apache.flink.runtime.leade
>>> rretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper
>>> suspended. Can no longer retrieve the leader from ZooKeeper.
>>> 2018-05-14 05:18:53,850 WARN  org.apache.flink.shaded.zookee
>>> per.org.apache.zookeeper.ClientCnxn  - SASL configuration failed:
>>> javax.security.auth.login.LoginException: No JAAS configuration section
>>> named 'Client' was found in specified JAAS configuration file:
>>> '/mnt/jaas-466390940757021791.conf'. Will continue connection to
>>> Zookeeper server without SASL authentication, if Zookeeper server allows it.
>>> 2018-05-14 05:18:53,850 INFO  org.apache.flink.shaded.zookee
>>> per.org.apache.zookeeper.ClientCnxn  - Opening socket connection to
>>> server XXX.XXX.XXX.XXX:2181
>>> 2018-05-14 05:18:53,852 ERROR org.apache.flink.shaded.curato
>>> r.org.apache.curator.ConnectionState  - Authentication failed
>>> 2018-05-14 05:18:53,853 INFO  org.apache.flink.shaded.zookee
>>> per.org.apache.zookeeper.ClientCnxn  - Socket connection established to
>>> XXX.XXX.XXX.XXX:2181, initiating session
>>> 2018-05-14 05:18:53,859 INFO  org.apache.flink.shaded.zookee
>>> per.org.apache.zookeeper.ClientCnxn  - Session establishment complete
>>> on server XXX.XXX.XXX.XXX:2181, sessionid = 0x305f957eb8d000a, negotiated
>>> timeout = 120000
>>> 2018-05-14 05:18:53,860 INFO  org.apache.flink.shaded.curato
>>> r.org.apache.curator.framework.state.ConnectionStateManager  - State
>>> change: RECONNECTED
>>> 2018-05-14 05:18:53,860 INFO  org.apache.flink.runtime.leade
>>> rretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper
>>> was reconnected. Leader retrieval can be restarted.
>>> 2018-05-14 05:28:54,781 INFO  org.apache.kafka.clients.producer.KafkaProducer
>>>              - Closing the Kafka producer with timeoutMillis =
>>> 9223372036854775807 ms.
>>> 2018-05-14 05:28:54,829 INFO  org.apache.kafka.clients.producer.KafkaProducer
>>>              - Closing the Kafka producer with timeoutMillis =
>>> 9223372036854775807 ms.
>>> 2018-05-14 05:28:54,918 INFO  org.apache.flink.runtime.taskmanager.Task
>>>                    - match-rule -> (get-ordinary -> Sink: kafka-sink,
>>> get-cd -> Sink: kafka-sink-cd) (1/32) (e3462ff8bb565bb0cf4de49ffc2595fb)
>>> switched from RUNNING to FAILED.
>>> java.lang.Exception: Failed to send data to Kafka: The server
>>> disconnected before a response was received.
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProduc
>>> erBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProduc
>>> er010.invoke(FlinkKafkaProducer010.java:288)
>>> at org.apache.flink.streaming.api.operators.StreamSink.processE
>>> lement(StreamSink.java:56)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Chain
>>> ingOutput.pushToOperator(OperatorChain.java:464)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Chain
>>> ingOutput.collect(OperatorChain.java:441)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Chain
>>> ingOutput.collect(OperatorChain.java:415)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOpera
>>> tor$CountingOutput.collect(AbstractStreamOperator.java:831)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOpera
>>> tor$CountingOutput.collect(AbstractStreamOperator.java:809)
>>> at org.apache.flink.streaming.api.operators.StreamMap.processEl
>>> ement(StreamMap.java:41)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Chain
>>> ingOutput.pushToOperator(OperatorChain.java:464)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Chain
>>> ingOutput.collect(OperatorChain.java:441)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Chain
>>> ingOutput.collect(OperatorChain.java:415)
>>> at org.apache.flink.streaming.api.collector.selector.CopyingDir
>>> ectedOutput.collect(CopyingDirectedOutput.java:62)
>>> at org.apache.flink.streaming.api.collector.selector.CopyingDir
>>> ectedOutput.collect(CopyingDirectedOutput.java:34)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOpera
>>> tor$CountingOutput.collect(AbstractStreamOperator.java:831)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOpera
>>> tor$CountingOutput.collect(AbstractStreamOperator.java:809)
>>> at org.apache.flink.streaming.api.operators.TimestampedCollecto
>>> r.collect(TimestampedCollector.java:51)
>>> at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$ano
>>> nfun$flatMap1$4.apply(MatchRuleOperator.scala:39)
>>> at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$ano
>>> nfun$flatMap1$4.apply(MatchRuleOperator.scala:38)
>>> at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.app
>>> ly(MapLike.scala:245)
>>> at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.app
>>> ly(MapLike.scala:245)
>>> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach
>>> $1.apply(TraversableLike.scala:733)
>>> at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
>>> at scala.collection.TraversableLike$WithFilter.foreach(Traversa
>>> bleLike.scala:732)
>>> at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>>> at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flat
>>> Map1(MatchRuleOperator.scala:38)
>>> at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flat
>>> Map1(MatchRuleOperator.scala:14)
>>> at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.
>>> processElement1(CoStreamFlatMap.java:53)
>>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcesso
>>> r.processInput(StreamTwoInputProcessor.java:243)
>>> at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.
>>> run(TwoInputStreamTask.java:91)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(S
>>> treamTask.java:264)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.kafka.common.errors.NetworkException: The server
>>> disconnected before a response was received.
>>> ```
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2018-05-14 11:36 GMT+08:00 Tony Wei <to...@gmail.com>:
>>>
>>>> Hi all,
>>>>
>>>> Recently, my flink job met a problem that caused the job failed and
>>>> restarted.
>>>>
>>>> The log is list this screen snapshot
>>>>
>>>> <exception.png>
>>>>
>>>> or this
>>>>
>>>> ```
>>>> 2018-05-11 13:21:04,582 WARN  org.apache.flink.shaded.zookee
>>>> per.org.apache.zookeeper.ClientCnxn  - Client session timed out, have
>>>> not heard from server in 61054ms for sessionid 0x3054b165fe2006a
>>>> 2018-05-11 13:21:04,583 INFO  org.apache.flink.shaded.zookee
>>>> per.org.apache.zookeeper.ClientCnxn  - Client session timed out, have
>>>> not heard from server in 61054ms for sessionid 0x3054b165fe2006a, closing
>>>> socket connection and attempting reconnect
>>>> 2018-05-11 13:21:04,683 INFO  org.apache.flink.shaded.curato
>>>> r.org.apache.curator.framework.state.ConnectionStateManager  - State
>>>> change: SUSPENDED
>>>> 2018-05-11 13:21:04,686 WARN  org.apache.flink.runtime.leade
>>>> rretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper
>>>> suspended. Can no longer retrieve the leader from ZooKeeper.
>>>> 2018-05-11 13:21:04,689 INFO  org.apache.kafka.clients.producer.KafkaProducer
>>>>              - Closing the Kafka producer with timeoutMillis =
>>>> 9223372036854775807 ms.
>>>> 2018-05-11 13:21:04,694 INFO  org.apache.kafka.clients.producer.KafkaProducer
>>>>              - Closing the Kafka producer with timeoutMillis =
>>>> 9223372036854775807 ms.
>>>> 2018-05-11 13:21:04,698 INFO  org.apache.flink.runtime.taskmanager.Task
>>>>                    - match-rule -> (get-ordinary -> Sink: kafka-sink,
>>>> get-cd -> Sink: kafka-sink-cd) (4/32) (65a4044ac963e083f2635fe24e7f2403)
>>>> switched from RUNNING to FAILED.
>>>> java.lang.Exception: Failed to send data to Kafka: The server
>>>> disconnected before a response was received.
>>>> ```
>>>>
>>>> Logs showed *`org.apache.kafka.clients.producer.KafkaProducer -
>>>> Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.`*
>>>> This timeout value is *Long.MAX_VALUE*. It happened when someone
>>>> called *`producer.close()`*.
>>>>
>>>> And I also saw the log said *`org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn
>>>> - Client session timed out, have not heard from server in 61054ms for
>>>> sessionid 0x3054b165fe2006a, closing socket connection and attempting
>>>> reconnect`*
>>>> and *`org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
>>>> - Connection to ZooKeeper suspended. Can no longer retrieve the leader from
>>>> ZooKeeper.`*
>>>>
>>>> I have checked zookeeper and kafka and there was no error during that
>>>> period.
>>>> I was wondering if TM will stop the tasks when it lost zookeeper client
>>>> in HA mode. Since I didn't see any document or mailing thread discuss this,
>>>> I'm not sure if this is the reason that made kafka producer closed.
>>>> Could someone who know HA well? Or someone know what happened in my job?
>>>>
>>>> My flink cluster version is 1.4.0 with 2 masters and 10 slaves. My
>>>> zookeeper cluster version is 3.4.11 with 3 nodes.
>>>> The *`high-availability.zookeeper.client.session-timeout`* is default
>>>> value: 60000 ms.
>>>> The *`maxSessionTimeout`* in zoo.cfg is 40000ms.
>>>> I have already change the *maxSessionTimeout* to 120000ms this morning.
>>>>
>>>> This problem happened many many times during the last weekend and made
>>>> my kafka log delay grew up. Please help me. Thank you very much!
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
>

Re: Question about the behavior of TM when it lost the zookeeper client session in HA mode

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

It looks like there was an error in asynchronous job of sending the records to Kafka. Probably this is a collateral damage of loosing connection to zookeeper. 

Piotrek

> On 15 May 2018, at 13:33, Ufuk Celebi <uc...@apache.org> wrote:
> 
> Hey Tony,
> 
> thanks for the detailed report.
> 
> - In Flink 1.4, jobs are cancelled if the JM looses the connection to ZK and recovered when the connection is re-established (and one JM becomes leader again).
> 
> - Regarding the KafkaProducer: I'm not sure from the log message whether Flink closes the KafkaProducer because the job is cancelled or because there is a connectivity issue to the Kafka cluster. Including Piotr (cc) in this thread who has worked on the KafkaProducer in the past. If it is a connectivity issue, it might also explain why you lost the connection to ZK.
> 
> Glad to hear that everything is back to normal. Keep us updated if something unexpected happens again.
> 
> – Ufuk
> 
> 
> On Tue, May 15, 2018 at 6:28 AM, Tony Wei <tony19920430@gmail.com <ma...@gmail.com>> wrote:
> Hi all,
> 
> I restarted the cluster and changed the log level to DEBUG, and raised the parallelism of my streaming job from 32 to 40.
> However, the problem just disappeared and I don't know why.
> I will remain these settings for a while. If the error happen again, I will bring more informations back for help. Thank you.
> 
> Best Regards,
> Tony Wei
> 
> 2018-05-14 14:24 GMT+08:00 Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
> Hi all,
> 
> After I changed the `high-availability.zookeeper.client.session-timeout` and `maxSessionTimeout` to 120000ms, the exception still occurred.
> 
> Here is the log snippet. It seems this is nothing to do with zookeeper client timeout, but I still don't know why kafka producer would be closed without any task state changed.
> 
> ```
> 2018-05-14 05:18:53,468 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 82828ms for sessionid 0x305f957eb8d000a
> 2018-05-14 05:18:53,468 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 82828ms for sessionid 0x305f957eb8d000a, closing socket connection and attempting reconnect
> 2018-05-14 05:18:53,571 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED
> 2018-05-14 05:18:53,574 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> 2018-05-14 05:18:53,850 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/mnt/jaas-466390940757021791.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
> 2018-05-14 05:18:53,850 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server XXX.XXX.XXX.XXX:2181
> 2018-05-14 05:18:53,852 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - Authentication failed
> 2018-05-14 05:18:53,853 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to XXX.XXX.XXX.XXX:2181, initiating session
> 2018-05-14 05:18:53,859 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server XXX.XXX.XXX.XXX:2181, sessionid = 0x305f957eb8d000a, negotiated timeout = 120000
> 2018-05-14 05:18:53,860 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED
> 2018-05-14 05:18:53,860 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
> 2018-05-14 05:28:54,781 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2018-05-14 05:28:54,829 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2018-05-14 05:28:54,918 INFO  org.apache.flink.runtime.taskmanager.Task                     - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> Sink: kafka-sink-cd) (1/32) (e3462ff8bb565bb0cf4de49ffc2595fb) switched from RUNNING to FAILED.
> java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
> 	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
> 	at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:62)
> 	at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:34)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
> 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> 	at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:39)
> 	at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:38)
> 	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> 	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> 	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
> 	at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
> 	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
> 	at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> 	at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:38)
> 	at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:14)
> 	at org.apache.flink.streaming.api.operators.co <http://api.operators.co/>.CoStreamFlatMap.processElement1(CoStreamFlatMap.java:53)
> 	at org.apache.flink.streaming.runtime.io <http://runtime.io/>.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:243)
> 	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
> ```
> 
> Best Regards,
> Tony Wei
> 
> 2018-05-14 11:36 GMT+08:00 Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
> Hi all,
> 
> Recently, my flink job met a problem that caused the job failed and restarted.
> 
> The log is list this screen snapshot
> 
> <exception.png>
> 
> or this
> 
> ```
> 2018-05-11 13:21:04,582 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a
> 2018-05-11 13:21:04,583 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a, closing socket connection and attempting reconnect
> 2018-05-11 13:21:04,683 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED
> 2018-05-11 13:21:04,686 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> 2018-05-11 13:21:04,689 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2018-05-11 13:21:04,694 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2018-05-11 13:21:04,698 INFO  org.apache.flink.runtime.taskmanager.Task                     - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> Sink: kafka-sink-cd) (4/32) (65a4044ac963e083f2635fe24e7f2403) switched from RUNNING to FAILED.
> java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
> ```
> 
> Logs showed `org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.` This timeout value is Long.MAX_VALUE. It happened when someone called `producer.close()`.
> 
> And I also saw the log said `org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a, closing socket connection and attempting reconnect`
> and `org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.`
> 
> I have checked zookeeper and kafka and there was no error during that period.
> I was wondering if TM will stop the tasks when it lost zookeeper client in HA mode. Since I didn't see any document or mailing thread discuss this, I'm not sure if this is the reason that made kafka producer closed.
> Could someone who know HA well? Or someone know what happened in my job?
> 
> My flink cluster version is 1.4.0 with 2 masters and 10 slaves. My zookeeper cluster version is 3.4.11 with 3 nodes.
> The `high-availability.zookeeper.client.session-timeout` is default value: 60000 ms.
> The `maxSessionTimeout` in zoo.cfg is 40000ms.
> I have already change the maxSessionTimeout to 120000ms this morning.
> 
> This problem happened many many times during the last weekend and made my kafka log delay grew up. Please help me. Thank you very much!
> 
> Best Regards,
> Tony Wei
> 
> 
> 
> 
> 
> 


Re: Question about the behavior of TM when it lost the zookeeper client session in HA mode

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Tony,

thanks for the detailed report.

- In Flink 1.4, jobs are cancelled if the JM looses the connection to ZK
and recovered when the connection is re-established (and one JM becomes
leader again).

- Regarding the KafkaProducer: I'm not sure from the log message whether
Flink closes the KafkaProducer because the job is cancelled or because
there is a connectivity issue to the Kafka cluster. Including Piotr (cc) in
this thread who has worked on the KafkaProducer in the past. If it is a
connectivity issue, it might also explain why you lost the connection to ZK.

Glad to hear that everything is back to normal. Keep us updated if
something unexpected happens again.

– Ufuk


On Tue, May 15, 2018 at 6:28 AM, Tony Wei <to...@gmail.com> wrote:

> Hi all,
>
> I restarted the cluster and changed the log level to DEBUG, and raised the
> parallelism of my streaming job from 32 to 40.
> However, the problem just disappeared and I don't know why.
> I will remain these settings for a while. If the error happen again, I
> will bring more informations back for help. Thank you.
>
> Best Regards,
> Tony Wei
>
> 2018-05-14 14:24 GMT+08:00 Tony Wei <to...@gmail.com>:
>
>> Hi all,
>>
>> After I changed the *`h*
>> *igh-availability.zookeeper.client.session-timeout`* and
>> *`maxSessionTimeout`* to 120000ms, the exception still occurred.
>>
>> Here is the log snippet. It seems this is nothing to do with zookeeper
>> client timeout, but I still don't know why kafka producer would be closed
>> without any task state changed.
>>
>> ```
>> 2018-05-14 05:18:53,468 WARN  org.apache.flink.shaded.zookee
>> per.org.apache.zookeeper.ClientCnxn  - Client session timed out, have
>> not heard from server in 82828ms for sessionid 0x305f957eb8d000a
>> 2018-05-14 05:18:53,468 INFO  org.apache.flink.shaded.zookee
>> per.org.apache.zookeeper.ClientCnxn  - Client session timed out, have
>> not heard from server in 82828ms for sessionid 0x305f957eb8d000a, closing
>> socket connection and attempting reconnect
>> 2018-05-14 05:18:53,571 INFO  org.apache.flink.shaded.curato
>> r.org.apache.curator.framework.state.ConnectionStateManager  - State
>> change: SUSPENDED
>> 2018-05-14 05:18:53,574 WARN  org.apache.flink.runtime.leade
>> rretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper
>> suspended. Can no longer retrieve the leader from ZooKeeper.
>> 2018-05-14 05:18:53,850 WARN  org.apache.flink.shaded.zookee
>> per.org.apache.zookeeper.ClientCnxn  - SASL configuration failed:
>> javax.security.auth.login.LoginException: No JAAS configuration section
>> named 'Client' was found in specified JAAS configuration file:
>> '/mnt/jaas-466390940757021791.conf'. Will continue connection to
>> Zookeeper server without SASL authentication, if Zookeeper server allows it.
>> 2018-05-14 05:18:53,850 INFO  org.apache.flink.shaded.zookee
>> per.org.apache.zookeeper.ClientCnxn  - Opening socket connection to
>> server XXX.XXX.XXX.XXX:2181
>> 2018-05-14 05:18:53,852 ERROR org.apache.flink.shaded.curato
>> r.org.apache.curator.ConnectionState  - Authentication failed
>> 2018-05-14 05:18:53,853 INFO  org.apache.flink.shaded.zookee
>> per.org.apache.zookeeper.ClientCnxn  - Socket connection established to
>> XXX.XXX.XXX.XXX:2181, initiating session
>> 2018-05-14 05:18:53,859 INFO  org.apache.flink.shaded.zookee
>> per.org.apache.zookeeper.ClientCnxn  - Session establishment complete on
>> server XXX.XXX.XXX.XXX:2181, sessionid = 0x305f957eb8d000a, negotiated
>> timeout = 120000
>> 2018-05-14 05:18:53,860 INFO  org.apache.flink.shaded.curato
>> r.org.apache.curator.framework.state.ConnectionStateManager  - State
>> change: RECONNECTED
>> 2018-05-14 05:18:53,860 INFO  org.apache.flink.runtime.leade
>> rretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper
>> was reconnected. Leader retrieval can be restarted.
>> 2018-05-14 05:28:54,781 INFO  org.apache.kafka.clients.producer.KafkaProducer
>>              - Closing the Kafka producer with timeoutMillis =
>> 9223372036854775807 ms.
>> 2018-05-14 05:28:54,829 INFO  org.apache.kafka.clients.producer.KafkaProducer
>>              - Closing the Kafka producer with timeoutMillis =
>> 9223372036854775807 ms.
>> 2018-05-14 05:28:54,918 INFO  org.apache.flink.runtime.taskmanager.Task
>>                    - match-rule -> (get-ordinary -> Sink: kafka-sink,
>> get-cd -> Sink: kafka-sink-cd) (1/32) (e3462ff8bb565bb0cf4de49ffc2595fb)
>> switched from RUNNING to FAILED.
>> java.lang.Exception: Failed to send data to Kafka: The server
>> disconnected before a response was received.
>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProduc
>> erBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProduc
>> er010.invoke(FlinkKafkaProducer010.java:288)
>> at org.apache.flink.streaming.api.operators.StreamSink.processE
>> lement(StreamSink.java:56)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Chain
>> ingOutput.pushToOperator(OperatorChain.java:464)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Chain
>> ingOutput.collect(OperatorChain.java:441)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Chain
>> ingOutput.collect(OperatorChain.java:415)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOpera
>> tor$CountingOutput.collect(AbstractStreamOperator.java:831)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOpera
>> tor$CountingOutput.collect(AbstractStreamOperator.java:809)
>> at org.apache.flink.streaming.api.operators.StreamMap.processEl
>> ement(StreamMap.java:41)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Chain
>> ingOutput.pushToOperator(OperatorChain.java:464)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Chain
>> ingOutput.collect(OperatorChain.java:441)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Chain
>> ingOutput.collect(OperatorChain.java:415)
>> at org.apache.flink.streaming.api.collector.selector.CopyingDir
>> ectedOutput.collect(CopyingDirectedOutput.java:62)
>> at org.apache.flink.streaming.api.collector.selector.CopyingDir
>> ectedOutput.collect(CopyingDirectedOutput.java:34)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOpera
>> tor$CountingOutput.collect(AbstractStreamOperator.java:831)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOpera
>> tor$CountingOutput.collect(AbstractStreamOperator.java:809)
>> at org.apache.flink.streaming.api.operators.TimestampedCollecto
>> r.collect(TimestampedCollector.java:51)
>> at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$ano
>> nfun$flatMap1$4.apply(MatchRuleOperator.scala:39)
>> at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$ano
>> nfun$flatMap1$4.apply(MatchRuleOperator.scala:38)
>> at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.
>> apply(MapLike.scala:245)
>> at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.
>> apply(MapLike.scala:245)
>> at scala.collection.TraversableLike$WithFilter$$anonfun$
>> foreach$1.apply(TraversableLike.scala:733)
>> at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
>> at scala.collection.TraversableLike$WithFilter.foreach(
>> TraversableLike.scala:732)
>> at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>> at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flat
>> Map1(MatchRuleOperator.scala:38)
>> at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flat
>> Map1(MatchRuleOperator.scala:14)
>> at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.
>> processElement1(CoStreamFlatMap.java:53)
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcesso
>> r.processInput(StreamTwoInputProcessor.java:243)
>> at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.
>> run(TwoInputStreamTask.java:91)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:264)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.errors.NetworkException: The server
>> disconnected before a response was received.
>> ```
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-05-14 11:36 GMT+08:00 Tony Wei <to...@gmail.com>:
>>
>>> Hi all,
>>>
>>> Recently, my flink job met a problem that caused the job failed and
>>> restarted.
>>>
>>> The log is list this screen snapshot
>>>
>>>
>>>
>>> or this
>>>
>>> ```
>>> 2018-05-11 13:21:04,582 WARN  org.apache.flink.shaded.zookee
>>> per.org.apache.zookeeper.ClientCnxn  - Client session timed out, have
>>> not heard from server in 61054ms for sessionid 0x3054b165fe2006a
>>> 2018-05-11 13:21:04,583 INFO  org.apache.flink.shaded.zookee
>>> per.org.apache.zookeeper.ClientCnxn  - Client session timed out, have
>>> not heard from server in 61054ms for sessionid 0x3054b165fe2006a, closing
>>> socket connection and attempting reconnect
>>> 2018-05-11 13:21:04,683 INFO  org.apache.flink.shaded.curato
>>> r.org.apache.curator.framework.state.ConnectionStateManager  - State
>>> change: SUSPENDED
>>> 2018-05-11 13:21:04,686 WARN  org.apache.flink.runtime.leade
>>> rretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper
>>> suspended. Can no longer retrieve the leader from ZooKeeper.
>>> 2018-05-11 13:21:04,689 INFO  org.apache.kafka.clients.producer.KafkaProducer
>>>              - Closing the Kafka producer with timeoutMillis =
>>> 9223372036854775807 ms.
>>> 2018-05-11 13:21:04,694 INFO  org.apache.kafka.clients.producer.KafkaProducer
>>>              - Closing the Kafka producer with timeoutMillis =
>>> 9223372036854775807 ms.
>>> 2018-05-11 13:21:04,698 INFO  org.apache.flink.runtime.taskmanager.Task
>>>                    - match-rule -> (get-ordinary -> Sink: kafka-sink,
>>> get-cd -> Sink: kafka-sink-cd) (4/32) (65a4044ac963e083f2635fe24e7f2403)
>>> switched from RUNNING to FAILED.
>>> java.lang.Exception: Failed to send data to Kafka: The server
>>> disconnected before a response was received.
>>> ```
>>>
>>> Logs showed *`org.apache.kafka.clients.producer.KafkaProducer - Closing
>>> the Kafka producer with timeoutMillis = 9223372036854775807 ms.`* This
>>> timeout value is *Long.MAX_VALUE*. It happened when someone called
>>> *`producer.close()`*.
>>>
>>> And I also saw the log said *`org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn
>>> - Client session timed out, have not heard from server in 61054ms for
>>> sessionid 0x3054b165fe2006a, closing socket connection and attempting
>>> reconnect`*
>>> and *`org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
>>> - Connection to ZooKeeper suspended. Can no longer retrieve the leader from
>>> ZooKeeper.`*
>>>
>>> I have checked zookeeper and kafka and there was no error during that
>>> period.
>>> I was wondering if TM will stop the tasks when it lost zookeeper client
>>> in HA mode. Since I didn't see any document or mailing thread discuss this,
>>> I'm not sure if this is the reason that made kafka producer closed.
>>> Could someone who know HA well? Or someone know what happened in my job?
>>>
>>> My flink cluster version is 1.4.0 with 2 masters and 10 slaves. My
>>> zookeeper cluster version is 3.4.11 with 3 nodes.
>>> The *`high-availability.zookeeper.client.session-timeout`* is default
>>> value: 60000 ms.
>>> The *`maxSessionTimeout`* in zoo.cfg is 40000ms.
>>> I have already change the *maxSessionTimeout* to 120000ms this morning.
>>>
>>> This problem happened many many times during the last weekend and made
>>> my kafka log delay grew up. Please help me. Thank you very much!
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>>
>>>
>>>
>>
>

Re: Question about the behavior of TM when it lost the zookeeper client session in HA mode

Posted by Tony Wei <to...@gmail.com>.
Hi all,

I restarted the cluster and changed the log level to DEBUG, and raised the
parallelism of my streaming job from 32 to 40.
However, the problem just disappeared and I don't know why.
I will remain these settings for a while. If the error happen again, I will
bring more informations back for help. Thank you.

Best Regards,
Tony Wei

2018-05-14 14:24 GMT+08:00 Tony Wei <to...@gmail.com>:

> Hi all,
>
> After I changed the *`h*
> *igh-availability.zookeeper.client.session-timeout`* and
> *`maxSessionTimeout`* to 120000ms, the exception still occurred.
>
> Here is the log snippet. It seems this is nothing to do with zookeeper
> client timeout, but I still don't know why kafka producer would be closed
> without any task state changed.
>
> ```
> 2018-05-14 05:18:53,468 WARN  org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out,
> have not heard from server in 82828ms for sessionid 0x305f957eb8d000a
> 2018-05-14 05:18:53,468 INFO  org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out,
> have not heard from server in 82828ms for sessionid 0x305f957eb8d000a,
> closing socket connection and attempting reconnect
> 2018-05-14 05:18:53,571 INFO  org.apache.flink.shaded.
> curator.org.apache.curator.framework.state.ConnectionStateManager  -
> State change: SUSPENDED
> 2018-05-14 05:18:53,574 WARN  org.apache.flink.runtime.leaderretrieval.
> ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can
> no longer retrieve the leader from ZooKeeper.
> 2018-05-14 05:18:53,850 WARN  org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ClientCnxn  - SASL configuration failed:
> javax.security.auth.login.LoginException: No JAAS configuration section
> named 'Client' was found in specified JAAS configuration file:
> '/mnt/jaas-466390940757021791.conf'. Will continue connection to
> Zookeeper server without SASL authentication, if Zookeeper server allows it.
> 2018-05-14 05:18:53,850 INFO  org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to
> server XXX.XXX.XXX.XXX:2181
> 2018-05-14 05:18:53,852 ERROR org.apache.flink.shaded.
> curator.org.apache.curator.ConnectionState  - Authentication failed
> 2018-05-14 05:18:53,853 INFO  org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection
> established to XXX.XXX.XXX.XXX:2181, initiating session
> 2018-05-14 05:18:53,859 INFO  org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment
> complete on server XXX.XXX.XXX.XXX:2181, sessionid = 0x305f957eb8d000a,
> negotiated timeout = 120000
> 2018-05-14 05:18:53,860 INFO  org.apache.flink.shaded.
> curator.org.apache.curator.framework.state.ConnectionStateManager  -
> State change: RECONNECTED
> 2018-05-14 05:18:53,860 INFO  org.apache.flink.runtime.leaderretrieval.
> ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was
> reconnected. Leader retrieval can be restarted.
> 2018-05-14 05:28:54,781 INFO  org.apache.kafka.clients.producer.KafkaProducer
>              - Closing the Kafka producer with timeoutMillis =
> 9223372036854775807 ms.
> 2018-05-14 05:28:54,829 INFO  org.apache.kafka.clients.producer.KafkaProducer
>              - Closing the Kafka producer with timeoutMillis =
> 9223372036854775807 ms.
> 2018-05-14 05:28:54,918 INFO  org.apache.flink.runtime.taskmanager.Task
>                    - match-rule -> (get-ordinary -> Sink: kafka-sink,
> get-cd -> Sink: kafka-sink-cd) (1/32) (e3462ff8bb565bb0cf4de49ffc2595fb)
> switched from RUNNING to FAILED.
> java.lang.Exception: Failed to send data to Kafka: The server disconnected
> before a response was received.
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.
> checkErroneous(FlinkKafkaProducerBase.java:373)
> at org.apache.flink.streaming.connectors.kafka.
> FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
> at org.apache.flink.streaming.api.operators.StreamSink.
> processElement(StreamSink.java:56)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> ChainingOutput.pushToOperator(OperatorChain.java:464)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> ChainingOutput.collect(OperatorChain.java:441)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> ChainingOutput.collect(OperatorChain.java:415)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:831)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:809)
> at org.apache.flink.streaming.api.operators.StreamMap.
> processElement(StreamMap.java:41)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> ChainingOutput.pushToOperator(OperatorChain.java:464)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> ChainingOutput.collect(OperatorChain.java:441)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> ChainingOutput.collect(OperatorChain.java:415)
> at org.apache.flink.streaming.api.collector.selector.
> CopyingDirectedOutput.collect(CopyingDirectedOutput.java:62)
> at org.apache.flink.streaming.api.collector.selector.
> CopyingDirectedOutput.collect(CopyingDirectedOutput.java:34)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:831)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:809)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$
> anonfun$flatMap1$4.apply(MatchRuleOperator.scala:39)
> at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$
> anonfun$flatMap1$4.apply(MatchRuleOperator.scala:38)
> at scala.collection.MapLike$MappedValues$$anonfun$foreach$
> 3.apply(MapLike.scala:245)
> at scala.collection.MapLike$MappedValues$$anonfun$foreach$
> 3.apply(MapLike.scala:245)
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(
> TraversableLike.scala:733)
> at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
> at scala.collection.TraversableLike$WithFilter.
> foreach(TraversableLike.scala:732)
> at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.
> flatMap1(MatchRuleOperator.scala:38)
> at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.
> flatMap1(MatchRuleOperator.scala:14)
> at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.
> processElement1(CoStreamFlatMap.java:53)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.
> processInput(StreamTwoInputProcessor.java:243)
> at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(
> TwoInputStreamTask.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:264)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.NetworkException: The server
> disconnected before a response was received.
> ```
>
> Best Regards,
> Tony Wei
>
> 2018-05-14 11:36 GMT+08:00 Tony Wei <to...@gmail.com>:
>
>> Hi all,
>>
>> Recently, my flink job met a problem that caused the job failed and
>> restarted.
>>
>> The log is list this screen snapshot
>>
>>
>>
>> or this
>>
>> ```
>> 2018-05-11 13:21:04,582 WARN  org.apache.flink.shaded.zookee
>> per.org.apache.zookeeper.ClientCnxn  - Client session timed out, have
>> not heard from server in 61054ms for sessionid 0x3054b165fe2006a
>> 2018-05-11 13:21:04,583 INFO  org.apache.flink.shaded.zookee
>> per.org.apache.zookeeper.ClientCnxn  - Client session timed out, have
>> not heard from server in 61054ms for sessionid 0x3054b165fe2006a, closing
>> socket connection and attempting reconnect
>> 2018-05-11 13:21:04,683 INFO  org.apache.flink.shaded.curato
>> r.org.apache.curator.framework.state.ConnectionStateManager  - State
>> change: SUSPENDED
>> 2018-05-11 13:21:04,686 WARN  org.apache.flink.runtime.leade
>> rretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper
>> suspended. Can no longer retrieve the leader from ZooKeeper.
>> 2018-05-11 13:21:04,689 INFO  org.apache.kafka.clients.producer.KafkaProducer
>>              - Closing the Kafka producer with timeoutMillis =
>> 9223372036854775807 ms.
>> 2018-05-11 13:21:04,694 INFO  org.apache.kafka.clients.producer.KafkaProducer
>>              - Closing the Kafka producer with timeoutMillis =
>> 9223372036854775807 ms.
>> 2018-05-11 13:21:04,698 INFO  org.apache.flink.runtime.taskmanager.Task
>>                    - match-rule -> (get-ordinary -> Sink: kafka-sink,
>> get-cd -> Sink: kafka-sink-cd) (4/32) (65a4044ac963e083f2635fe24e7f2403)
>> switched from RUNNING to FAILED.
>> java.lang.Exception: Failed to send data to Kafka: The server
>> disconnected before a response was received.
>> ```
>>
>> Logs showed *`org.apache.kafka.clients.producer.KafkaProducer - Closing
>> the Kafka producer with timeoutMillis = 9223372036854775807 ms.`* This
>> timeout value is *Long.MAX_VALUE*. It happened when someone called
>> *`producer.close()`*.
>>
>> And I also saw the log said *`org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn
>> - Client session timed out, have not heard from server in 61054ms for
>> sessionid 0x3054b165fe2006a, closing socket connection and attempting
>> reconnect`*
>> and *`org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
>> - Connection to ZooKeeper suspended. Can no longer retrieve the leader from
>> ZooKeeper.`*
>>
>> I have checked zookeeper and kafka and there was no error during that
>> period.
>> I was wondering if TM will stop the tasks when it lost zookeeper client
>> in HA mode. Since I didn't see any document or mailing thread discuss this,
>> I'm not sure if this is the reason that made kafka producer closed.
>> Could someone who know HA well? Or someone know what happened in my job?
>>
>> My flink cluster version is 1.4.0 with 2 masters and 10 slaves. My
>> zookeeper cluster version is 3.4.11 with 3 nodes.
>> The *`high-availability.zookeeper.client.session-timeout`* is default
>> value: 60000 ms.
>> The *`maxSessionTimeout`* in zoo.cfg is 40000ms.
>> I have already change the *maxSessionTimeout* to 120000ms this morning.
>>
>> This problem happened many many times during the last weekend and made my
>> kafka log delay grew up. Please help me. Thank you very much!
>>
>> Best Regards,
>> Tony Wei
>>
>>
>>
>>
>

Re: Question about the behavior of TM when it lost the zookeeper client session in HA mode

Posted by Tony Wei <to...@gmail.com>.
Hi all,

After I changed the *`h**igh-availability.zookeeper.client.session-timeout`*
 and *`maxSessionTimeout`* to 120000ms, the exception still occurred.

Here is the log snippet. It seems this is nothing to do with zookeeper
client timeout, but I still don't know why kafka producer would be closed
without any task state changed.

```
2018-05-14 05:18:53,468 WARN
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client
session timed out, have not heard from server in 82828ms for sessionid
0x305f957eb8d000a
2018-05-14 05:18:53,468 INFO
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client
session timed out, have not heard from server in 82828ms for sessionid
0x305f957eb8d000a, closing socket connection and attempting reconnect
2018-05-14 05:18:53,571 INFO
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
- State change: SUSPENDED
2018-05-14 05:18:53,574 WARN
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
Connection to ZooKeeper suspended. Can no longer retrieve the leader from
ZooKeeper.
2018-05-14 05:18:53,850 WARN
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL
configuration failed: javax.security.auth.login.LoginException: No JAAS
configuration section named 'Client' was found in specified JAAS
configuration file: '/mnt/jaas-466390940757021791.conf'. Will continue
connection to Zookeeper server without SASL authentication, if Zookeeper
server allows it.
2018-05-14 05:18:53,850 INFO
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
Opening socket connection to server XXX.XXX.XXX.XXX:2181
2018-05-14 05:18:53,852 ERROR
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  -
Authentication failed
2018-05-14 05:18:53,853 INFO
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket
connection established to XXX.XXX.XXX.XXX:2181, initiating session
2018-05-14 05:18:53,859 INFO
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
Session establishment complete on server XXX.XXX.XXX.XXX:2181, sessionid =
0x305f957eb8d000a, negotiated timeout = 120000
2018-05-14 05:18:53,860 INFO
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
- State change: RECONNECTED
2018-05-14 05:18:53,860 INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
2018-05-14 05:28:54,781 INFO
org.apache.kafka.clients.producer.KafkaProducer               - Closing the
Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-05-14 05:28:54,829 INFO
org.apache.kafka.clients.producer.KafkaProducer               - Closing the
Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-05-14 05:28:54,918 INFO  org.apache.flink.runtime.taskmanager.Task
                 - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd
-> Sink: kafka-sink-cd) (1/32) (e3462ff8bb565bb0cf4de49ffc2595fb) switched
from RUNNING to FAILED.
java.lang.Exception: Failed to send data to Kafka: The server disconnected
before a response was received.
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
at
org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:62)
at
org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:34)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:39)
at
com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:38)
at
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
at
com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:38)
at
com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:14)
at
org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement1(CoStreamFlatMap.java:53)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:243)
at
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NetworkException: The server
disconnected before a response was received.
```

Best Regards,
Tony Wei

2018-05-14 11:36 GMT+08:00 Tony Wei <to...@gmail.com>:

> Hi all,
>
> Recently, my flink job met a problem that caused the job failed and
> restarted.
>
> The log is list this screen snapshot
>
>
>
> or this
>
> ```
> 2018-05-11 13:21:04,582 WARN  org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out,
> have not heard from server in 61054ms for sessionid 0x3054b165fe2006a
> 2018-05-11 13:21:04,583 INFO  org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out,
> have not heard from server in 61054ms for sessionid 0x3054b165fe2006a,
> closing socket connection and attempting reconnect
> 2018-05-11 13:21:04,683 INFO  org.apache.flink.shaded.
> curator.org.apache.curator.framework.state.ConnectionStateManager  -
> State change: SUSPENDED
> 2018-05-11 13:21:04,686 WARN  org.apache.flink.runtime.leaderretrieval.
> ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can
> no longer retrieve the leader from ZooKeeper.
> 2018-05-11 13:21:04,689 INFO  org.apache.kafka.clients.producer.KafkaProducer
>              - Closing the Kafka producer with timeoutMillis =
> 9223372036854775807 ms.
> 2018-05-11 13:21:04,694 INFO  org.apache.kafka.clients.producer.KafkaProducer
>              - Closing the Kafka producer with timeoutMillis =
> 9223372036854775807 ms.
> 2018-05-11 13:21:04,698 INFO  org.apache.flink.runtime.taskmanager.Task
>                    - match-rule -> (get-ordinary -> Sink: kafka-sink,
> get-cd -> Sink: kafka-sink-cd) (4/32) (65a4044ac963e083f2635fe24e7f2403)
> switched from RUNNING to FAILED.
> java.lang.Exception: Failed to send data to Kafka: The server disconnected
> before a response was received.
> ```
>
> Logs showed *`org.apache.kafka.clients.producer.KafkaProducer - Closing
> the Kafka producer with timeoutMillis = 9223372036854775807 ms.`* This
> timeout value is *Long.MAX_VALUE*. It happened when someone called
> *`producer.close()`*.
>
> And I also saw the log said *`org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn
> - Client session timed out, have not heard from server in 61054ms for
> sessionid 0x3054b165fe2006a, closing socket connection and attempting
> reconnect`*
> and *`org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
> - Connection to ZooKeeper suspended. Can no longer retrieve the leader from
> ZooKeeper.`*
>
> I have checked zookeeper and kafka and there was no error during that
> period.
> I was wondering if TM will stop the tasks when it lost zookeeper client in
> HA mode. Since I didn't see any document or mailing thread discuss this,
> I'm not sure if this is the reason that made kafka producer closed.
> Could someone who know HA well? Or someone know what happened in my job?
>
> My flink cluster version is 1.4.0 with 2 masters and 10 slaves. My
> zookeeper cluster version is 3.4.11 with 3 nodes.
> The *`high-availability.zookeeper.client.session-timeout`* is default
> value: 60000 ms.
> The *`maxSessionTimeout`* in zoo.cfg is 40000ms.
> I have already change the *maxSessionTimeout* to 120000ms this morning.
>
> This problem happened many many times during the last weekend and made my
> kafka log delay grew up. Please help me. Thank you very much!
>
> Best Regards,
> Tony Wei
>
>
>
>