Posted to issues@nifi.apache.org by ijokarumawak <gi...@git.apache.org> on 2016/11/04 15:05:34 UTC

[GitHub] nifi pull request #1184: NIFI-1002: Added WebSocket support.

GitHub user ijokarumawak opened a pull request:

    https://github.com/apache/nifi/pull/1184

    NIFI-1002: Added WebSocket support.

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [x] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [x] Have you written or updated unit tests to verify your changes?
    - [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [x] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [x] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [x] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ijokarumawak/nifi nifi-1002

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/1184.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1184
    
----
commit 1c3f2baa015d53326410cb0b5c5fa1973222aa30
Author: Koji Kawamura <ij...@apache.org>
Date:   2016-10-18T01:43:42Z

    NIFI-1002: Added WebSocket support.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1184: NIFI-1002: Added WebSocket support.

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1184#discussion_r87268737
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketProcessor.java ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.websocket;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.websocket.WebSocketConfigurationException;
    +import org.apache.nifi.websocket.WebSocketService;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +public abstract class AbstractWebSocketProcessor extends AbstractSessionFactoryProcessor {
    +
    +    public static final String ATTR_WS_CS_ID = "websocket.controller.service.id";
    +    public static final String ATTR_WS_SESSION_ID = "websocket.session.id";
    +    public static final String ATTR_WS_ENDPOINT_ID = "websocket.endpoint.id";
    +    public static final String ATTR_WS_FAILURE_DETAIL = "websocket.failure.detail";
    +    public static final String ATTR_WS_MESSAGE_TYPE = "websocket.message.type";
    +    public static final String ATTR_WS_LOCAL_ADDRESS = "websocket.local.address";
    +    public static final String ATTR_WS_REMOTE_ADDRESS = "websocket.remote.address";
    +
    +    static List<PropertyDescriptor> getAbstractPropertyDescriptors(){
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        return descriptors;
    --- End diff --
    
    I am not sure I understand this code:
    1. Each call creates a new List.
    2. The List is always empty.
    Could you please clarify?
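
A minimal sketch of the concern raised in this comment, not the change that was eventually made in the PR. `Descriptor`, `BaseDescriptors`, and `ServerDescriptors` are hypothetical stand-ins (the real type is NiFi's `PropertyDescriptor`) so that the example compiles on its own. The idea is only that a base class with nothing to contribute yet can expose one shared immutable list instead of allocating a new empty `ArrayList` on every call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for org.apache.nifi.components.PropertyDescriptor.
final class Descriptor {
    final String name;

    Descriptor(String name) {
        this.name = name;
    }
}

// Hypothetical base class holding descriptors common to all subclasses.
class BaseDescriptors {
    // Shared and immutable, so no list is allocated per call. It is empty
    // today; common descriptors could be added here later.
    static final List<Descriptor> COMMON = Collections.emptyList();

    static List<Descriptor> getAbstractPropertyDescriptors() {
        return COMMON;
    }
}

// Hypothetical subclass: common descriptors first, then its own.
class ServerDescriptors {
    static final Descriptor LISTEN_PORT = new Descriptor("listen-port");

    static final List<Descriptor> PROPERTIES;

    static {
        final List<Descriptor> props =
                new ArrayList<>(BaseDescriptors.getAbstractPropertyDescriptors());
        props.add(LISTEN_PORT);
        PROPERTIES = Collections.unmodifiableList(props);
    }
}
```

Keeping the method gives subclasses a stable extension point, while the constant answers both numbered points: the list is created once, and its emptiness is explicit rather than accidental.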



[GitHub] nifi pull request #1184: NIFI-1002: Added WebSocket support.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1184#discussion_r87958669
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.websocket.jetty;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnDisabled;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.annotation.lifecycle.OnShutdown;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.nar.NarCloseable;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.nifi.websocket.WebSocketConfigurationException;
    +import org.apache.nifi.websocket.WebSocketMessageRouter;
    +import org.apache.nifi.websocket.WebSocketServerService;
    +import org.eclipse.jetty.server.Connector;
    +import org.eclipse.jetty.server.Handler;
    +import org.eclipse.jetty.server.HttpConfiguration;
    +import org.eclipse.jetty.server.HttpConnectionFactory;
    +import org.eclipse.jetty.server.SecureRequestCustomizer;
    +import org.eclipse.jetty.server.Server;
    +import org.eclipse.jetty.server.ServerConnector;
    +import org.eclipse.jetty.server.SslConnectionFactory;
    +import org.eclipse.jetty.server.handler.ContextHandlerCollection;
    +import org.eclipse.jetty.servlet.ServletContextHandler;
    +import org.eclipse.jetty.servlet.ServletHandler;
    +import org.eclipse.jetty.util.ssl.SslContextFactory;
    +import org.eclipse.jetty.websocket.api.Session;
    +import org.eclipse.jetty.websocket.api.WebSocketPolicy;
    +import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
    +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
    +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
    +import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
    +import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
    +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
    +
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +@Tags({"WebSocket", "Jetty", "server"})
    +@CapabilityDescription("Implementation of WebSocketServerService." +
    +        " This service uses Jetty WebSocket server module to provide" +
    +        " WebSocket session management throughout the application.")
    +public class JettyWebSocketServer extends AbstractJettyWebSocketService implements WebSocketServerService {
    +
    +    /**
    +     * A global map to refer a controller service instance by requested port number.
    +     */
    +    private static final Map<Integer, JettyWebSocketServer> portToControllerService = new ConcurrentHashMap<>();
    +
    +    // Allowable values for client auth
    +    public static final AllowableValue CLIENT_NONE = new AllowableValue("no", "No Authentication",
    +            "Processor will not authenticate clients. Anyone can communicate with this Processor anonymously");
    +    public static final AllowableValue CLIENT_WANT = new AllowableValue("want", "Want Authentication",
    +            "Processor will try to verify the client but if unable to verify will allow the client to communicate anonymously");
    +    public static final AllowableValue CLIENT_NEED = new AllowableValue("need", "Need Authentication",
    +            "Processor will reject communications from any client unless the client provides a certificate that is trusted by the TrustStore "
    +                    + "specified in the SSL Context Service");
    +
    +    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
    +            .name("client-authentication")
    +            .displayName("Client Authentication")
    +            .description("Specifies whether or not the Processor should authenticate clients. This value is ignored if the <SSL Context Service> "
    +                    + "Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.")
    +            .required(true)
    +            .allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED)
    +            .defaultValue(CLIENT_NONE.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor LISTEN_PORT = new PropertyDescriptor.Builder()
    +            .name("listen-port")
    +            .displayName("Listen Port")
    +            .description("The port number on which this WebSocketServer listens to.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties;
    +
    +    static {
    +        final List<PropertyDescriptor> props = new ArrayList<>();
    +        props.addAll(getAbstractPropertyDescriptors());
    +        props.add(LISTEN_PORT);
    +        props.add(SSL_CONTEXT);
    +        props.add(CLIENT_AUTH);
    +
    +        properties = Collections.unmodifiableList(props);
    +    }
    +
    +    private WebSocketPolicy configuredPolicy;
    +    private Server server;
    +    private Integer listenPort;
    +    private ServletHandler servletHandler;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +
    +    public static class JettyWebSocketServlet extends WebSocketServlet implements WebSocketCreator {
    +        @Override
    +        public void configure(WebSocketServletFactory webSocketServletFactory) {
    +            webSocketServletFactory.setCreator(this);
    +        }
    +
    +        @Override
    +        public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) {
    +            final URI requestURI = servletUpgradeRequest.getRequestURI();
    +            final int port = requestURI.getPort();
    +            final JettyWebSocketServer service = portToControllerService.get(port);
    +
    +            if (service == null) {
    +                throw new RuntimeException("No controller service is bound with port: " + port);
    +            }
    +
    +            final String path = requestURI.getPath();
    +            final WebSocketMessageRouter router;
    +            try {
    +                router = service.routers.getRouterOrFail(path);
    +            } catch (WebSocketConfigurationException e) {
    +                throw new IllegalStateException("Failed to get router due to: "  + e, e);
    +            }
    +
    +            final RoutingWebSocketListener listener = new RoutingWebSocketListener(router) {
    +                @Override
    +                public void onWebSocketConnect(Session session) {
    +                    final WebSocketPolicy currentPolicy = session.getPolicy();
    +                    currentPolicy.setInputBufferSize(service.configuredPolicy.getInputBufferSize());
    +                    currentPolicy.setMaxTextMessageSize(service.configuredPolicy.getMaxTextMessageSize());
    +                    currentPolicy.setMaxBinaryMessageSize(service.configuredPolicy.getMaxBinaryMessageSize());
    +                    super.onWebSocketConnect(session);
    +                }
    +            };
    +
    +            return listener;
    +        }
    +    }
    +
    +    @OnEnabled
    +    @Override
    +    public void startServer(final ConfigurationContext context) throws Exception {
    +
    +        if (server != null && server.isRunning()) {
    +            getLogger().info("A WebSocket server is already running. {}", new Object[]{server});
    +            return;
    +        }
    +
    +        configuredPolicy = WebSocketPolicy.newServerPolicy();
    +        configurePolicy(context, configuredPolicy);
    +
    +        server = new Server();
    +
    +        final ContextHandlerCollection handlerCollection = new ContextHandlerCollection();
    +
    +        final ServletContextHandler contextHandler = new ServletContextHandler();
    +        servletHandler = new ServletHandler();
    +        contextHandler.insertHandler(servletHandler);
    +
    +        handlerCollection.setHandlers(new Handler[]{contextHandler});
    +
    +        server.setHandler(handlerCollection);
    +
    +        listenPort = context.getProperty(LISTEN_PORT).asInteger();
    +        final SslContextFactory sslContextFactory = createSslFactory(context);
    +
    +        final ServerConnector serverConnector = createConnector(sslContextFactory, listenPort);
    +
    +        server.setConnectors(new Connector[] {serverConnector});
    +
    +        servletHandler.addServletWithMapping(JettyWebSocketServlet.class, "/*");
    +
    +        // Need to specify classloader, otherwise since the callstack doesn't have any nifi specific class, so it can't use nar.
    +        try (NarCloseable closeable = NarCloseable.withComponentNarLoader(WebSocketServerFactory.class)) {
    --- End diff --
    
    Thanks Oleg, I removed the use of NarCloseable and confirmed that JettyWebSocketServer works without this now.
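
The code comment in the quoted diff says the class loader had to be specified because the call stack contained no NiFi-specific class. A common way to do that in Java is to swap the thread context class loader inside a try-with-resources block and restore it on exit. The sketch below shows that general pattern only; `ContextClassLoaderScope` and its `use` method are hypothetical names written for illustration and are not NiFi's `NarCloseable` API.

```java
// Hypothetical helper, not NiFi's NarCloseable: sets the thread context
// class loader and restores the previous one when the scope is closed.
final class ContextClassLoaderScope implements AutoCloseable {
    private final ClassLoader previous;

    private ContextClassLoaderScope(ClassLoader previous) {
        this.previous = previous;
    }

    // Installs the given loader on the current thread and remembers the old one.
    static ContextClassLoaderScope use(ClassLoader loader) {
        final Thread current = Thread.currentThread();
        final ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        return new ContextClassLoaderScope(previous);
    }

    // Restores the loader that was active before use() was called.
    @Override
    public void close() {
        Thread.currentThread().setContextClassLoader(previous);
    }
}
```

Under these assumptions a caller would wrap the sensitive call as `try (ContextClassLoaderScope scope = ContextClassLoaderScope.use(loader)) { ... }`, so the previous loader is restored even if the body throws.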



[GitHub] nifi issue #1184: NIFI-1002: Added WebSocket support.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/1184
  
    @olegz Thanks again for the great review! I rebased the PR and did additional refactoring based on your feedback. Please let me know if you would prefer it to be squashed for further review.



[GitHub] nifi pull request #1184: NIFI-1002: Added WebSocket support.

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1184#discussion_r87873512
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.websocket.jetty;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnDisabled;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.annotation.lifecycle.OnShutdown;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.nar.NarCloseable;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.nifi.websocket.WebSocketConfigurationException;
    +import org.apache.nifi.websocket.WebSocketMessageRouter;
    +import org.apache.nifi.websocket.WebSocketServerService;
    +import org.eclipse.jetty.server.Connector;
    +import org.eclipse.jetty.server.Handler;
    +import org.eclipse.jetty.server.HttpConfiguration;
    +import org.eclipse.jetty.server.HttpConnectionFactory;
    +import org.eclipse.jetty.server.SecureRequestCustomizer;
    +import org.eclipse.jetty.server.Server;
    +import org.eclipse.jetty.server.ServerConnector;
    +import org.eclipse.jetty.server.SslConnectionFactory;
    +import org.eclipse.jetty.server.handler.ContextHandlerCollection;
    +import org.eclipse.jetty.servlet.ServletContextHandler;
    +import org.eclipse.jetty.servlet.ServletHandler;
    +import org.eclipse.jetty.util.ssl.SslContextFactory;
    +import org.eclipse.jetty.websocket.api.Session;
    +import org.eclipse.jetty.websocket.api.WebSocketPolicy;
    +import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
    +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
    +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
    +import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
    +import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
    +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
    +
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +@Tags({"WebSocket", "Jetty", "server"})
    +@CapabilityDescription("Implementation of WebSocketServerService." +
    +        " This service uses Jetty WebSocket server module to provide" +
    +        " WebSocket session management throughout the application.")
    +public class JettyWebSocketServer extends AbstractJettyWebSocketService implements WebSocketServerService {
    +
    +    /**
    +     * A global map to refer a controller service instance by requested port number.
    +     */
    +    private static final Map<Integer, JettyWebSocketServer> portToControllerService = new ConcurrentHashMap<>();
    +
    +    // Allowable values for client auth
    +    public static final AllowableValue CLIENT_NONE = new AllowableValue("no", "No Authentication",
    +            "Processor will not authenticate clients. Anyone can communicate with this Processor anonymously");
    +    public static final AllowableValue CLIENT_WANT = new AllowableValue("want", "Want Authentication",
    +            "Processor will try to verify the client but if unable to verify will allow the client to communicate anonymously");
    +    public static final AllowableValue CLIENT_NEED = new AllowableValue("need", "Need Authentication",
    +            "Processor will reject communications from any client unless the client provides a certificate that is trusted by the TrustStore "
    +                    + "specified in the SSL Context Service");
    +
    +    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
    +            .name("client-authentication")
    +            .displayName("Client Authentication")
    +            .description("Specifies whether or not the Processor should authenticate clients. This value is ignored if the <SSL Context Service> "
    +                    + "Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.")
    +            .required(true)
    +            .allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED)
    +            .defaultValue(CLIENT_NONE.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor LISTEN_PORT = new PropertyDescriptor.Builder()
    +            .name("listen-port")
    +            .displayName("Listen Port")
    +            .description("The port number on which this WebSocketServer listens to.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties;
    +
    +    static {
    +        final List<PropertyDescriptor> props = new ArrayList<>();
    +        props.addAll(getAbstractPropertyDescriptors());
    +        props.add(LISTEN_PORT);
    +        props.add(SSL_CONTEXT);
    +        props.add(CLIENT_AUTH);
    +
    +        properties = Collections.unmodifiableList(props);
    +    }
    +
    +    private WebSocketPolicy configuredPolicy;
    +    private Server server;
    +    private Integer listenPort;
    +    private ServletHandler servletHandler;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +
    +    public static class JettyWebSocketServlet extends WebSocketServlet implements WebSocketCreator {
    +        @Override
    +        public void configure(WebSocketServletFactory webSocketServletFactory) {
    +            webSocketServletFactory.setCreator(this);
    +        }
    +
    +        @Override
    +        public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) {
    +            final URI requestURI = servletUpgradeRequest.getRequestURI();
    +            final int port = requestURI.getPort();
    +            final JettyWebSocketServer service = portToControllerService.get(port);
    +
    +            if (service == null) {
    +                throw new RuntimeException("No controller service is bound with port: " + port);
    +            }
    +
    +            final String path = requestURI.getPath();
    +            final WebSocketMessageRouter router;
    +            try {
    +                router = service.routers.getRouterOrFail(path);
    +            } catch (WebSocketConfigurationException e) {
    +                throw new IllegalStateException("Failed to get router due to: "  + e, e);
    +            }
    +
    +            final RoutingWebSocketListener listener = new RoutingWebSocketListener(router) {
    +                @Override
    +                public void onWebSocketConnect(Session session) {
    +                    final WebSocketPolicy currentPolicy = session.getPolicy();
    +                    currentPolicy.setInputBufferSize(service.configuredPolicy.getInputBufferSize());
    +                    currentPolicy.setMaxTextMessageSize(service.configuredPolicy.getMaxTextMessageSize());
    +                    currentPolicy.setMaxBinaryMessageSize(service.configuredPolicy.getMaxBinaryMessageSize());
    +                    super.onWebSocketConnect(session);
    +                }
    +            };
    +
    +            return listener;
    +        }
    +    }
    +
    +    @OnEnabled
    +    @Override
    +    public void startServer(final ConfigurationContext context) throws Exception {
    +
    +        if (server != null && server.isRunning()) {
    +            getLogger().info("A WebSocket server is already running. {}", new Object[]{server});
    +            return;
    +        }
    +
    +        configuredPolicy = WebSocketPolicy.newServerPolicy();
    +        configurePolicy(context, configuredPolicy);
    +
    +        server = new Server();
    +
    +        final ContextHandlerCollection handlerCollection = new ContextHandlerCollection();
    +
    +        final ServletContextHandler contextHandler = new ServletContextHandler();
    +        servletHandler = new ServletHandler();
    +        contextHandler.insertHandler(servletHandler);
    +
    +        handlerCollection.setHandlers(new Handler[]{contextHandler});
    +
    +        server.setHandler(handlerCollection);
    +
    +        listenPort = context.getProperty(LISTEN_PORT).asInteger();
    +        final SslContextFactory sslContextFactory = createSslFactory(context);
    +
    +        final ServerConnector serverConnector = createConnector(sslContextFactory, listenPort);
    +
    +        server.setConnectors(new Connector[] {serverConnector});
    +
    +        servletHandler.addServletWithMapping(JettyWebSocketServlet.class, "/*");
    +
    +        // Need to specify classloader, otherwise since the callstack doesn't have any nifi specific class, so it can't use nar.
    +        try (NarCloseable closeable = NarCloseable.withComponentNarLoader(WebSocketServerFactory.class)) {
    --- End diff --
    
    This no longer compiles after recent work on dynamic class loading by @bbende 



[GitHub] nifi issue #1184: NIFI-1002: Added WebSocket support.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/1184
  
    @olegz Thanks for taking the time to review! I added a commit to address your feedback. I've also added `displayName` to the property descriptors that were added in this PR.



[GitHub] nifi issue #1184: NIFI-1002: Added WebSocket support.

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the issue:

    https://github.com/apache/nifi/pull/1184
  
    @ijokarumawak at this point I am going to say: great work! I am sure that in the months to come there will be things to improve as people start using it, but LGTM for now. Will merge shortly!



[GitHub] nifi issue #1184: NIFI-1002: Added WebSocket support.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/1184
  
    Hello NiFi developers,
    
    This PR contains a new set of WebSocket support components.
    
    There are already at least two WebSocket processors available on GitHub: [nifi-websocket](https://github.com/xmlking/nifi-websocket) by @xmlking, and [nifi-websockets-bundle](https://github.com/acesir/nifi-websockets-bundle) by @acesir. I appreciate these projects providing WebSocket support for NiFi, and wanted to merge the capability into the Apache NiFi code base. I referred to these existing projects and added a few features and unit tests.
    
    Features:
    - Acts as a WebSocket client that connects to a remote WebSocket server
    - Acts as a WebSocket server that accepts connections from remote WebSocket clients
    - Receives and sends text messages as well as binary messages
    - Supports secure communication (using SslContextService)
    
    It may look complex, so to explain it I wrote a blog post about how it works and how it's designed. Please check the post as well: [NiFi WebSocket Support](http://ijokarumawak.github.io/nifi/2016/11/04/nifi-websocket/).
    
    Any comment or advice would be appreciated, thanks in advance!



[GitHub] nifi pull request #1184: NIFI-1002: Added WebSocket support.

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1184#discussion_r87269117
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketProcessor.java ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.websocket;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.websocket.WebSocketConfigurationException;
    +import org.apache.nifi.websocket.WebSocketService;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +public abstract class AbstractWebSocketProcessor extends AbstractSessionFactoryProcessor {
    +
    +    public static final String ATTR_WS_CS_ID = "websocket.controller.service.id";
    +    public static final String ATTR_WS_SESSION_ID = "websocket.session.id";
    +    public static final String ATTR_WS_ENDPOINT_ID = "websocket.endpoint.id";
    +    public static final String ATTR_WS_FAILURE_DETAIL = "websocket.failure.detail";
    +    public static final String ATTR_WS_MESSAGE_TYPE = "websocket.message.type";
    +    public static final String ATTR_WS_LOCAL_ADDRESS = "websocket.local.address";
    +    public static final String ATTR_WS_REMOTE_ADDRESS = "websocket.remote.address";
    +
    +    static List<PropertyDescriptor> getAbstractPropertyDescriptors(){
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        return descriptors;
    +    }
    +
    +    protected ComponentLog logger;
    +    protected ProcessSessionFactory processSessionFactory;
    --- End diff --
    
    Since the above variables could potentially be accessed by different threads, there are thread visibility concerns. Consider making them _volatile_.
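
To illustrate the visibility concern, here is a minimal sketch that does not use the NiFi API. `VisibilityDemo`, its methods, and the `String` standing in for `ProcessSessionFactory` are all hypothetical names chosen for this example. One thread writes the field, as a framework thread calling `onTrigger` might, while another thread polls it, as a WebSocket callback thread might.

```java
// Hypothetical stand-in for a processor whose field is written by one thread
// (e.g. the framework thread calling onTrigger) and read by another
// (e.g. a WebSocket callback thread). This is not NiFi code.
class VisibilityDemo {
    // volatile guarantees that a write by one thread becomes visible to
    // readers on other threads; a String stands in for ProcessSessionFactory.
    private volatile String sessionFactory;

    void onTrigger(String factory) {
        if (sessionFactory == null) {
            sessionFactory = factory;
        }
    }

    String readFromCallback() {
        return sessionFactory;
    }

    // Starts a reader thread that spins until it observes the write,
    // then returns the value the reader saw (or null on timeout).
    static String runDemo() {
        final VisibilityDemo demo = new VisibilityDemo();
        final String[] seen = new String[1];
        Thread callback = new Thread(() -> {
            String f;
            while ((f = demo.readFromCallback()) == null) {
                Thread.yield();
            }
            seen[0] = f;
        });
        callback.setDaemon(true);
        callback.start();
        demo.onTrigger("factory-1");
        try {
            callback.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }
}
```

Without `volatile`, the Java memory model would not require the reader loop to ever observe the new value, so the loop could in principle spin indefinitely.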



[GitHub] nifi pull request #1184: NIFI-1002: Added WebSocket support.

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1184#discussion_r87267815
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketProcessor.java ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.websocket;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.websocket.WebSocketConfigurationException;
    +import org.apache.nifi.websocket.WebSocketService;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +public abstract class AbstractWebSocketProcessor extends AbstractSessionFactoryProcessor {
    --- End diff --
    
    Is there a reason why this class extends _AbstractSessionFactoryProcessor_ and not _AbstractProcessor_? Just trying to figure out the need for an _onTrigger()_ implementation that is identical to the one in _AbstractProcessor_.
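
For readers unfamiliar with the two base classes, the difference can be sketched with simplified stand-in types. Everything below (`DemoSession`, `DemoSessionFactory`, `FactoryStyleProcessor`, `SessionStyleProcessor`, `CallbackProcessor`, `RecordingFactory`) is hypothetical and only models the pattern; it is not the NiFi API. The assumption is that a session-style base class creates a session per trigger, commits on success and rolls back on failure, while a factory-style processor receives the factory itself and can keep it for later use. Whether that is the reason for the choice in this PR is not stated in this comment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for a session and its factory (not the NiFi API).
interface DemoSession {
    void commit();
    void rollback();
}

interface DemoSessionFactory {
    DemoSession createSession();
}

// Factory style: the subclass receives the factory and manages sessions itself.
abstract class FactoryStyleProcessor {
    abstract void onTrigger(DemoSessionFactory factory);
}

// Session style: the base class creates one session per trigger,
// commits on success and rolls back if the subclass throws.
abstract class SessionStyleProcessor extends FactoryStyleProcessor {
    @Override
    final void onTrigger(DemoSessionFactory factory) {
        DemoSession session = factory.createSession();
        try {
            onTrigger(session);
            session.commit();
        } catch (RuntimeException e) {
            session.rollback();
            throw e;
        }
    }

    abstract void onTrigger(DemoSession session);
}

// A factory-style processor can keep the factory and create sessions
// later, for example from a message callback on another thread.
class CallbackProcessor extends FactoryStyleProcessor {
    private volatile DemoSessionFactory factory;

    @Override
    void onTrigger(DemoSessionFactory factory) {
        this.factory = factory;
    }

    void onMessage(String message) {
        DemoSession session = factory.createSession();
        session.commit();
    }
}

// Records session lifecycle events so the behaviour can be inspected.
class RecordingFactory implements DemoSessionFactory {
    final List<String> events = new ArrayList<>();

    @Override
    public DemoSession createSession() {
        events.add("create");
        return new DemoSession() {
            @Override
            public void commit() {
                events.add("commit");
            }

            @Override
            public void rollback() {
                events.add("rollback");
            }
        };
    }
}
```

If a subclass only needs the stored factory in addition to the standard create/commit/rollback flow, duplicating that flow in its own `onTrigger()` is what the question above is probing.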



[GitHub] nifi issue #1184: NIFI-1002: Added WebSocket support.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/1184
  
    @olegz Thank you very much! I hope it will be helpful for users, and I'm willing to improve it over time!



[GitHub] nifi pull request #1184: NIFI-1002: Added WebSocket support.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1184#discussion_r87377445
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketProcessor.java ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.websocket;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.websocket.WebSocketConfigurationException;
    +import org.apache.nifi.websocket.WebSocketService;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +public abstract class AbstractWebSocketProcessor extends AbstractSessionFactoryProcessor {
    +
    +    public static final String ATTR_WS_CS_ID = "websocket.controller.service.id";
    +    public static final String ATTR_WS_SESSION_ID = "websocket.session.id";
    +    public static final String ATTR_WS_ENDPOINT_ID = "websocket.endpoint.id";
    +    public static final String ATTR_WS_FAILURE_DETAIL = "websocket.failure.detail";
    +    public static final String ATTR_WS_MESSAGE_TYPE = "websocket.message.type";
    +    public static final String ATTR_WS_LOCAL_ADDRESS = "websocket.local.address";
    +    public static final String ATTR_WS_REMOTE_ADDRESS = "websocket.remote.address";
    +
    +    static List<PropertyDescriptor> getAbstractPropertyDescriptors(){
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        return descriptors;
    +    }
    +
    +    protected ComponentLog logger;
    +    protected ProcessSessionFactory processSessionFactory;
    --- End diff --
    
    Added the volatile keyword to these variables.



[GitHub] nifi pull request #1184: NIFI-1002: Added WebSocket support.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1184#discussion_r87958885
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java ---
    @@ -0,0 +1,314 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.websocket;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.websocket.BinaryMessageConsumer;
    +import org.apache.nifi.websocket.ConnectedListener;
    +import org.apache.nifi.websocket.TextMessageConsumer;
    +import org.apache.nifi.websocket.WebSocketClientService;
    +import org.apache.nifi.websocket.WebSocketConfigurationException;
    +import org.apache.nifi.websocket.WebSocketConnectedMessage;
    +import org.apache.nifi.websocket.WebSocketMessage;
    +import org.apache.nifi.websocket.WebSocketService;
    +import org.apache.nifi.websocket.WebSocketSessionInfo;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +@TriggerSerially
    +public abstract class AbstractWebSocketGatewayProcessor extends AbstractWebSocketProcessor implements ConnectedListener, TextMessageConsumer, BinaryMessageConsumer {
    +
    +    public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new PropertyDescriptor.Builder()
    +            .name("max-queue-size")
    +            .displayName("Max Queue Size")
    +            .description("The WebSocket messages are kept in an on-memory queue," +
    +                    " then transferred to relationships when this processor is triggered." +
    +                    " If the 'Run Schedule' is significantly behind the rate" +
    +                    " at which the messages are arriving to this processor then a back up can occur." +
    +                    " This property specifies the maximum number of messages this processor will hold in memory at one time." +
    +                    " CAUTION: Any incoming WebSocket message arrived while the queue being full" +
    +                    " will be discarded and a warning message will be logged.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("10000")
    +            .build();
    +
    +    private volatile LinkedBlockingQueue<WebSocketMessage> incomingMessageQueue;
    --- End diff --
    
    Thank you for bringing this up. I had been worried about this queue, too. To make it simpler, I removed the intermediate queue and let incoming messages be transferred directly to NiFi relationships.
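
The trade-off described in the property text above (messages arriving while the queue is full are discarded) can be sketched in plain Java. `QueueVsDirectDemo` and its methods are hypothetical names for this illustration, and a simple list stands in for a NiFi relationship; this is not the code from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration of buffering through a bounded queue versus
// handing each message on as it arrives. Not the code from the PR.
class QueueVsDirectDemo {

    // Buffered: offer() returns false once the bounded queue is full,
    // so any further message is dropped until the queue is drained.
    static int bufferedAccepted(int capacity, int incoming) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < incoming; i++) {
            if (queue.offer("msg-" + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    // Direct: each message is handed to the destination immediately,
    // so there is no intermediate buffer that can overflow.
    static int directAccepted(int incoming) {
        List<String> relationship = new ArrayList<>(); // stand-in for a relationship
        for (int i = 0; i < incoming; i++) {
            relationship.add("msg-" + i);
        }
        return relationship.size();
    }
}
```

With a capacity of 3 and 5 incoming messages, the buffered variant keeps only the first 3 when nothing drains the queue, while the direct variant passes all 5 through.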



[GitHub] nifi issue #1184: NIFI-1002: Added WebSocket support.

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the issue:

    https://github.com/apache/nifi/pull/1184
  
    Koji, this is great and I'll be looking at it shortly with hopes of merging it some time today.



[GitHub] nifi pull request #1184: NIFI-1002: Added WebSocket support.

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/1184

