Posted to issues@nifi.apache.org by ijokarumawak <gi...@git.apache.org> on 2017/03/16 08:18:52 UTC

[GitHub] nifi pull request #1597: NIFI-3609: ConnectWebSocket auto session recovery

GitHub user ijokarumawak opened a pull request:

    https://github.com/apache/nifi/pull/1597

    NIFI-3609: ConnectWebSocket auto session recovery

    Before this fix, ConnectWebSocket had to be restarted manually if its target WebSocket server restarted, or if its session expired because there was no communication for a certain period of time (it appears to be 3 min). This PR adds automatic session maintenance logic so that ConnectWebSocket keeps its session alive as long as it is running. 
    
    - Removed the unused disconnect method from the WebSocketService interface.
    - Added a session maintenance background thread to JettyWebSocketClient
      that reconnects sessions which are still referenced by a ConnectWebSocket
      processor but are no longer active.
    - Added a Session Maintenance Interval property to JettyWebSocketClient.
    - Allowed specifying an existing session ID so that a session can be
      recovered transparently.
    - Moved test classes to the appropriate package.
    - Added test cases that verify the same session ID can be reused after
      the WebSocket server restarts.
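The session maintenance background thread described above can be sketched with a single-threaded `ScheduledExecutorService`. This is a minimal sketch under stated assumptions and not the PR's code. The class name `SessionMaintenanceScheduler`, the choice of `scheduleWithFixedDelay`, and the error handling are all hypothetical. In the PR, the interval comes from the Session Maintenance Interval property, whose default is "10 sec".

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: periodically runs a session maintenance task on a background thread.
final class SessionMaintenanceScheduler {

    // Single daemon thread so maintenance never blocks JVM shutdown.
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "session-maintenance");
                thread.setDaemon(true);
                return thread;
            });

    /** Runs maintenanceTask every intervalMillis, starting one interval from now. */
    void start(Runnable maintenanceTask, long intervalMillis) {
        executor.scheduleWithFixedDelay(() -> {
            try {
                maintenanceTask.run();
            } catch (RuntimeException e) {
                // A periodic task that throws is never run again, so log and carry on.
                System.err.println("Session maintenance failed: " + e);
            }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops the background thread, e.g. when the controller service is disabled. */
    void stop() {
        executor.shutdownNow();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Catching exceptions inside the task matters here. If one execution of a periodic `ScheduledExecutorService` task throws, all of its later executions are suppressed, and maintenance would silently stop.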
    
    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [x] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [x] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [x] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ijokarumawak/nifi nifi-3609

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/1597.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1597
    
----
commit 0da822fb52c698d04726b9b6bd060480dc21ed1b
Author: Koji Kawamura <ij...@apache.org>
Date:   2017-03-16T08:10:38Z

    NIFI-3609: ConnectWebSocket auto session recovery
    
    - Removed the unused disconnect method from the WebSocketService interface.
    - Added a session maintenance background thread to JettyWebSocketClient
      that reconnects sessions which are still referenced by a ConnectWebSocket
      processor but are no longer active.
    - Added a Session Maintenance Interval property to JettyWebSocketClient.
    - Allowed specifying an existing session ID so that a session can be
      recovered transparently.
    - Moved test classes to the appropriate package.
    - Added test cases that verify the same session ID can be reused after
      the WebSocket server restarts.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1597: NIFI-3609: ConnectWebSocket auto session recovery

Posted by jdye64 <gi...@git.apache.org>.
Github user jdye64 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1597#discussion_r114116276
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
    @@ -81,6 +87,16 @@
                 .defaultValue("3 sec")
                 .build();
     
    +    public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("session-maintenance-interval")
    +            .displayName("Session Maintenance Interval")
    +            .description("The interval between session maintenance activities.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("10 sec")
    --- End diff --
    
    Since this interval drives maintainSessions(), which effectively reconnects sessions, would it make sense to set its default value lower than the default value of CONNECTION_TIMEOUT, so that by default connections would never time out at all? This would help when the client is inactive for longer than the default CONNECTION_TIMEOUT, and help ensure that messages are not missed by the client.



[GitHub] nifi issue #1597: NIFI-3609: ConnectWebSocket auto session recovery

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/1597
  
    @jdye64 Thanks for your feedback, much appreciated! I've added a new commit and comments to address your concerns. Please take another look when you have time. Thank you!



[GitHub] nifi pull request #1597: NIFI-3609: ConnectWebSocket auto session recovery

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1597#discussion_r114280064
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
    @@ -81,6 +87,16 @@
                 .defaultValue("3 sec")
                 .build();
     
    +    public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("session-maintenance-interval")
    +            .displayName("Session Maintenance Interval")
    +            .description("The interval between session maintenance activities.")
    --- End diff --
    
    Good point. I added a more detailed description of what this maintenance does. Thanks!



[GitHub] nifi pull request #1597: NIFI-3609: ConnectWebSocket auto session recovery

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1597#discussion_r114280792
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
    @@ -81,6 +87,16 @@
                 .defaultValue("3 sec")
                 .build();
     
    +    public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("session-maintenance-interval")
    +            .displayName("Session Maintenance Interval")
    +            .description("The interval between session maintenance activities.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("10 sec")
    --- End diff --
    
    CONNECTION_TIMEOUT is the timeout for the initial connection attempt; the idle connection timeout is a different setting (IIRC 5 min by default, and not configurable from the controller service at the moment, though we should probably make it configurable). The maintenance has no effect until a connection is established.
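The distinction can be illustrated in code. The connection timeout only bounds how long the client waits for the pending connection `Future`, as in `connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS)` in the diff. Once a session exists, that timeout plays no further role. This is a hypothetical sketch: the class and method names are illustrative, and a plain `CompletableFuture` stands in for Jetty's connect future. Unlike the diff, it also cancels a timed-out attempt and restores the thread's interrupt flag.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: the connection timeout only bounds waiting for the pending connection.
final class ConnectTimeoutSketch {

    /** Waits up to connectionTimeoutMillis for the pending connection; fails with IOException otherwise. */
    static <S> S awaitConnection(Future<S> pending, String uri, long connectionTimeoutMillis)
            throws IOException {
        try {
            return pending.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IOException("Interrupted while connecting to " + uri, e);
        } catch (ExecutionException | TimeoutException e) {
            pending.cancel(true); // give up on this attempt (not done in the diff)
            throw new IOException("Failed to connect " + uri + " due to: " + e, e);
        }
    }
}
```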



[GitHub] nifi pull request #1597: NIFI-3609: ConnectWebSocket auto session recovery

Posted by jdye64 <gi...@git.apache.org>.
Github user jdye64 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1597#discussion_r114116075
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
    @@ -81,6 +87,16 @@
                 .defaultValue("3 sec")
                 .build();
     
    +    public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("session-maintenance-interval")
    +            .displayName("Session Maintenance Interval")
    +            .description("The interval between session maintenance activities.")
    --- End diff --
    
    Could we add some more detail about what this is doing? I get that it's the interval for the maintenance thread, but I'm worried that end users might not really understand what that means. Maybe add a brief sentence about what this "maintenance" actually does and why it is necessary, so they understand how to use it a little better.



[GitHub] nifi issue #1597: NIFI-3609: ConnectWebSocket auto session recovery

Posted by jdye64 <gi...@git.apache.org>.
Github user jdye64 commented on the issue:

    https://github.com/apache/nifi/pull/1597
  
    reviewing



[GitHub] nifi pull request #1597: NIFI-3609: ConnectWebSocket auto session recovery

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/1597



[GitHub] nifi pull request #1597: NIFI-3609: ConnectWebSocket auto session recovery

Posted by jdye64 <gi...@git.apache.org>.
Github user jdye64 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1597#discussion_r114116475
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
    @@ -135,27 +177,81 @@ public void stopClient() throws Exception {
     
         @Override
         public void connect(final String clientId) throws IOException {
    +        connect(clientId, null);
    +    }
    +
    +    private void connect(final String clientId, String sessionId) throws IOException {
    +
    +        connectionLock.lock();
     
    -        final WebSocketMessageRouter router;
             try {
    -            router = routers.getRouterOrFail(clientId);
    -        } catch (WebSocketConfigurationException e) {
    -            throw new IllegalStateException("Failed to get router due to: "  + e, e);
    +            final WebSocketMessageRouter router;
    +            try {
    +                router = routers.getRouterOrFail(clientId);
    +            } catch (WebSocketConfigurationException e) {
    +                throw new IllegalStateException("Failed to get router due to: "  + e, e);
    +            }
    +            final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
    +            listener.setSessionId(sessionId);
    +
    +            final ClientUpgradeRequest request = new ClientUpgradeRequest();
    +            final Future<Session> connect = client.connect(listener, webSocketUri, request);
    +            getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
    +
    +            final Session session;
    +            try {
    +                session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
    +            } catch (Exception e) {
    +                throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
    +            }
    +            getLogger().info("Connected, session={}", new Object[]{session});
    +            activeSessions.put(clientId, listener.getSessionId());
    +
    +        } finally {
    +            connectionLock.unlock();
             }
    -        final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
     
    -        final ClientUpgradeRequest request = new ClientUpgradeRequest();
    -        final Future<Session> connect = client.connect(listener, webSocketUri, request);
    -        getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
    +    }
    +
    +    private Map<String, String> activeSessions = new ConcurrentHashMap<>();
     
    -        final Session session;
    +    void maintainSessions() throws Exception {
    +        if (client == null) {
    +            return;
    +        }
    +
    +        connectionLock.lock();
    +
    +        final ComponentLog logger = getLogger();
             try {
    -            session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
    -        } catch (Exception e) {
    -            throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
    +            // Loop through existing sessions and reconnect.
    +            for (String clientId : activeSessions.keySet()) {
    +                final WebSocketMessageRouter router;
    +                try {
    +                    router = routers.getRouterOrFail(clientId);
    +                } catch (final WebSocketConfigurationException e) {
    +                    if (logger.isDebugEnabled()) {
    +                        logger.debug("The clientId {} is no longer active. Discarding the clientId.", new Object[]{clientId});
    +                    }
    +                    activeSessions.remove(clientId);
    +                    continue;
    +                }
    +
    +                final String sessionId = activeSessions.get(clientId);
    +                // If this session is still alive, do nothing.
    +                if (!router.containsSession(sessionId)) {
    +                    // This session is no longer active, reconnect it.
    +                    // If it fails, the sessionId will remain in activeSessions, and will be retried later.
    --- End diff --
    
    More of a question: should we limit the number of times this is attempted before the sessionId is simply removed? I could see an argument either way, but I'm curious about your thoughts here.
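The loop under discussion can be modeled in a self-contained way. This is a simplified, hypothetical sketch, not the PR code:

- `Router` stands in for `WebSocketMessageRouter`.
- `Reconnector` stands in for the private `connect(clientId, sessionId)`.
- A null lookup result plays the role of `getRouterOrFail` throwing.

As in the diff, a failed reconnect leaves the entry in `activeSessions`, so it is retried on every maintenance pass with no limit.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical, simplified model of the maintainSessions() loop from the diff.
final class SessionMaintenanceSketch {

    /** Stand-in for WebSocketMessageRouter; only the check the loop needs. */
    interface Router {
        boolean containsSession(String sessionId);
    }

    /** Stand-in for connect(clientId, sessionId), which reuses the existing session ID. */
    interface Reconnector {
        void reconnect(String clientId, String sessionId) throws IOException;
    }

    final Map<String, String> activeSessions = new ConcurrentHashMap<>(); // clientId -> sessionId
    private final ReentrantLock connectionLock = new ReentrantLock();
    private final Function<String, Router> routerLookup; // returns null once a clientId is de-registered
    private final Reconnector reconnector;

    SessionMaintenanceSketch(Function<String, Router> routerLookup, Reconnector reconnector) {
        this.routerLookup = routerLookup;
        this.reconnector = reconnector;
    }

    void maintainSessions() {
        connectionLock.lock();
        try {
            // ConcurrentHashMap iterators tolerate removal during iteration.
            for (Map.Entry<String, String> entry : activeSessions.entrySet()) {
                String clientId = entry.getKey();
                String sessionId = entry.getValue();
                Router router = routerLookup.apply(clientId);
                if (router == null) {
                    activeSessions.remove(clientId); // processor stopped: stop maintaining it
                    continue;
                }
                if (router.containsSession(sessionId)) {
                    continue; // session still alive, nothing to do
                }
                try {
                    reconnector.reconnect(clientId, sessionId);
                } catch (IOException e) {
                    // Keep the entry; the next maintenance pass retries (no retry limit).
                    System.err.println("Reconnect failed for " + clientId + ": " + e);
                }
            }
        } finally {
            connectionLock.unlock();
        }
    }
}
```

A bounded variant would instead track a failure count per clientId and remove the entry once that count exceeds a limit.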



[GitHub] nifi pull request #1597: NIFI-3609: ConnectWebSocket auto session recovery

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1597#discussion_r114281896
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
    @@ -135,27 +177,81 @@ public void stopClient() throws Exception {
     
         @Override
         public void connect(final String clientId) throws IOException {
    +        connect(clientId, null);
    +    }
    +
    +    private void connect(final String clientId, String sessionId) throws IOException {
    +
    +        connectionLock.lock();
     
    -        final WebSocketMessageRouter router;
             try {
    -            router = routers.getRouterOrFail(clientId);
    -        } catch (WebSocketConfigurationException e) {
    -            throw new IllegalStateException("Failed to get router due to: "  + e, e);
    +            final WebSocketMessageRouter router;
    +            try {
    +                router = routers.getRouterOrFail(clientId);
    +            } catch (WebSocketConfigurationException e) {
    +                throw new IllegalStateException("Failed to get router due to: "  + e, e);
    +            }
    +            final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
    +            listener.setSessionId(sessionId);
    +
    +            final ClientUpgradeRequest request = new ClientUpgradeRequest();
    +            final Future<Session> connect = client.connect(listener, webSocketUri, request);
    +            getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
    +
    +            final Session session;
    +            try {
    +                session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
    +            } catch (Exception e) {
    +                throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
    +            }
    +            getLogger().info("Connected, session={}", new Object[]{session});
    +            activeSessions.put(clientId, listener.getSessionId());
    +
    +        } finally {
    +            connectionLock.unlock();
             }
    -        final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
     
    -        final ClientUpgradeRequest request = new ClientUpgradeRequest();
    -        final Future<Session> connect = client.connect(listener, webSocketUri, request);
    -        getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
    +    }
    +
    +    private Map<String, String> activeSessions = new ConcurrentHashMap<>();
     
    -        final Session session;
    +    void maintainSessions() throws Exception {
    +        if (client == null) {
    +            return;
    +        }
    +
    +        connectionLock.lock();
    +
    +        final ComponentLog logger = getLogger();
             try {
    -            session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
    -        } catch (Exception e) {
    -            throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
    +            // Loop through existing sessions and reconnect.
    +            for (String clientId : activeSessions.keySet()) {
    +                final WebSocketMessageRouter router;
    +                try {
    +                    router = routers.getRouterOrFail(clientId);
    +                } catch (final WebSocketConfigurationException e) {
    +                    if (logger.isDebugEnabled()) {
    +                        logger.debug("The clientId {} is no longer active. Discarding the clientId.", new Object[]{clientId});
    +                    }
    +                    activeSessions.remove(clientId);
    +                    continue;
    +                }
    +
    +                final String sessionId = activeSessions.get(clientId);
    +                // If this session is still alive, do nothing.
    +                if (!router.containsSession(sessionId)) {
    +                    // This session is no longer active, reconnect it.
    +                    // If it fails, the sessionId will remain in activeSessions, and will be retried later.
    --- End diff --
    
    Good point. I think we should keep retrying until the user stops the processor or the controller service. The goal is to get the WebSocket connection established. We don't know how long it will take for a connection to recover, but as long as the processor is running, we can assume the user wants it to be connected again.
    
    While thinking through and testing the scenarios above, I found that ConnectWebSocket cannot connect to a WebSocket server if the server is not running, and ConnectWebSocket then stays in the 'STARTED' state without retrying the connection. I added a de-registering call for when the initial connection attempt fails, so that it retries connecting the next time it is scheduled.
    
    Once it has successfully established a connection and obtained a session ID, the maintenance activity does its job.
    
    Does this sound reasonable?
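The initial-connection fix described in this reply can be sketched as a small state machine. This is a hypothetical model: `Client`, `register`, `deregister`, and `onTrigger()` are illustrative names, not the actual NiFi WebSocketService API. The point is that a failed first attempt undoes the registration. The next scheduled run then starts from scratch instead of leaving the processor 'STARTED' without a connection.

```java
import java.io.IOException;

// Hypothetical sketch: undo registration when the first connection attempt fails,
// so the next scheduled run retries instead of staying idle in the STARTED state.
final class ConnectOnTriggerSketch {

    /** Illustrative stand-in for the client controller service; not the NiFi API. */
    interface Client {
        void register(String clientId);
        void deregister(String clientId);
        void connect(String clientId) throws IOException;
    }

    private final Client client;
    private final String clientId;
    private volatile boolean connected;

    ConnectOnTriggerSketch(Client client, String clientId) {
        this.client = client;
        this.clientId = clientId;
    }

    /** Called on every scheduled run; returns true once a connection has been established. */
    boolean onTrigger() {
        if (connected) {
            return true; // from here on, session maintenance handles reconnects
        }
        client.register(clientId);
        try {
            client.connect(clientId);
            connected = true;
            return true;
        } catch (IOException e) {
            // Without this, the client would remain registered but unconnected, and never retry.
            client.deregister(clientId);
            return false;
        }
    }
}
```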

