You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by je...@apache.org on 2017/05/02 14:07:22 UTC

[2/2] nifi git commit: NIFI-3609: ConnectWebSocket auto session recovery

NIFI-3609: ConnectWebSocket auto session recovery

- Removed unused disconnect method from WebSocketService interface.
- Added a session maintenance background thread to JettyWebSocketClient
  which reconnects sessions that are still referenced by the
  ConnectWebSocket processor but are no longer active.
- Added Session Maintenance Interval property to JettyWebSocketClient.
- Allowed specifying an existing session id so that the session can be
  recovered transparently.
- Moved test classes to appropriate package.
- Added test cases that verify the same session id can be used after
  the WebSocket server restarts.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0a014b47
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0a014b47
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0a014b47

Branch: refs/heads/master
Commit: 0a014b471bb11c2d15b5f166e09915eeb319f352
Parents: 8fa3529
Author: Koji Kawamura <ij...@apache.org>
Authored: Thu Mar 16 17:10:38 2017 +0900
Committer: Jeremy Dyer <je...@apache.org>
Committed: Tue May 2 10:02:55 2017 -0400

----------------------------------------------------------------------
 .../websocket/AbstractWebSocketService.java     |   5 -
 .../nifi/websocket/WebSocketMessageRouter.java  |   4 +
 .../nifi/websocket/WebSocketMessageRouters.java |   6 +-
 .../apache/nifi/websocket/WebSocketService.java |   2 -
 .../websocket/jetty/JettyWebSocketClient.java   | 122 +++++++-
 .../jetty/RoutingWebSocketListener.java         |  14 +-
 .../websocket/ControllerServiceTestContext.java |  68 -----
 .../websocket/TestJettyWebSocketClient.java     |  63 ----
 .../TestJettyWebSocketCommunication.java        | 224 --------------
 .../TestJettyWebSocketSecureCommunication.java  |  67 ----
 .../websocket/TestJettyWebSocketServer.java     |  51 ----
 .../jetty/ControllerServiceTestContext.java     |  68 +++++
 .../jetty/TestJettyWebSocketClient.java         |  62 ++++
 .../jetty/TestJettyWebSocketCommunication.java  | 306 +++++++++++++++++++
 .../TestJettyWebSocketSecureCommunication.java  |  68 +++++
 .../jetty/TestJettyWebSocketServer.java         |  50 +++
 16 files changed, 681 insertions(+), 499 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java
index fac1e42..36deb55 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java
@@ -45,9 +45,4 @@ public abstract class AbstractWebSocketService extends AbstractControllerService
         routers.sendMessage(endpointId, sessionId, sendMessage);
     }
 
-    @Override
-    public void disconnect(final String endpointId, final String sessionId, final String reason) throws IOException, WebSocketConfigurationException {
-        routers.disconnect(endpointId, sessionId, reason);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
index e5034e1..0e8737a 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
@@ -124,4 +124,8 @@ public class WebSocketMessageRouter {
         sessions.remove(sessionId);
     }
 
+    public boolean containsSession(final String sessionId) {
+        return sessions.containsKey(sessionId);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java
index 2551eb4..ae70ae5 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java
@@ -59,6 +59,7 @@ public class WebSocketMessageRouters {
 
     public synchronized void deregisterProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException {
         final WebSocketMessageRouter router = getRouterOrFail(endpointId);
+        routers.remove(endpointId);
         router.deregisterProcessor(processor);
     }
 
@@ -67,9 +68,4 @@ public class WebSocketMessageRouters {
         router.sendMessage(sessionId, sendMessage);
     }
 
-    public void disconnect(final String endpointId, final String sessionId, final String reason) throws IOException, WebSocketConfigurationException {
-        final WebSocketMessageRouter router = getRouterOrFail(endpointId);
-        router.disconnect(sessionId, reason);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java
index 0de80bc..f86581b 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java
@@ -45,6 +45,4 @@ public interface WebSocketService extends ControllerService {
 
     void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException;
 
-    void disconnect(final String endpointId, final String sessionId, final String reason) throws Exception;
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
index af8dd8e..3d19eac 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
@@ -24,6 +24,7 @@ import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.websocket.WebSocketClientService;
@@ -39,8 +40,13 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 @Tags({"WebSocket", "Jetty", "client"})
 @CapabilityDescription("Implementation of WebSocketClientService." +
@@ -81,6 +87,16 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
             .defaultValue("3 sec")
             .build();
 
+    public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder()
+            .name("session-maintenance-interval")
+            .displayName("Session Maintenance Interval")
+            .description("The interval between session maintenance activities.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("10 sec")
+            .build();
+
     private static final List<PropertyDescriptor> properties;
 
     static {
@@ -89,6 +105,7 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
         props.add(WS_URI);
         props.add(SSL_CONTEXT);
         props.add(CONNECTION_TIMEOUT);
+        props.add(SESSION_MAINTENANCE_INTERVAL);
 
         properties = Collections.unmodifiableList(props);
     }
@@ -96,6 +113,8 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
     private WebSocketClient client;
     private URI webSocketUri;
     private long connectionTimeoutMillis;
+    private volatile ScheduledExecutorService sessionMaintenanceScheduler;
+    private final ReentrantLock connectionLock = new ReentrantLock();
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -116,15 +135,38 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
         configurePolicy(context, client.getPolicy());
 
         client.start();
+        activeSessions.clear();
 
         webSocketUri = new URI(context.getProperty(WS_URI).getValue());
         connectionTimeoutMillis = context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+
+        final Long sessionMaintenanceInterval = context.getProperty(SESSION_MAINTENANCE_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
+
+        sessionMaintenanceScheduler = Executors.newSingleThreadScheduledExecutor();
+        sessionMaintenanceScheduler.scheduleAtFixedRate(() -> {
+            try {
+                maintainSessions();
+            } catch (final Exception e) {
+                getLogger().warn("Failed to maintain sessions due to {}", new Object[]{e}, e);
+            }
+        }, sessionMaintenanceInterval, sessionMaintenanceInterval, TimeUnit.MILLISECONDS);
     }
 
     @OnDisabled
     @OnShutdown
     @Override
     public void stopClient() throws Exception {
+        activeSessions.clear();
+
+        if (sessionMaintenanceScheduler != null) {
+            try {
+                sessionMaintenanceScheduler.shutdown();
+            } catch (Exception e) {
+                getLogger().warn("Failed to shutdown session maintainer due to {}", new Object[]{e}, e);
+            }
+            sessionMaintenanceScheduler = null;
+        }
+
         if (client == null) {
             return;
         }
@@ -135,27 +177,81 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
 
     @Override
     public void connect(final String clientId) throws IOException {
+        connect(clientId, null);
+    }
+
+    private void connect(final String clientId, String sessionId) throws IOException {
+
+        connectionLock.lock();
 
-        final WebSocketMessageRouter router;
         try {
-            router = routers.getRouterOrFail(clientId);
-        } catch (WebSocketConfigurationException e) {
-            throw new IllegalStateException("Failed to get router due to: "  + e, e);
+            final WebSocketMessageRouter router;
+            try {
+                router = routers.getRouterOrFail(clientId);
+            } catch (WebSocketConfigurationException e) {
+                throw new IllegalStateException("Failed to get router due to: "  + e, e);
+            }
+            final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
+            listener.setSessionId(sessionId);
+
+            final ClientUpgradeRequest request = new ClientUpgradeRequest();
+            final Future<Session> connect = client.connect(listener, webSocketUri, request);
+            getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
+
+            final Session session;
+            try {
+                session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
+            } catch (Exception e) {
+                throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
+            }
+            getLogger().info("Connected, session={}", new Object[]{session});
+            activeSessions.put(clientId, listener.getSessionId());
+
+        } finally {
+            connectionLock.unlock();
         }
-        final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
 
-        final ClientUpgradeRequest request = new ClientUpgradeRequest();
-        final Future<Session> connect = client.connect(listener, webSocketUri, request);
-        getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
+    }
+
+    private Map<String, String> activeSessions = new ConcurrentHashMap<>();
 
-        final Session session;
+    void maintainSessions() throws Exception {
+        if (client == null) {
+            return;
+        }
+
+        connectionLock.lock();
+
+        final ComponentLog logger = getLogger();
         try {
-            session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
-        } catch (Exception e) {
-            throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
+            // Loop through existing sessions and reconnect.
+            for (String clientId : activeSessions.keySet()) {
+                final WebSocketMessageRouter router;
+                try {
+                    router = routers.getRouterOrFail(clientId);
+                } catch (final WebSocketConfigurationException e) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("The clientId {} is no longer active. Discarding the clientId.", new Object[]{clientId});
+                    }
+                    activeSessions.remove(clientId);
+                    continue;
+                }
+
+                final String sessionId = activeSessions.get(clientId);
+                // If this session is still alive, do nothing.
+                if (!router.containsSession(sessionId)) {
+                    // This session is no longer active, reconnect it.
+                    // If it fails, the sessionId will remain in activeSessions, and retries later.
+                    connect(clientId, sessionId);
+                }
+            }
+        } finally {
+            connectionLock.unlock();
         }
-        getLogger().info("Connected, session={}", new Object[]{session});
 
+        if (logger.isDebugEnabled()) {
+            logger.debug("Session maintenance completed. activeSessions={}", new Object[]{activeSessions});
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java
index 81376a4..ebc5e84 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java
@@ -33,7 +33,11 @@ public class RoutingWebSocketListener extends WebSocketAdapter {
     @Override
     public void onWebSocketConnect(final Session session) {
         super.onWebSocketConnect(session);
-        sessionId = UUID.randomUUID().toString();
+        if (sessionId == null || sessionId.isEmpty()) {
+            // Generate a new sessionId only when none is already assigned to this instance,
+            // so that an existing sessionId can be reused when reconnecting.
+            sessionId = UUID.randomUUID().toString();
+        }
         final JettyWebSocketSession webSocketSession = new JettyWebSocketSession(sessionId, session);
         router.captureSession(webSocketSession);
     }
@@ -53,4 +57,12 @@ public class RoutingWebSocketListener extends WebSocketAdapter {
     public void onWebSocketBinary(final byte[] payload, final int offset, final int len) {
         router.onWebSocketBinary(sessionId, payload, offset, len);
     }
+
+    public void setSessionId(String sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    public String getSessionId() {
+        return sessionId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/ControllerServiceTestContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/ControllerServiceTestContext.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/ControllerServiceTestContext.java
deleted file mode 100644
index 062d528..0000000
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/ControllerServiceTestContext.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.util.MockControllerServiceInitializationContext;
-import org.apache.nifi.util.MockPropertyValue;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class ControllerServiceTestContext {
-
-    private final ConfigurationContext configurationContext = mock(ConfigurationContext.class);
-    private final ValidationContext validationContext = mock(ValidationContext.class);
-    private MockControllerServiceInitializationContext initializationContext;
-
-    public ControllerServiceTestContext(ControllerService controllerService, String id) {
-        initializationContext = new MockControllerServiceInitializationContext(controllerService, id);
-        doAnswer(invocation -> configurationContext.getProperty(invocation.getArgumentAt(0, PropertyDescriptor.class)))
-                .when(validationContext).getProperty(any(PropertyDescriptor.class));
-        controllerService.getPropertyDescriptors().forEach(prop -> setDefaultValue(prop));
-    }
-
-    public MockControllerServiceInitializationContext getInitializationContext() {
-        return initializationContext;
-    }
-
-    public ConfigurationContext getConfigurationContext() {
-        return configurationContext;
-    }
-
-    public MockPropertyValue setDefaultValue(PropertyDescriptor propertyDescriptor) {
-        return setCustomValue(propertyDescriptor, propertyDescriptor.getDefaultValue());
-    }
-
-    public MockPropertyValue setCustomValue(PropertyDescriptor propertyDescriptor, String value) {
-        final MockPropertyValue propertyValue = new MockPropertyValue(value, initializationContext);
-        when(configurationContext.getProperty(eq(propertyDescriptor)))
-                .thenReturn(propertyValue);
-        return propertyValue;
-    }
-
-    public ValidationContext getValidationContext() {
-        return validationContext;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketClient.java
deleted file mode 100644
index cb979b9..0000000
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketClient.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket;
-
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.websocket.jetty.JettyWebSocketClient;
-import org.junit.Test;
-
-import java.util.Collection;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class TestJettyWebSocketClient {
-
-    @Test
-    public void testValidationRequiredProperties() throws Exception {
-        final JettyWebSocketClient service = new JettyWebSocketClient();
-        final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
-        service.initialize(context.getInitializationContext());
-        final Collection<ValidationResult> results = service.validate(context.getValidationContext());
-        assertEquals(1, results.size());
-        final ValidationResult result = results.iterator().next();
-        assertEquals(JettyWebSocketClient.WS_URI.getDisplayName(), result.getSubject());
-    }
-
-    @Test
-    public void testValidationSuccess() throws Exception {
-        final JettyWebSocketClient service = new JettyWebSocketClient();
-        final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
-        context.setCustomValue(JettyWebSocketClient.WS_URI, "ws://localhost:9001/test");
-        service.initialize(context.getInitializationContext());
-        final Collection<ValidationResult> results = service.validate(context.getValidationContext());
-        assertEquals(0, results.size());
-    }
-
-    @Test
-    public void testValidationProtocol() throws Exception {
-        final JettyWebSocketClient service = new JettyWebSocketClient();
-        final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
-        context.setCustomValue(JettyWebSocketClient.WS_URI, "http://localhost:9001/test");
-        service.initialize(context.getInitializationContext());
-        final Collection<ValidationResult> results = service.validate(context.getValidationContext());
-        assertEquals(1, results.size());
-        final ValidationResult result = results.iterator().next();
-        assertEquals(JettyWebSocketClient.WS_URI.getName(), result.getSubject());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketCommunication.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketCommunication.java
deleted file mode 100644
index 7202d2b..0000000
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketCommunication.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket;
-
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.websocket.jetty.JettyWebSocketClient;
-import org.apache.nifi.websocket.jetty.JettyWebSocketServer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-
-import java.net.ServerSocket;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeFalse;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
-
-public class TestJettyWebSocketCommunication {
-
-    protected int serverPort;
-    protected String serverPath = "/test";
-    protected WebSocketServerService serverService;
-    protected ControllerServiceTestContext serverServiceContext;
-    protected WebSocketClientService clientService;
-    protected ControllerServiceTestContext clientServiceContext;
-
-    protected boolean isSecure() {
-        return false;
-    }
-
-    @Before
-    public void setup() throws Exception {
-        setupServer();
-
-        setupClient();
-    }
-
-    private void setupServer() throws Exception {
-        // Find an open port.
-        try (final ServerSocket serverSocket = new ServerSocket(0)) {
-            serverPort = serverSocket.getLocalPort();
-        }
-        serverService = new JettyWebSocketServer();
-        serverServiceContext = new ControllerServiceTestContext(serverService, "JettyWebSocketServer1");
-        serverServiceContext.setCustomValue(JettyWebSocketServer.LISTEN_PORT, String.valueOf(serverPort));
-
-        customizeServer();
-
-        serverService.initialize(serverServiceContext.getInitializationContext());
-        serverService.startServer(serverServiceContext.getConfigurationContext());
-    }
-
-    protected void customizeServer() {
-    }
-
-    private void setupClient() throws Exception {
-        clientService = new JettyWebSocketClient();
-        clientServiceContext = new ControllerServiceTestContext(clientService, "JettyWebSocketClient1");
-        clientServiceContext.setCustomValue(JettyWebSocketClient.WS_URI, (isSecure() ? "wss" : "ws") + "://localhost:" + serverPort + serverPath);
-
-        customizeClient();
-
-        clientService.initialize(clientServiceContext.getInitializationContext());
-        clientService.startClient(clientServiceContext.getConfigurationContext());
-    }
-
-    protected void customizeClient() {
-    }
-
-    @After
-    public void teardown() throws Exception {
-        clientService.stopClient();
-        serverService.stopServer();
-    }
-
-    protected interface MockWebSocketProcessor extends Processor, ConnectedListener, TextMessageConsumer, BinaryMessageConsumer {
-    }
-
-    private boolean isWindowsEnvironment() {
-        return System.getProperty("os.name").toLowerCase().startsWith("windows");
-    }
-
-    @Test
-    public void testClientServerCommunication() throws Exception {
-        assumeFalse(isWindowsEnvironment());
-        // Expectations.
-        final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
-        final CountDownLatch clientConnectedServer = new CountDownLatch(1);
-        final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1);
-        final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1);
-        final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1);
-        final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1);
-
-        final String textMessageFromClient = "Message from client.";
-        final String textMessageFromServer = "Message from server.";
-
-        final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class);
-        doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
-        final AtomicReference<String> serverSessionIdRef = new AtomicReference<>();
-
-        doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation))
-                .when(serverProcessor).connected(any(WebSocketSessionInfo.class));
-
-        doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation))
-                .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
-
-        doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation))
-                .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
-
-        serverService.registerProcessor(serverPath, serverProcessor);
-
-        final String clientId = "client1";
-
-        final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class);
-        doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
-        final AtomicReference<String> clientSessionIdRef = new AtomicReference<>();
-
-
-        doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation))
-                .when(clientProcessor).connected(any(WebSocketSessionInfo.class));
-
-        doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation))
-                .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
-
-        doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation))
-                .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
-
-        clientService.registerProcessor(clientId, clientProcessor);
-
-        clientService.connect(clientId);
-
-        assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS));
-        assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
-
-        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient));
-        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
-
-
-        assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
-        assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
-
-        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer));
-        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
-
-        assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
-        assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
-
-        clientService.deregisterProcessor(clientId, clientProcessor);
-        serverService.deregisterProcessor(serverPath, serverProcessor);
-    }
-
-    protected Object assertConnectedEvent(CountDownLatch latch, AtomicReference<String> sessionIdRef, InvocationOnMock invocation) {
-        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
-        assertNotNull(sessionInfo.getLocalAddress());
-        assertNotNull(sessionInfo.getRemoteAddress());
-        assertNotNull(sessionInfo.getSessionId());
-        assertEquals(isSecure(), sessionInfo.isSecure());
-        sessionIdRef.set(sessionInfo.getSessionId());
-        latch.countDown();
-        return null;
-    }
-
-    protected Object assertConsumeTextMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
-        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
-        assertNotNull(sessionInfo.getLocalAddress());
-        assertNotNull(sessionInfo.getRemoteAddress());
-        assertNotNull(sessionInfo.getSessionId());
-        assertEquals(isSecure(), sessionInfo.isSecure());
-
-        final String receivedMessage = invocation.getArgumentAt(1, String.class);
-        assertNotNull(receivedMessage);
-        assertEquals(expectedMessage, receivedMessage);
-        latch.countDown();
-        return null;
-    }
-
-    protected Object assertConsumeBinaryMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
-        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
-        assertNotNull(sessionInfo.getLocalAddress());
-        assertNotNull(sessionInfo.getRemoteAddress());
-        assertNotNull(sessionInfo.getSessionId());
-        assertEquals(isSecure(), sessionInfo.isSecure());
-
-        final byte[] receivedMessage = invocation.getArgumentAt(1, byte[].class);
-        final byte[] expectedBinary = expectedMessage.getBytes();
-        final int offset = invocation.getArgumentAt(2, Integer.class);
-        final int length = invocation.getArgumentAt(3, Integer.class);
-        assertNotNull(receivedMessage);
-        assertEquals(expectedBinary.length, receivedMessage.length);
-        assertEquals(expectedMessage, new String(receivedMessage));
-        assertEquals(0, offset);
-        assertEquals(expectedBinary.length, length);
-        latch.countDown();
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketSecureCommunication.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketSecureCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketSecureCommunication.java
deleted file mode 100644
index e25189a..0000000
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketSecureCommunication.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket;
-
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.ssl.StandardSSLContextService;
-import org.junit.Test;
-
-
-public class TestJettyWebSocketSecureCommunication extends TestJettyWebSocketCommunication{
-
-    private final StandardSSLContextService sslContextService = new StandardSSLContextService();
-    private final ControllerServiceTestContext sslTestContext = new ControllerServiceTestContext(sslContextService, "SSLContextService");
-
-    public TestJettyWebSocketSecureCommunication() {
-        try {
-            sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE, "src/test/resources/certs/localhost-ks.jks");
-            sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
-            sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_TYPE, "JKS");
-            sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE, "src/test/resources/certs/localhost-ks.jks");
-            sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
-            sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
-
-            sslContextService.initialize(sslTestContext.getInitializationContext());
-            sslContextService.onConfigured(sslTestContext.getConfigurationContext());
-        } catch (InitializationException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    protected boolean isSecure() {
-        return true;
-    }
-
-    @Override
-    protected void customizeServer() {
-        serverServiceContext.getInitializationContext().addControllerService(sslContextService);
-        serverServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
-    }
-
-    @Override
-    protected void customizeClient() {
-        clientServiceContext.getInitializationContext().addControllerService(sslContextService);
-        clientServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
-    }
-
-    @Test
-    public void testClientServerCommunication() throws Exception {
-        super.testClientServerCommunication();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketServer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketServer.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketServer.java
deleted file mode 100644
index 907e689..0000000
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketServer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket;
-
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.websocket.jetty.JettyWebSocketServer;
-import org.junit.Test;
-
-import java.util.Collection;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class TestJettyWebSocketServer {
-
-    @Test
-    public void testValidationRequiredProperties() throws Exception {
-        final JettyWebSocketServer service = new JettyWebSocketServer();
-        final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
-        service.initialize(context.getInitializationContext());
-        final Collection<ValidationResult> results = service.validate(context.getValidationContext());
-        assertEquals(1, results.size());
-        final ValidationResult result = results.iterator().next();
-        assertEquals(JettyWebSocketServer.LISTEN_PORT.getDisplayName(), result.getSubject());
-    }
-
-    @Test
-    public void testValidationSuccess() throws Exception {
-        final JettyWebSocketServer service = new JettyWebSocketServer();
-        final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
-        context.setCustomValue(JettyWebSocketServer.LISTEN_PORT, "9001");
-        service.initialize(context.getInitializationContext());
-        final Collection<ValidationResult> results = service.validate(context.getValidationContext());
-        assertEquals(0, results.size());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ControllerServiceTestContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ControllerServiceTestContext.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ControllerServiceTestContext.java
new file mode 100644
index 0000000..fffba8a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ControllerServiceTestContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.apache.nifi.util.MockPropertyValue;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class ControllerServiceTestContext {
+
+    private final ConfigurationContext configurationContext = mock(ConfigurationContext.class);
+    private final ValidationContext validationContext = mock(ValidationContext.class);
+    private MockControllerServiceInitializationContext initializationContext;
+
+    public ControllerServiceTestContext(ControllerService controllerService, String id) {
+        initializationContext = new MockControllerServiceInitializationContext(controllerService, id);
+        doAnswer(invocation -> configurationContext.getProperty(invocation.getArgumentAt(0, PropertyDescriptor.class)))
+                .when(validationContext).getProperty(any(PropertyDescriptor.class));
+        controllerService.getPropertyDescriptors().forEach(prop -> setDefaultValue(prop));
+    }
+
+    public MockControllerServiceInitializationContext getInitializationContext() {
+        return initializationContext;
+    }
+
+    public ConfigurationContext getConfigurationContext() {
+        return configurationContext;
+    }
+
+    public MockPropertyValue setDefaultValue(PropertyDescriptor propertyDescriptor) {
+        return setCustomValue(propertyDescriptor, propertyDescriptor.getDefaultValue());
+    }
+
+    public MockPropertyValue setCustomValue(PropertyDescriptor propertyDescriptor, String value) {
+        final MockPropertyValue propertyValue = new MockPropertyValue(value, initializationContext);
+        when(configurationContext.getProperty(eq(propertyDescriptor)))
+                .thenReturn(propertyValue);
+        return propertyValue;
+    }
+
+    public ValidationContext getValidationContext() {
+        return validationContext;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketClient.java
new file mode 100644
index 0000000..a20b54e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketClient.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.components.ValidationResult;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestJettyWebSocketClient {
+
+    @Test
+    public void testValidationRequiredProperties() throws Exception {
+        final JettyWebSocketClient service = new JettyWebSocketClient();
+        final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
+        service.initialize(context.getInitializationContext());
+        final Collection<ValidationResult> results = service.validate(context.getValidationContext());
+        assertEquals(1, results.size());
+        final ValidationResult result = results.iterator().next();
+        assertEquals(JettyWebSocketClient.WS_URI.getDisplayName(), result.getSubject());
+    }
+
+    @Test
+    public void testValidationSuccess() throws Exception {
+        final JettyWebSocketClient service = new JettyWebSocketClient();
+        final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
+        context.setCustomValue(JettyWebSocketClient.WS_URI, "ws://localhost:9001/test");
+        service.initialize(context.getInitializationContext());
+        final Collection<ValidationResult> results = service.validate(context.getValidationContext());
+        assertEquals(0, results.size());
+    }
+
+    @Test
+    public void testValidationProtocol() throws Exception {
+        final JettyWebSocketClient service = new JettyWebSocketClient();
+        final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
+        context.setCustomValue(JettyWebSocketClient.WS_URI, "http://localhost:9001/test");
+        service.initialize(context.getInitializationContext());
+        final Collection<ValidationResult> results = service.validate(context.getValidationContext());
+        assertEquals(1, results.size());
+        final ValidationResult result = results.iterator().next();
+        assertEquals(JettyWebSocketClient.WS_URI.getName(), result.getSubject());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java
new file mode 100644
index 0000000..a225447
--- /dev/null
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.websocket.BinaryMessageConsumer;
+import org.apache.nifi.websocket.ConnectedListener;
+import org.apache.nifi.websocket.TextMessageConsumer;
+import org.apache.nifi.websocket.WebSocketClientService;
+import org.apache.nifi.websocket.WebSocketServerService;
+import org.apache.nifi.websocket.WebSocketSessionInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+
+public class TestJettyWebSocketCommunication {
+
+    protected int serverPort;
+    protected String serverPath = "/test";
+    protected WebSocketServerService serverService;
+    protected ControllerServiceTestContext serverServiceContext;
+    protected WebSocketClientService clientService;
+    protected ControllerServiceTestContext clientServiceContext;
+
+    protected boolean isSecure() {
+        return false;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        setupServer();
+
+        setupClient();
+    }
+
+    private void setupServer() throws Exception {
+        // Find an open port.
+        try (final ServerSocket serverSocket = new ServerSocket(0)) {
+            serverPort = serverSocket.getLocalPort();
+        }
+        serverService = new JettyWebSocketServer();
+        serverServiceContext = new ControllerServiceTestContext(serverService, "JettyWebSocketServer1");
+        serverServiceContext.setCustomValue(JettyWebSocketServer.LISTEN_PORT, String.valueOf(serverPort));
+
+        customizeServer();
+
+        serverService.initialize(serverServiceContext.getInitializationContext());
+        serverService.startServer(serverServiceContext.getConfigurationContext());
+    }
+
+    protected void customizeServer() {
+    }
+
+    private void setupClient() throws Exception {
+        clientService = new JettyWebSocketClient();
+        clientServiceContext = new ControllerServiceTestContext(clientService, "JettyWebSocketClient1");
+        clientServiceContext.setCustomValue(JettyWebSocketClient.WS_URI, (isSecure() ? "wss" : "ws") + "://localhost:" + serverPort + serverPath);
+
+        customizeClient();
+
+        clientService.initialize(clientServiceContext.getInitializationContext());
+        clientService.startClient(clientServiceContext.getConfigurationContext());
+    }
+
+    protected void customizeClient() {
+    }
+
+    @After
+    public void teardown() throws Exception {
+        clientService.stopClient();
+        serverService.stopServer();
+    }
+
+    protected interface MockWebSocketProcessor extends Processor, ConnectedListener, TextMessageConsumer, BinaryMessageConsumer {
+    }
+
+    private boolean isWindowsEnvironment() {
+        return System.getProperty("os.name").toLowerCase().startsWith("windows");
+    }
+
+    @Test
+    public void testClientServerCommunication() throws Exception {
+        assumeFalse(isWindowsEnvironment());
+        // Expectations.
+        final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
+        final CountDownLatch clientConnectedServer = new CountDownLatch(1);
+        final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1);
+        final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1);
+        final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1);
+        final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1);
+
+        final String textMessageFromClient = "Message from client.";
+        final String textMessageFromServer = "Message from server.";
+
+        final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class);
+        doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
+        final AtomicReference<String> serverSessionIdRef = new AtomicReference<>();
+
+        doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation))
+                .when(serverProcessor).connected(any(WebSocketSessionInfo.class));
+
+        doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation))
+                .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+        doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation))
+                .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
+
+        serverService.registerProcessor(serverPath, serverProcessor);
+
+        final String clientId = "client1";
+
+        final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class);
+        doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
+        final AtomicReference<String> clientSessionIdRef = new AtomicReference<>();
+
+
+        doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation))
+                .when(clientProcessor).connected(any(WebSocketSessionInfo.class));
+
+        doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation))
+                .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+        doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation))
+                .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
+
+        clientService.registerProcessor(clientId, clientProcessor);
+
+        clientService.connect(clientId);
+
+        assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
+
+        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient));
+        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
+
+
+        assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
+
+        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer));
+        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
+
+        assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
+
+        clientService.deregisterProcessor(clientId, clientProcessor);
+        serverService.deregisterProcessor(serverPath, serverProcessor);
+    }
+
+    /**
+     * Exercises client/server messaging across a WebSocket server restart.
+     *
+     * <p>Flow: register mocked processors on both services, connect the client, run
+     * {@code maintainSessions()} once while the session is alive, stop and start the server,
+     * run {@code maintainSessions()} again, then exchange one text and one binary message
+     * in each direction.</p>
+     *
+     * <p>Skipped via {@code assumeFalse} when {@code isWindowsEnvironment()} returns true.</p>
+     *
+     * @throws Exception propagated from the service calls and latch waits used below
+     */
+    @Test
+    public void testClientServerCommunicationRecovery() throws Exception {
+        assumeFalse(isWindowsEnvironment());
+        // Expectations.
+        // One latch per event; each is counted down by the matching assert* helper.
+        final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
+        final CountDownLatch clientConnectedServer = new CountDownLatch(1);
+        final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1);
+        final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1);
+        final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1);
+        final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1);
+
+        // Payloads; the binary messages sent below carry the bytes of these same strings.
+        final String textMessageFromClient = "Message from client.";
+        final String textMessageFromServer = "Message from server.";
+
+        // Server side: a mocked processor whose callbacks run the assert* helpers.
+        final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class);
+        doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
+        final AtomicReference<String> serverSessionIdRef = new AtomicReference<>();
+
+        doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation))
+                .when(serverProcessor).connected(any(WebSocketSessionInfo.class));
+
+        doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation))
+                .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+        doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation))
+                .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
+
+        serverService.registerProcessor(serverPath, serverProcessor);
+
+        final String clientId = "client1";
+
+        // Client side: same arrangement, with its own session id holder.
+        final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class);
+        doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
+        final AtomicReference<String> clientSessionIdRef = new AtomicReference<>();
+
+
+        doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation))
+                .when(clientProcessor).connected(any(WebSocketSessionInfo.class));
+
+        doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation))
+                .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+        doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation))
+                .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
+
+        clientService.registerProcessor(clientId, clientProcessor);
+
+        clientService.connect(clientId);
+
+        // Both ends must have reported the connection before the restart scenario starts.
+        assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
+
+        // Nothing happens if maintenance is executed while sessions are alive.
+        ((JettyWebSocketClient) clientService).maintainSessions();
+
+        // Restart server.
+        serverService.stopServer();
+        serverService.startServer(serverServiceContext.getConfigurationContext());
+
+        // Sessions will be recreated with the same session ids.
+        ((JettyWebSocketClient) clientService).maintainSessions();
+
+        // Client -> server, addressed with the id last stored by the client's connected callback.
+        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient));
+        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
+
+        assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
+
+        // Server -> client, addressed with the id last stored by the server's connected callback.
+        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer));
+        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
+
+        assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
+
+        // Detach the mocked processors from both services.
+        // NOTE(review): not reached if an assertion above fails - confirm the fixture teardown covers that case.
+        clientService.deregisterProcessor(clientId, clientProcessor);
+        serverService.deregisterProcessor(serverPath, serverProcessor);
+    }
+
+    /**
+     * Answer body for a mocked processor's {@code connected} callback.
+     *
+     * <p>Checks that the reported session has non-null local address, remote address and
+     * session id, and that its secure flag equals {@link #isSecure()}; then stores the
+     * session id and releases the latch.</p>
+     *
+     * @param latch        counted down once all checks have passed
+     * @param sessionIdRef receives the session id of the connected session
+     * @param invocation   the intercepted call; argument 0 is read as the session info
+     * @return always {@code null}
+     */
+    protected Object assertConnectedEvent(CountDownLatch latch, AtomicReference<String> sessionIdRef, InvocationOnMock invocation) {
+        final WebSocketSessionInfo connectedSession = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
+
+        // Sanity-check the session before publishing anything to the test thread.
+        assertNotNull(connectedSession.getLocalAddress());
+        assertNotNull(connectedSession.getRemoteAddress());
+        assertNotNull(connectedSession.getSessionId());
+        assertEquals(isSecure(), connectedSession.isSecure());
+
+        // Publish the id first, then signal, so a waiter released by the latch sees it.
+        sessionIdRef.set(connectedSession.getSessionId());
+        latch.countDown();
+        return null;
+    }
+
+    /**
+     * Answer body for a mocked processor's text {@code consume} callback.
+     *
+     * <p>Checks the session info (non-null addresses and id, secure flag equal to
+     * {@link #isSecure()}), then checks that the received text is non-null and equal to
+     * the expected message before releasing the latch.</p>
+     *
+     * @param latch           counted down once all checks have passed
+     * @param expectedMessage the text the callback is expected to receive
+     * @param invocation      the intercepted call; argument 0 is the session info, argument 1 the text
+     * @return always {@code null}
+     */
+    protected Object assertConsumeTextMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
+        final WebSocketSessionInfo session = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
+        assertNotNull(session.getLocalAddress());
+        assertNotNull(session.getRemoteAddress());
+        assertNotNull(session.getSessionId());
+        assertEquals(isSecure(), session.isSecure());
+
+        final String actualText = invocation.getArgumentAt(1, String.class);
+        assertNotNull(actualText);
+        assertEquals(expectedMessage, actualText);
+
+        latch.countDown();
+        return null;
+    }
+
+    /**
+     * Answer body for a mocked processor's binary {@code consume} callback.
+     *
+     * <p>Checks the session info (non-null addresses and id, secure flag equal to
+     * {@link #isSecure()}), then checks that the received byte array has the same length as
+     * the bytes of the expected message, decodes to the expected message, and was reported
+     * with offset 0 and the full length, before releasing the latch.</p>
+     *
+     * @param latch           counted down once all checks have passed
+     * @param expectedMessage the text whose bytes the callback is expected to receive
+     * @param invocation      the intercepted call; arguments are session info, payload, offset, length
+     * @return always {@code null}
+     */
+    protected Object assertConsumeBinaryMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
+        final WebSocketSessionInfo session = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
+        assertNotNull(session.getLocalAddress());
+        assertNotNull(session.getRemoteAddress());
+        assertNotNull(session.getSessionId());
+        assertEquals(isSecure(), session.isSecure());
+
+        final byte[] payload = invocation.getArgumentAt(1, byte[].class);
+        final byte[] expectedPayload = expectedMessage.getBytes();
+        // Kept as int locals so the assertEquals calls below bind to the primitive overload.
+        final int reportedOffset = invocation.getArgumentAt(2, Integer.class);
+        final int reportedLength = invocation.getArgumentAt(3, Integer.class);
+
+        assertNotNull(payload);
+        assertEquals(expectedPayload.length, payload.length);
+        assertEquals(expectedMessage, new String(payload));
+        assertEquals(0, reportedOffset);
+        assertEquals(expectedPayload.length, reportedLength);
+
+        latch.countDown();
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java
new file mode 100644
index 0000000..f5c96c2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.websocket.WebSocketService;
+import org.junit.Test;
+
+
+/**
+ * Runs the client/server communication tests of {@link TestJettyWebSocketCommunication}
+ * with an SSL context service attached to both the server and the client service.
+ */
+public class TestJettyWebSocketSecureCommunication extends TestJettyWebSocketCommunication {
+
+    // The same JKS file serves as both keystore and truststore in this test.
+    private static final String STORE_PATH = "src/test/resources/certs/localhost-ks.jks";
+    private static final String STORE_PASSWORD = "localtest";
+    private static final String STORE_TYPE = "JKS";
+
+    private final StandardSSLContextService sslContextService = new StandardSSLContextService();
+    private final ControllerServiceTestContext sslTestContext = new ControllerServiceTestContext(sslContextService, "SSLContextService");
+
+    /**
+     * Configures and initializes the SSL context service shared by the server and the client.
+     *
+     * @throws RuntimeException wrapping any {@link InitializationException} raised during setup
+     */
+    public TestJettyWebSocketSecureCommunication() {
+        try {
+            applyStoreSettings();
+            sslContextService.initialize(sslTestContext.getInitializationContext());
+            sslContextService.onConfigured(sslTestContext.getConfigurationContext());
+        } catch (InitializationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Sets the keystore and truststore properties on the SSL test context. */
+    private void applyStoreSettings() {
+        sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE, STORE_PATH);
+        sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_PASSWORD, STORE_PASSWORD);
+        sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_TYPE, STORE_TYPE);
+        sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE, STORE_PATH);
+        sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_PASSWORD, STORE_PASSWORD);
+        sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_TYPE, STORE_TYPE);
+    }
+
+    /** @return {@code true}; sessions in this suite are expected to report themselves as secure */
+    @Override
+    protected boolean isSecure() {
+        return true;
+    }
+
+    /** Registers the SSL context service with the server service and points its SSL property at it. */
+    @Override
+    protected void customizeServer() {
+        serverServiceContext.getInitializationContext().addControllerService(sslContextService);
+        serverServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
+    }
+
+    /** Registers the SSL context service with the client service and points its SSL property at it. */
+    @Override
+    protected void customizeClient() {
+        clientServiceContext.getInitializationContext().addControllerService(sslContextService);
+        clientServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
+    }
+
+    /** Delegates to the parent test so it runs against the SSL-enabled services. */
+    @Test
+    public void testClientServerCommunication() throws Exception {
+        super.testClientServerCommunication();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketServer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketServer.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketServer.java
new file mode 100644
index 0000000..c056e21
--- /dev/null
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketServer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.components.ValidationResult;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Property validation tests for {@link JettyWebSocketServer}.
+ */
+public class TestJettyWebSocketServer {
+
+    /**
+     * Builds a fresh server service and test context, optionally sets the listen port,
+     * initializes the service and returns its validation results.
+     *
+     * @param listenPort value for the listen port property, or {@code null} to leave it unset
+     * @return the validation results reported by the service
+     * @throws Exception if initializing the service fails
+     */
+    private static Collection<ValidationResult> validateWithListenPort(final String listenPort) throws Exception {
+        final JettyWebSocketServer server = new JettyWebSocketServer();
+        final ControllerServiceTestContext testContext = new ControllerServiceTestContext(server, "service-id");
+        if (listenPort != null) {
+            testContext.setCustomValue(JettyWebSocketServer.LISTEN_PORT, listenPort);
+        }
+        server.initialize(testContext.getInitializationContext());
+        return server.validate(testContext.getValidationContext());
+    }
+
+    /** With no properties set, the only validation result must concern the listen port. */
+    @Test
+    public void testValidationRequiredProperties() throws Exception {
+        final Collection<ValidationResult> problems = validateWithListenPort(null);
+        assertEquals(1, problems.size());
+        final ValidationResult onlyProblem = problems.iterator().next();
+        assertEquals(JettyWebSocketServer.LISTEN_PORT.getDisplayName(), onlyProblem.getSubject());
+    }
+
+    /** With the listen port set, validation must report no results. */
+    @Test
+    public void testValidationSuccess() throws Exception {
+        final Collection<ValidationResult> problems = validateWithListenPort("9001");
+        assertEquals(0, problems.size());
+    }
+
+}