Posted to issues@nifi.apache.org by "Reinhard Sell (Jira)" <ji...@apache.org> on 2022/01/05 09:56:00 UTC

[jira] [Commented] (NIFI-9506) Nifi reconnects with websocket server each second

    [ https://issues.apache.org/jira/browse/NIFI-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469162#comment-17469162 ] 

Reinhard Sell commented on NIFI-9506:
-------------------------------------

We (Fabian and I) are using the ConnectWebSocket processor with a JettyWebSocketClient service to connect to an external Websocket server (as in the diagram above). And we definitely experience a different behaviour in v1.13.2 compared to v1.15.2.
 
We analyzed the issue a bit more:

The ConnectWebSocket processor in v1.15.2 accepts an incoming flowfile connection. The attributes of the incoming flowfile can be used to fine-tune the websocket connection (URL, HTTP headers). So it's obviously necessary to open a new connection for each incoming flowfile - it's easy to reproduce this.

However, we are using the ConnectWebSocket processor *without* incoming flowfiles - and with the default scheduling time of "0 sec".

In v1.13.2 the ConnectWebSocket processor would then open a single connection when it was started, and reconnect when this connection broke down, e.g. because of a server restart.

In v1.15.2 however, new connections are created as defined by the "Run Schedule" time in the ConnectWebSocket processor. If that time is set to "10 sec", a new connection is created every 10 seconds. For the default scheduling time of "0 sec" this seems to result in an effective frequency of 1 per second. And this is what we observe in our setup.
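
To make the difference concrete, here is a toy model in plain Python. It is not NiFi code and says nothing about how the processor is actually implemented; the names ToyServer, connect_once and connect_every_trigger are made up for illustration. The model also assumes that old sessions are never closed, which matches the observation reported in this comment but has not been confirmed against the NiFi source.

```python
from dataclasses import dataclass


@dataclass
class ToyServer:
    """Hypothetical stand-in for the websocket server; it only counts open connections."""
    open_connections: int = 0

    def accept(self) -> int:
        # Every accepted handshake leaves one more connection open.
        self.open_connections += 1
        return self.open_connections


def connect_once(server: ToyServer, triggers: int) -> int:
    """Behaviour as observed with v1.13.2: connect on the first trigger, then reuse."""
    session = None
    for _ in range(triggers):
        if session is None:
            session = server.accept()
    return server.open_connections


def connect_every_trigger(server: ToyServer, triggers: int) -> int:
    """Behaviour as observed with v1.15.2 without incoming flowfiles:
    every scheduled trigger opens another connection (assumed never closed)."""
    for _ in range(triggers):
        server.accept()
    return server.open_connections


# One minute at an effective rate of one trigger per second.
print(connect_once(ToyServer(), 60))           # 1
print(connect_every_trigger(ToyServer(), 60))  # 60
```

Under these assumptions the number of open connections on the server grows linearly with the number of triggers, so a longer "Run Schedule" only slows the growth and does not stop it.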

So it's probably easy to "fix" our issue by setting the "Run Schedule" to a very high value. We'll give this a try and test it. (It seems that broken connections are still being recreated within the "Session Maintenance Interval" as defined for the service.)

The question is: Is this really the intended process for the ConnectWebSocket processor, when operated without incoming flowfiles? And if so: How can "old" connections be cleaned up and be removed? They seem to stay open and will probably consume resources - at least our server says so. 

Or are we using the ConnectWebSocket processor (as client) in the wrong way? Is it somehow possible to *not* create new connections when it's not really necessary?

> Nifi reconnects with websocket server each second
> -------------------------------------------------
>
>                 Key: NIFI-9506
>                 URL: https://issues.apache.org/jira/browse/NIFI-9506
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 1.15.1
>         Environment: official nifi container (https://github.com/apache/nifi/blob/main/nifi-docker/dockerhub/Dockerfile) running with podman 2.2.1 or 3.0.1, but based on a openjdk-11-jre base layer.
>            Reporter: Fabian Reiber
>            Priority: Major
>         Attachments: nifi_websocket.png
>
>
> I have a python application which offers a websocket to apache nifi. Nifi uses the ConnectWebSocket processor to connect to the server as a client. After an upgrade from 1.13.2 to 1.15.1 it does not work anymore: nifi 1.15.1 reconnects all the time to the websocket application. With version 1.13.2 nifi connects once with the websocket and everything is fine. So, I guess my default setup with podman, which runs a nifi, redis and python container in one pod, is not related to the bug, because the same setup with the same flow works with the nifi version I am currently using.
> To simplify the setup and to reproduce this issue, I used this python application with [websockets|https://github.com/aaugustin/websockets] in a container:
> {code:python}
> import asyncio
> import logging
> from websockets import serve
> ws_logger = logging.getLogger('websockets')
> ws_logger.setLevel(logging.DEBUG)
> ws_logger.addHandler(logging.StreamHandler())
> async def echo(websocket, path):
>     async for message in websocket:
>         await websocket.send(message)
> async def main():
>     async with serve(echo, 'localhost', 8761):
>         await asyncio.Future()  # run forever
> asyncio.run(main())
> {code}
>  
> With nifi 1.14.0 the server logs:
> {code:java}
> server listening on [::1]:8761
> server listening on 127.0.0.1:8761
> = connection is CONNECTING
> < GET /foobar HTTP/1.1
> < Accept-Encoding: gzip
> < User-Agent: Jetty/9.4.42.v20210604
> < Upgrade: websocket
> < Connection: Upgrade
> < Sec-WebSocket-Key: izuSbowZLyZvfon4HgAzRQ==
> < Sec-WebSocket-Version: 13
> < Pragma: no-cache
> < Cache-Control: no-cache
> < Host: 127.0.0.1:8761
> > HTTP/1.1 101 Switching Protocols
> > Upgrade: websocket
> > Connection: Upgrade
> > Sec-WebSocket-Accept: Npa81PCNknQPE65lvzGnHYCzMoo=
> > Date: Mon, 20 Dec 2021 12:56:00 GMT
> > Server: Python/3.7 websockets/10.1
> connection open
> = connection is OPEN
> % sending keepalive ping
> > PING a3 38 b7 85 [binary, 4 bytes]
> < PONG a3 38 b7 85 [binary, 4 bytes]
> % received keepalive pong
> % sending keepalive ping
> > PING 4a 5e cd 02 [binary, 4 bytes]
> < PONG 4a 5e cd 02 [binary, 4 bytes]
> % received keepalive pong
> % sending keepalive ping
> > PING eb 90 17 a2 [binary, 4 bytes]
> < PONG eb 90 17 a2 [binary, 4 bytes]
> % received keepalive pong
> % sending keepalive ping
> > PING 97 b3 1d 6f [binary, 4 bytes]
> < PONG 97 b3 1d 6f [binary, 4 bytes]
> % received keepalive pong
> % sending keepalive ping
> > PING 29 86 3c dc [binary, 4 bytes]
> < PONG 29 86 3c dc [binary, 4 bytes]
> % received keepalive pong
> {code}
> While the client logs:
> {code:java}
> INFO [Timer-Driven Process Thread-5] o.a.n.w.jetty.JettyWebSocketClient JettyWebSocketClient[id=a7bd40ce-3fab-3c7e-c391-a66122576fc7] Connecting to : ws://127.0.0.1:8761/foobar
> INFO [Timer-Driven Process Thread-5] o.a.n.w.jetty.JettyWebSocketClient JettyWebSocketClient[id=a7bd40ce-3fab-3c7e-c391-a66122576fc7] Connected, session=WebSocketSession[websocket=JettyListenerEventDriver[org.apache.nifi.websocket.jetty.RoutingWebSocketListener],behavior=CLIENT,connection=WebSocketClientConnection@aad940f::SocketChannelEndPoint@4d2b432c{l=/127.0.0.1:40962,r=/127.0.0.1:8761,OPEN,fill=FI,flush=-,to=2/300000}{io=1/1,kio=1,kro=1}->WebSocketClientConnection@aad940f[s=ConnectionState@26d0eddf[OPENED],f=Flusher@6b69d9be[IDLE][queueSize=0,aggregateSize=-1,terminated=null],g=Generator[CLIENT,validating],p=Parser@453aa6fb[ExtensionStack,s=START,c=0,len=0,f=null]],remote=WebSocketRemoteEndpoint@529faf53[batching=true],incoming=JettyListenerEventDriver[org.apache.nifi.websocket.jetty.RoutingWebSocketListener],outgoing=ExtensionStack[queueSize=0,extensions=[],incoming=org.eclipse.jetty.websocket.common.WebSocketSession,outgoing=org.eclipse.jetty.websocket.client.io.WebSocketClientConnection]]
> {code}
> With nifi 1.15.1 the server logs:
> {code:java}
> server listening on [::1]:8761
> server listening on 127.0.0.1:8761
> = connection is CONNECTING
> < GET /foorbar HTTP/1.1
> < Accept-Encoding: gzip
> < User-Agent: Jetty/9.4.44.v20210927
> < Upgrade: websocket
> < Connection: Upgrade
> < Sec-WebSocket-Key: mcwkinuZNb2h57k3y4b/ZQ==
> < Sec-WebSocket-Version: 13
> < Pragma: no-cache
> < Cache-Control: no-cache
> < Host: 127.0.0.1:8761
> > HTTP/1.1 101 Switching Protocols
> > Upgrade: websocket
> > Connection: Upgrade
> > Sec-WebSocket-Accept: Qw6zk+7Zxoizai5KDPuykDSvhME=
> > Date: Mon, 20 Dec 2021 12:42:24 GMT
> > Server: Python/3.7 websockets/10.1
> connection open
> = connection is OPEN
> = connection is CONNECTING
> < GET /foorbar HTTP/1.1
> < Accept-Encoding: gzip
> < User-Agent: Jetty/9.4.44.v20210927
> < Upgrade: websocket
> < Connection: Upgrade
> < Sec-WebSocket-Key: qVf9OYjG0ZbKDUi+EH5vmQ==
> < Sec-WebSocket-Version: 13
> < Pragma: no-cache
> < Cache-Control: no-cache
> < Host: 127.0.0.1:8761
> > HTTP/1.1 101 Switching Protocols
> > Upgrade: websocket
> > Connection: Upgrade
> > Sec-WebSocket-Accept: hUTCxdijd53SZ0cDukb7XL9G9Wg=
> > Date: Mon, 20 Dec 2021 12:42:26 GMT
> > Server: Python/3.7 websockets/10.1
> connection open
> = connection is OPEN
> = connection is CONNECTING
> < GET /foorbar HTTP/1.1
> < Accept-Encoding: gzip
> < User-Agent: Jetty/9.4.44.v20210927
> < Upgrade: websocket
> < Connection: Upgrade
> < Sec-WebSocket-Key: uVz1Tz7Fv0t/Cgk2ihr92Q==
> < Sec-WebSocket-Version: 13
> < Pragma: no-cache
> < Cache-Control: no-cache
> < Host: 127.0.0.1:8761
> > HTTP/1.1 101 Switching Protocols
> > Upgrade: websocket
> > Connection: Upgrade
> > Sec-WebSocket-Accept: NkC5cNqhnoR3DmQx8WtVN89qUFk=
> > Date: Mon, 20 Dec 2021 12:42:27 GMT
> > Server: Python/3.7 websockets/10.1
> connection open
> ...........cut the logging after a lot of reconnects here.......
> = connection is OPEN
> % sending keepalive ping
> > PING a1 f7 b4 3b [binary, 4 bytes]
> < PONG a1 f7 b4 3b [binary, 4 bytes]
> % received keepalive pong
> % sending keepalive ping
> > PING a6 2c ad 8f [binary, 4 bytes]
> < PONG a6 2c ad 8f [binary, 4 bytes]
> % received keepalive pong
> = connection is CONNECTING
> < GET /foorbar HTTP/1.1
> < Accept-Encoding: gzip
> < User-Agent: Jetty/9.4.44.v20210927
> < Upgrade: websocket
> < Connection: Upgrade
> < Sec-WebSocket-Key: pTB3w+e0IPk4JyMntKYyxg==
> < Sec-WebSocket-Version: 13
> < Pragma: no-cache
> < Cache-Control: no-cache
> < Host: 127.0.0.1:8761
> > HTTP/1.1 101 Switching Protocols
> > Upgrade: websocket
> > Connection: Upgrade
> > Sec-WebSocket-Accept: hOg8hW+ZLZI7uEUYm6e/vWAzeL0=
> > Date: Mon, 20 Dec 2021 12:43:05 GMT
> > Server: Python/3.7 websockets/10.1
> connection open
> = connection is OPEN
> % sending keepalive ping
> > PING f3 8d 6f 88 [binary, 4 bytes]
> < PONG f3 8d 6f 88 [binary, 4 bytes]
> % received keepalive pong
> % sending keepalive ping
> > PING a8 2b 80 f8 [binary, 4 bytes]
> < PONG a8 2b 80 f8 [binary, 4 bytes]
> % received keepalive pong
> = connection is CONNECTING
> < GET /foorbar HTTP/1.1
> < Accept-Encoding: gzip
> < User-Agent: Jetty/9.4.44.v20210927
> < Upgrade: websocket
> < Connection: Upgrade
> < Sec-WebSocket-Key: gELViab0ezRP9exTZH8xtg==
> < Sec-WebSocket-Version: 13
> < Pragma: no-cache
> < Cache-Control: no-cache
> < Host: 127.0.0.1:8761
> > HTTP/1.1 101 Switching Protocols
> > Upgrade: websocket
> > Connection: Upgrade
> > Sec-WebSocket-Accept: M2Lj+XbOgKD0CEcRGHkrFAJkZUo=
> > Date: Mon, 20 Dec 2021 12:43:06 GMT
> > Server: Python/3.7 websockets/10.1
> connection open
> = connection is OPEN
> % sending keepalive ping
> > PING f4 39 fc 6b [binary, 4 bytes]
> < PONG f4 39 fc 6b [binary, 4 bytes]
> % received keepalive pong
> % sending keepalive ping
> > PING a0 7f dc d0 [binary, 4 bytes]
> < PONG a0 7f dc d0 [binary, 4 bytes]
> % received keepalive pong
> = connection is CONNECTING
> < GET /foorbar HTTP/1.1
> < Accept-Encoding: gzip
> < User-Agent: Jetty/9.4.44.v20210927
> < Upgrade: websocket
> < Connection: Upgrade
> < Sec-WebSocket-Key: CJFkhLuA8f0CsbESbW/KZA==
> < Sec-WebSocket-Version: 13
> < Pragma: no-cache
> < Cache-Control: no-cache
> < Host: 127.0.0.1:8761
> > HTTP/1.1 101 Switching Protocols
> > Upgrade: websocket
> > Connection: Upgrade
> > Sec-WebSocket-Accept: KPmtqFaiQ1ssBxyqJ7QwzcLVljc=
> > Date: Mon, 20 Dec 2021 12:43:07 GMT
> > Server: Python/3.7 websockets/10.1
> connection open
> = connection is OPEN
> % sending keepalive ping
> > PING 9c a8 ea 12 [binary, 4 bytes]
> < PONG 9c a8 ea 12 [binary, 4 bytes]
> % received keepalive pong
> % sending keepalive ping
> > PING 3a c1 ed 1d [binary, 4 bytes]
> < PONG 3a c1 ed 1d [binary, 4 bytes]
> % received keepalive pong
> = connection is CONNECTING
> < GET /foorbar HTTP/1.1
> < Accept-Encoding: gzip
> < User-Agent: Jetty/9.4.44.v20210927
> < Upgrade: websocket
> < Connection: Upgrade
> < Sec-WebSocket-Key: 0Hj60r2pde6B3K7DB6+qoQ==
> < Sec-WebSocket-Version: 13
> < Pragma: no-cache
> < Cache-Control: no-cache
> < Host: 127.0.0.1:8761
> > HTTP/1.1 101 Switching Protocols
> > Upgrade: websocket
> > Connection: Upgrade
> > Sec-WebSocket-Accept: UCFVwstv90hE2fx7hMGCMCHXod0=
> > Date: Mon, 20 Dec 2021 12:43:08 GMT
> > Server: Python/3.7 websockets/10.1
> connection open
> = connection is OPEN
> % sending keepalive ping
> > PING d0 24 05 03 [binary, 4 bytes]
> < PONG d0 24 05 03 [binary, 4 bytes]
> % received keepalive pong
> % sending keepalive ping
> > PING 2d 41 77 d9 [binary, 4 bytes]
> < PONG 2d 41 77 d9 [binary, 4 bytes]
> % received keepalive pong
> = connection is CONNECTING
> {code}
> While the client logs:
> {code:java}
> DEBUG [pool-27-thread-1] o.a.n.w.jetty.JettyWebSocketClient JettyWebSocketClient[id=a1c893af-33f5-3a3c-cfb9-5b0b6529af91] Session maintenance completed. activeSessions={foorbar/1=dto.SessionInfo@51429c2f}
> INFO [Timer-Driven Process Thread-8] o.a.n.w.jetty.JettyWebSocketClient JettyWebSocketClient[id=a1c893af-33f5-3a3c-cfb9-5b0b6529af91] Connecting to : ws://127.0.0.1:8761/foorbar
> INFO [Timer-Driven Process Thread-8] o.a.n.w.jetty.JettyWebSocketClient JettyWebSocketClient[id=a1c893af-33f5-3a3c-cfb9-5b0b6529af91] Connected, session=WebSocketSession[websocket=JettyListenerEventDriver[org.apache.nifi.websocket.jetty.RoutingWebSocketListener],behavior=CLIENT,connection=WebSocketClientConnection@3169490b::SocketChannelEndPoint@74f13d{l=/127.0.0.1:39762,r=/127.0.0.1:8761,OPEN,fill=FI,flush=-,to=0/300000}{io=1/1,kio=1,kro=1}->WebSocketClientConnection@3169490b[s=ConnectionState@72ecb390[OPENED],f=Flusher@42ccfecb[IDLE][queueSize=0,aggregateSize=-1,terminated=null],g=Generator[CLIENT,validating],p=Parser@3996c688[ExtensionStack,s=START,c=0,len=0,f=null]],remote=WebSocketRemoteEndpoint@1ecc259e[batching=true],incoming=JettyListenerEventDriver[org.apache.nifi.websocket.jetty.RoutingWebSocketListener],outgoing=ExtensionStack[queueSize=0,extensions=[],incoming=org.eclipse.jetty.websocket.common.WebSocketSession,outgoing=org.eclipse.jetty.websocket.client.io.WebSocketClientConnection]]
> {code}
> The logging of the reconnections does not stop with 1.15.1 and each second the websocket server logs it. I reproduced it with [websocat|https://github.com/vi/websocat] as well:
> {code:java}
>  websocat -s 8761  -vv
> {code}
> I found [this nifi issue|https://issues.apache.org/jira/browse/NIFI-8639] which was implemented in 1.15.0 and I guess this feature breaks the websocket connection now. I can use nifi in version 1.15.1, without the never ending reconnections, if I change the version of the nar file "nifi-websocket-processors-nar" from version 1.15.1 to 1.14.0.
> If you need more information or can't reproduce it, just ping me :)


