You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by je...@apache.org on 2017/05/02 14:07:21 UTC
[1/2] nifi git commit: Incorporated review comments.
Repository: nifi
Updated Branches:
refs/heads/master 8fa35294e -> cca5c4209
Incorporated review comments.
- Added description on what session maintenance does.
- Added calling deregister when initial connection attempt fails so that a processor can retry connecting at next onTrigger.
This closes #1597
Signed-off-by: Jeremy Dyer <je...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cca5c420
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cca5c420
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cca5c420
Branch: refs/heads/master
Commit: cca5c420954527d44cadf08b7d23edfdfb5314e9
Parents: 0a014b4
Author: Koji Kawamura <ij...@apache.org>
Authored: Tue May 2 17:12:34 2017 +0900
Committer: Jeremy Dyer <je...@apache.org>
Committed: Tue May 2 10:02:55 2017 -0400
----------------------------------------------------------------------
.../websocket/AbstractWebSocketGatewayProcessor.java | 9 ++++++++-
.../apache/nifi/websocket/jetty/JettyWebSocketClient.java | 10 ++++++++--
2 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/cca5c420/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
index 0262d98..c749456 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
@@ -147,6 +147,10 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
@OnStopped
public void onStopped(final ProcessContext context) throws IOException {
+ deregister();
+ }
+
+ private void deregister() {
if (webSocketService == null) {
return;
}
@@ -170,11 +174,14 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
try {
registerProcessorToService(context, webSocketService -> onWebSocketServiceReady(webSocketService));
} catch (IOException|WebSocketConfigurationException e) {
+ // Deregister processor if it failed so that it can retry next onTrigger.
+ deregister();
+ context.yield();
throw new ProcessException("Failed to register processor to WebSocket service due to: " + e, e);
}
}
- context.yield();//nothing really to do here since threading managed by smtp server sessions
+ context.yield();//nothing really to do here since handling WebSocket messages is done at ControllerService.
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cca5c420/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
index 3d19eac..281b016 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
@@ -90,7 +90,12 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder()
.name("session-maintenance-interval")
.displayName("Session Maintenance Interval")
- .description("The interval between session maintenance activities.")
+ .description("The interval between session maintenance activities." +
+ " A WebSocket session established with a WebSocket server can be terminated due to different reasons" +
+ " including restarting the WebSocket server or timing out inactive sessions." +
+ " This session maintenance activity is periodically executed in order to reconnect those lost sessions," +
+ " so that a WebSocket client can reuse the same session id transparently after it reconnects successfully. " +
+ " The maintenance activity is executed until corresponding processors or this controller service is stopped.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
@@ -238,10 +243,11 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
}
final String sessionId = activeSessions.get(clientId);
- // If this session is stil alive, do nothing.
+ // If this session is still alive, do nothing.
if (!router.containsSession(sessionId)) {
// This session is no longer active, reconnect it.
// If it fails, the sessionId will remain in activeSessions, and retries later.
+ // This reconnect attempt is continued until user explicitly stops a processor or this controller service.
connect(clientId, sessionId);
}
}
[2/2] nifi git commit: NIFI-3609: ConnectWebSocket auto session
recovery
Posted by je...@apache.org.
NIFI-3609: ConnectWebSocket auto session recovery
- Removed unused disconnect method from WebSocketService interface.
- Added session maintenance background thread at JettyWebSocketClient
which reconnects sessions those are still referred by ConnectWebSocket
processor but no longer active.
- Added Session Maintenance Interval property to JettyWebSocketClient.
- Allowed specifying existing session id so that it can be recovered
transparently.
- Moved test classes to appropriate package.
- Added test cases that verify the same session id can be used after
WebSocket server restarts.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0a014b47
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0a014b47
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0a014b47
Branch: refs/heads/master
Commit: 0a014b471bb11c2d15b5f166e09915eeb319f352
Parents: 8fa3529
Author: Koji Kawamura <ij...@apache.org>
Authored: Thu Mar 16 17:10:38 2017 +0900
Committer: Jeremy Dyer <je...@apache.org>
Committed: Tue May 2 10:02:55 2017 -0400
----------------------------------------------------------------------
.../websocket/AbstractWebSocketService.java | 5 -
.../nifi/websocket/WebSocketMessageRouter.java | 4 +
.../nifi/websocket/WebSocketMessageRouters.java | 6 +-
.../apache/nifi/websocket/WebSocketService.java | 2 -
.../websocket/jetty/JettyWebSocketClient.java | 122 +++++++-
.../jetty/RoutingWebSocketListener.java | 14 +-
.../websocket/ControllerServiceTestContext.java | 68 -----
.../websocket/TestJettyWebSocketClient.java | 63 ----
.../TestJettyWebSocketCommunication.java | 224 --------------
.../TestJettyWebSocketSecureCommunication.java | 67 ----
.../websocket/TestJettyWebSocketServer.java | 51 ----
.../jetty/ControllerServiceTestContext.java | 68 +++++
.../jetty/TestJettyWebSocketClient.java | 62 ++++
.../jetty/TestJettyWebSocketCommunication.java | 306 +++++++++++++++++++
.../TestJettyWebSocketSecureCommunication.java | 68 +++++
.../jetty/TestJettyWebSocketServer.java | 50 +++
16 files changed, 681 insertions(+), 499 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java
index fac1e42..36deb55 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java
@@ -45,9 +45,4 @@ public abstract class AbstractWebSocketService extends AbstractControllerService
routers.sendMessage(endpointId, sessionId, sendMessage);
}
- @Override
- public void disconnect(final String endpointId, final String sessionId, final String reason) throws IOException, WebSocketConfigurationException {
- routers.disconnect(endpointId, sessionId, reason);
- }
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
index e5034e1..0e8737a 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
@@ -124,4 +124,8 @@ public class WebSocketMessageRouter {
sessions.remove(sessionId);
}
+ public boolean containsSession(final String sessionId) {
+ return sessions.containsKey(sessionId);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java
index 2551eb4..ae70ae5 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java
@@ -59,6 +59,7 @@ public class WebSocketMessageRouters {
public synchronized void deregisterProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException {
final WebSocketMessageRouter router = getRouterOrFail(endpointId);
+ routers.remove(endpointId);
router.deregisterProcessor(processor);
}
@@ -67,9 +68,4 @@ public class WebSocketMessageRouters {
router.sendMessage(sessionId, sendMessage);
}
- public void disconnect(final String endpointId, final String sessionId, final String reason) throws IOException, WebSocketConfigurationException {
- final WebSocketMessageRouter router = getRouterOrFail(endpointId);
- router.disconnect(sessionId, reason);
- }
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java
index 0de80bc..f86581b 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java
@@ -45,6 +45,4 @@ public interface WebSocketService extends ControllerService {
void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException;
- void disconnect(final String endpointId, final String sessionId, final String reason) throws Exception;
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
index af8dd8e..3d19eac 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
@@ -24,6 +24,7 @@ import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.websocket.WebSocketClientService;
@@ -39,8 +40,13 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
@Tags({"WebSocket", "Jetty", "client"})
@CapabilityDescription("Implementation of WebSocketClientService." +
@@ -81,6 +87,16 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
.defaultValue("3 sec")
.build();
+ public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder()
+ .name("session-maintenance-interval")
+ .displayName("Session Maintenance Interval")
+ .description("The interval between session maintenance activities.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("10 sec")
+ .build();
+
private static final List<PropertyDescriptor> properties;
static {
@@ -89,6 +105,7 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
props.add(WS_URI);
props.add(SSL_CONTEXT);
props.add(CONNECTION_TIMEOUT);
+ props.add(SESSION_MAINTENANCE_INTERVAL);
properties = Collections.unmodifiableList(props);
}
@@ -96,6 +113,8 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
private WebSocketClient client;
private URI webSocketUri;
private long connectionTimeoutMillis;
+ private volatile ScheduledExecutorService sessionMaintenanceScheduler;
+ private final ReentrantLock connectionLock = new ReentrantLock();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -116,15 +135,38 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
configurePolicy(context, client.getPolicy());
client.start();
+ activeSessions.clear();
webSocketUri = new URI(context.getProperty(WS_URI).getValue());
connectionTimeoutMillis = context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+
+ final Long sessionMaintenanceInterval = context.getProperty(SESSION_MAINTENANCE_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
+
+ sessionMaintenanceScheduler = Executors.newSingleThreadScheduledExecutor();
+ sessionMaintenanceScheduler.scheduleAtFixedRate(() -> {
+ try {
+ maintainSessions();
+ } catch (final Exception e) {
+ getLogger().warn("Failed to maintain sessions due to {}", new Object[]{e}, e);
+ }
+ }, sessionMaintenanceInterval, sessionMaintenanceInterval, TimeUnit.MILLISECONDS);
}
@OnDisabled
@OnShutdown
@Override
public void stopClient() throws Exception {
+ activeSessions.clear();
+
+ if (sessionMaintenanceScheduler != null) {
+ try {
+ sessionMaintenanceScheduler.shutdown();
+ } catch (Exception e) {
+ getLogger().warn("Failed to shutdown session maintainer due to {}", new Object[]{e}, e);
+ }
+ sessionMaintenanceScheduler = null;
+ }
+
if (client == null) {
return;
}
@@ -135,27 +177,81 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
@Override
public void connect(final String clientId) throws IOException {
+ connect(clientId, null);
+ }
+
+ private void connect(final String clientId, String sessionId) throws IOException {
+
+ connectionLock.lock();
- final WebSocketMessageRouter router;
try {
- router = routers.getRouterOrFail(clientId);
- } catch (WebSocketConfigurationException e) {
- throw new IllegalStateException("Failed to get router due to: " + e, e);
+ final WebSocketMessageRouter router;
+ try {
+ router = routers.getRouterOrFail(clientId);
+ } catch (WebSocketConfigurationException e) {
+ throw new IllegalStateException("Failed to get router due to: " + e, e);
+ }
+ final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
+ listener.setSessionId(sessionId);
+
+ final ClientUpgradeRequest request = new ClientUpgradeRequest();
+ final Future<Session> connect = client.connect(listener, webSocketUri, request);
+ getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
+
+ final Session session;
+ try {
+ session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
+ }
+ getLogger().info("Connected, session={}", new Object[]{session});
+ activeSessions.put(clientId, listener.getSessionId());
+
+ } finally {
+ connectionLock.unlock();
}
- final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
- final ClientUpgradeRequest request = new ClientUpgradeRequest();
- final Future<Session> connect = client.connect(listener, webSocketUri, request);
- getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
+ }
+
+ private Map<String, String> activeSessions = new ConcurrentHashMap<>();
- final Session session;
+ void maintainSessions() throws Exception {
+ if (client == null) {
+ return;
+ }
+
+ connectionLock.lock();
+
+ final ComponentLog logger = getLogger();
try {
- session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
- } catch (Exception e) {
- throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
+ // Loop through existing sessions and reconnect.
+ for (String clientId : activeSessions.keySet()) {
+ final WebSocketMessageRouter router;
+ try {
+ router = routers.getRouterOrFail(clientId);
+ } catch (final WebSocketConfigurationException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("The clientId {} is no longer active. Discarding the clientId.", new Object[]{clientId});
+ }
+ activeSessions.remove(clientId);
+ continue;
+ }
+
+ final String sessionId = activeSessions.get(clientId);
+ // If this session is stil alive, do nothing.
+ if (!router.containsSession(sessionId)) {
+ // This session is no longer active, reconnect it.
+ // If it fails, the sessionId will remain in activeSessions, and retries later.
+ connect(clientId, sessionId);
+ }
+ }
+ } finally {
+ connectionLock.unlock();
}
- getLogger().info("Connected, session={}", new Object[]{session});
+ if (logger.isDebugEnabled()) {
+ logger.debug("Session maintenance completed. activeSessions={}", new Object[]{activeSessions});
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java
index 81376a4..ebc5e84 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java
@@ -33,7 +33,11 @@ public class RoutingWebSocketListener extends WebSocketAdapter {
@Override
public void onWebSocketConnect(final Session session) {
super.onWebSocketConnect(session);
- sessionId = UUID.randomUUID().toString();
+ if (sessionId == null || sessionId.isEmpty()) {
+ // If sessionId is already assigned to this instance, don't publish new one.
+ // So that existing sesionId can be reused when reconnecting.
+ sessionId = UUID.randomUUID().toString();
+ }
final JettyWebSocketSession webSocketSession = new JettyWebSocketSession(sessionId, session);
router.captureSession(webSocketSession);
}
@@ -53,4 +57,12 @@ public class RoutingWebSocketListener extends WebSocketAdapter {
public void onWebSocketBinary(final byte[] payload, final int offset, final int len) {
router.onWebSocketBinary(sessionId, payload, offset, len);
}
+
+ public void setSessionId(String sessionId) {
+ this.sessionId = sessionId;
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/ControllerServiceTestContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/ControllerServiceTestContext.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/ControllerServiceTestContext.java
deleted file mode 100644
index 062d528..0000000
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/ControllerServiceTestContext.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.util.MockControllerServiceInitializationContext;
-import org.apache.nifi.util.MockPropertyValue;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class ControllerServiceTestContext {
-
- private final ConfigurationContext configurationContext = mock(ConfigurationContext.class);
- private final ValidationContext validationContext = mock(ValidationContext.class);
- private MockControllerServiceInitializationContext initializationContext;
-
- public ControllerServiceTestContext(ControllerService controllerService, String id) {
- initializationContext = new MockControllerServiceInitializationContext(controllerService, id);
- doAnswer(invocation -> configurationContext.getProperty(invocation.getArgumentAt(0, PropertyDescriptor.class)))
- .when(validationContext).getProperty(any(PropertyDescriptor.class));
- controllerService.getPropertyDescriptors().forEach(prop -> setDefaultValue(prop));
- }
-
- public MockControllerServiceInitializationContext getInitializationContext() {
- return initializationContext;
- }
-
- public ConfigurationContext getConfigurationContext() {
- return configurationContext;
- }
-
- public MockPropertyValue setDefaultValue(PropertyDescriptor propertyDescriptor) {
- return setCustomValue(propertyDescriptor, propertyDescriptor.getDefaultValue());
- }
-
- public MockPropertyValue setCustomValue(PropertyDescriptor propertyDescriptor, String value) {
- final MockPropertyValue propertyValue = new MockPropertyValue(value, initializationContext);
- when(configurationContext.getProperty(eq(propertyDescriptor)))
- .thenReturn(propertyValue);
- return propertyValue;
- }
-
- public ValidationContext getValidationContext() {
- return validationContext;
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketClient.java
deleted file mode 100644
index cb979b9..0000000
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketClient.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket;
-
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.websocket.jetty.JettyWebSocketClient;
-import org.junit.Test;
-
-import java.util.Collection;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class TestJettyWebSocketClient {
-
- @Test
- public void testValidationRequiredProperties() throws Exception {
- final JettyWebSocketClient service = new JettyWebSocketClient();
- final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
- service.initialize(context.getInitializationContext());
- final Collection<ValidationResult> results = service.validate(context.getValidationContext());
- assertEquals(1, results.size());
- final ValidationResult result = results.iterator().next();
- assertEquals(JettyWebSocketClient.WS_URI.getDisplayName(), result.getSubject());
- }
-
- @Test
- public void testValidationSuccess() throws Exception {
- final JettyWebSocketClient service = new JettyWebSocketClient();
- final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
- context.setCustomValue(JettyWebSocketClient.WS_URI, "ws://localhost:9001/test");
- service.initialize(context.getInitializationContext());
- final Collection<ValidationResult> results = service.validate(context.getValidationContext());
- assertEquals(0, results.size());
- }
-
- @Test
- public void testValidationProtocol() throws Exception {
- final JettyWebSocketClient service = new JettyWebSocketClient();
- final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
- context.setCustomValue(JettyWebSocketClient.WS_URI, "http://localhost:9001/test");
- service.initialize(context.getInitializationContext());
- final Collection<ValidationResult> results = service.validate(context.getValidationContext());
- assertEquals(1, results.size());
- final ValidationResult result = results.iterator().next();
- assertEquals(JettyWebSocketClient.WS_URI.getName(), result.getSubject());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketCommunication.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketCommunication.java
deleted file mode 100644
index 7202d2b..0000000
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketCommunication.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket;
-
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.websocket.jetty.JettyWebSocketClient;
-import org.apache.nifi.websocket.jetty.JettyWebSocketServer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-
-import java.net.ServerSocket;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeFalse;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
-
-public class TestJettyWebSocketCommunication {
-
- protected int serverPort;
- protected String serverPath = "/test";
- protected WebSocketServerService serverService;
- protected ControllerServiceTestContext serverServiceContext;
- protected WebSocketClientService clientService;
- protected ControllerServiceTestContext clientServiceContext;
-
- protected boolean isSecure() {
- return false;
- }
-
- @Before
- public void setup() throws Exception {
- setupServer();
-
- setupClient();
- }
-
- private void setupServer() throws Exception {
- // Find an open port.
- try (final ServerSocket serverSocket = new ServerSocket(0)) {
- serverPort = serverSocket.getLocalPort();
- }
- serverService = new JettyWebSocketServer();
- serverServiceContext = new ControllerServiceTestContext(serverService, "JettyWebSocketServer1");
- serverServiceContext.setCustomValue(JettyWebSocketServer.LISTEN_PORT, String.valueOf(serverPort));
-
- customizeServer();
-
- serverService.initialize(serverServiceContext.getInitializationContext());
- serverService.startServer(serverServiceContext.getConfigurationContext());
- }
-
- protected void customizeServer() {
- }
-
- private void setupClient() throws Exception {
- clientService = new JettyWebSocketClient();
- clientServiceContext = new ControllerServiceTestContext(clientService, "JettyWebSocketClient1");
- clientServiceContext.setCustomValue(JettyWebSocketClient.WS_URI, (isSecure() ? "wss" : "ws") + "://localhost:" + serverPort + serverPath);
-
- customizeClient();
-
- clientService.initialize(clientServiceContext.getInitializationContext());
- clientService.startClient(clientServiceContext.getConfigurationContext());
- }
-
- protected void customizeClient() {
- }
-
- @After
- public void teardown() throws Exception {
- clientService.stopClient();
- serverService.stopServer();
- }
-
- protected interface MockWebSocketProcessor extends Processor, ConnectedListener, TextMessageConsumer, BinaryMessageConsumer {
- }
-
- private boolean isWindowsEnvironment() {
- return System.getProperty("os.name").toLowerCase().startsWith("windows");
- }
-
- @Test
- public void testClientServerCommunication() throws Exception {
- assumeFalse(isWindowsEnvironment());
- // Expectations.
- final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
- final CountDownLatch clientConnectedServer = new CountDownLatch(1);
- final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1);
- final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1);
- final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1);
- final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1);
-
- final String textMessageFromClient = "Message from client.";
- final String textMessageFromServer = "Message from server.";
-
- final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class);
- doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
- final AtomicReference<String> serverSessionIdRef = new AtomicReference<>();
-
- doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation))
- .when(serverProcessor).connected(any(WebSocketSessionInfo.class));
-
- doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation))
- .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
-
- doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation))
- .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
-
- serverService.registerProcessor(serverPath, serverProcessor);
-
- final String clientId = "client1";
-
- final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class);
- doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
- final AtomicReference<String> clientSessionIdRef = new AtomicReference<>();
-
-
- doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation))
- .when(clientProcessor).connected(any(WebSocketSessionInfo.class));
-
- doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation))
- .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
-
- doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation))
- .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
-
- clientService.registerProcessor(clientId, clientProcessor);
-
- clientService.connect(clientId);
-
- assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS));
- assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
-
- clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient));
- clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
-
-
- assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
- assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
-
- serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer));
- serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
-
- assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
- assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
-
- clientService.deregisterProcessor(clientId, clientProcessor);
- serverService.deregisterProcessor(serverPath, serverProcessor);
- }
-
- protected Object assertConnectedEvent(CountDownLatch latch, AtomicReference<String> sessionIdRef, InvocationOnMock invocation) {
- final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
- assertNotNull(sessionInfo.getLocalAddress());
- assertNotNull(sessionInfo.getRemoteAddress());
- assertNotNull(sessionInfo.getSessionId());
- assertEquals(isSecure(), sessionInfo.isSecure());
- sessionIdRef.set(sessionInfo.getSessionId());
- latch.countDown();
- return null;
- }
-
- protected Object assertConsumeTextMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
- final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
- assertNotNull(sessionInfo.getLocalAddress());
- assertNotNull(sessionInfo.getRemoteAddress());
- assertNotNull(sessionInfo.getSessionId());
- assertEquals(isSecure(), sessionInfo.isSecure());
-
- final String receivedMessage = invocation.getArgumentAt(1, String.class);
- assertNotNull(receivedMessage);
- assertEquals(expectedMessage, receivedMessage);
- latch.countDown();
- return null;
- }
-
- protected Object assertConsumeBinaryMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
- final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
- assertNotNull(sessionInfo.getLocalAddress());
- assertNotNull(sessionInfo.getRemoteAddress());
- assertNotNull(sessionInfo.getSessionId());
- assertEquals(isSecure(), sessionInfo.isSecure());
-
- final byte[] receivedMessage = invocation.getArgumentAt(1, byte[].class);
- final byte[] expectedBinary = expectedMessage.getBytes();
- final int offset = invocation.getArgumentAt(2, Integer.class);
- final int length = invocation.getArgumentAt(3, Integer.class);
- assertNotNull(receivedMessage);
- assertEquals(expectedBinary.length, receivedMessage.length);
- assertEquals(expectedMessage, new String(receivedMessage));
- assertEquals(0, offset);
- assertEquals(expectedBinary.length, length);
- latch.countDown();
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketSecureCommunication.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketSecureCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketSecureCommunication.java
deleted file mode 100644
index e25189a..0000000
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketSecureCommunication.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket;
-
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.ssl.StandardSSLContextService;
-import org.junit.Test;
-
-
-public class TestJettyWebSocketSecureCommunication extends TestJettyWebSocketCommunication{
-
- private final StandardSSLContextService sslContextService = new StandardSSLContextService();
- private final ControllerServiceTestContext sslTestContext = new ControllerServiceTestContext(sslContextService, "SSLContextService");
-
- public TestJettyWebSocketSecureCommunication() {
- try {
- sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE, "src/test/resources/certs/localhost-ks.jks");
- sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
- sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_TYPE, "JKS");
- sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE, "src/test/resources/certs/localhost-ks.jks");
- sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
- sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
-
- sslContextService.initialize(sslTestContext.getInitializationContext());
- sslContextService.onConfigured(sslTestContext.getConfigurationContext());
- } catch (InitializationException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- protected boolean isSecure() {
- return true;
- }
-
- @Override
- protected void customizeServer() {
- serverServiceContext.getInitializationContext().addControllerService(sslContextService);
- serverServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
- }
-
- @Override
- protected void customizeClient() {
- clientServiceContext.getInitializationContext().addControllerService(sslContextService);
- clientServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
- }
-
- @Test
- public void testClientServerCommunication() throws Exception {
- super.testClientServerCommunication();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketServer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketServer.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketServer.java
deleted file mode 100644
index 907e689..0000000
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketServer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket;
-
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.websocket.jetty.JettyWebSocketServer;
-import org.junit.Test;
-
-import java.util.Collection;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class TestJettyWebSocketServer {
-
- @Test
- public void testValidationRequiredProperties() throws Exception {
- final JettyWebSocketServer service = new JettyWebSocketServer();
- final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
- service.initialize(context.getInitializationContext());
- final Collection<ValidationResult> results = service.validate(context.getValidationContext());
- assertEquals(1, results.size());
- final ValidationResult result = results.iterator().next();
- assertEquals(JettyWebSocketServer.LISTEN_PORT.getDisplayName(), result.getSubject());
- }
-
- @Test
- public void testValidationSuccess() throws Exception {
- final JettyWebSocketServer service = new JettyWebSocketServer();
- final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
- context.setCustomValue(JettyWebSocketServer.LISTEN_PORT, "9001");
- service.initialize(context.getInitializationContext());
- final Collection<ValidationResult> results = service.validate(context.getValidationContext());
- assertEquals(0, results.size());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ControllerServiceTestContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ControllerServiceTestContext.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ControllerServiceTestContext.java
new file mode 100644
index 0000000..fffba8a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ControllerServiceTestContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.apache.nifi.util.MockPropertyValue;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class ControllerServiceTestContext {
+
+ private final ConfigurationContext configurationContext = mock(ConfigurationContext.class);
+ private final ValidationContext validationContext = mock(ValidationContext.class);
+ private MockControllerServiceInitializationContext initializationContext;
+
+ public ControllerServiceTestContext(ControllerService controllerService, String id) {
+ initializationContext = new MockControllerServiceInitializationContext(controllerService, id);
+ doAnswer(invocation -> configurationContext.getProperty(invocation.getArgumentAt(0, PropertyDescriptor.class)))
+ .when(validationContext).getProperty(any(PropertyDescriptor.class));
+ controllerService.getPropertyDescriptors().forEach(prop -> setDefaultValue(prop));
+ }
+
+ public MockControllerServiceInitializationContext getInitializationContext() {
+ return initializationContext;
+ }
+
+ public ConfigurationContext getConfigurationContext() {
+ return configurationContext;
+ }
+
+ public MockPropertyValue setDefaultValue(PropertyDescriptor propertyDescriptor) {
+ return setCustomValue(propertyDescriptor, propertyDescriptor.getDefaultValue());
+ }
+
+ public MockPropertyValue setCustomValue(PropertyDescriptor propertyDescriptor, String value) {
+ final MockPropertyValue propertyValue = new MockPropertyValue(value, initializationContext);
+ when(configurationContext.getProperty(eq(propertyDescriptor)))
+ .thenReturn(propertyValue);
+ return propertyValue;
+ }
+
+ public ValidationContext getValidationContext() {
+ return validationContext;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketClient.java
new file mode 100644
index 0000000..a20b54e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketClient.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.components.ValidationResult;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestJettyWebSocketClient {
+
+ @Test
+ public void testValidationRequiredProperties() throws Exception {
+ final JettyWebSocketClient service = new JettyWebSocketClient();
+ final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
+ service.initialize(context.getInitializationContext());
+ final Collection<ValidationResult> results = service.validate(context.getValidationContext());
+ assertEquals(1, results.size());
+ final ValidationResult result = results.iterator().next();
+ assertEquals(JettyWebSocketClient.WS_URI.getDisplayName(), result.getSubject());
+ }
+
+ @Test
+ public void testValidationSuccess() throws Exception {
+ final JettyWebSocketClient service = new JettyWebSocketClient();
+ final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
+ context.setCustomValue(JettyWebSocketClient.WS_URI, "ws://localhost:9001/test");
+ service.initialize(context.getInitializationContext());
+ final Collection<ValidationResult> results = service.validate(context.getValidationContext());
+ assertEquals(0, results.size());
+ }
+
+ @Test
+ public void testValidationProtocol() throws Exception {
+ final JettyWebSocketClient service = new JettyWebSocketClient();
+ final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
+ context.setCustomValue(JettyWebSocketClient.WS_URI, "http://localhost:9001/test");
+ service.initialize(context.getInitializationContext());
+ final Collection<ValidationResult> results = service.validate(context.getValidationContext());
+ assertEquals(1, results.size());
+ final ValidationResult result = results.iterator().next();
+ assertEquals(JettyWebSocketClient.WS_URI.getName(), result.getSubject());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java
new file mode 100644
index 0000000..a225447
--- /dev/null
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.websocket.BinaryMessageConsumer;
+import org.apache.nifi.websocket.ConnectedListener;
+import org.apache.nifi.websocket.TextMessageConsumer;
+import org.apache.nifi.websocket.WebSocketClientService;
+import org.apache.nifi.websocket.WebSocketServerService;
+import org.apache.nifi.websocket.WebSocketSessionInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+
+public class TestJettyWebSocketCommunication {
+
+ protected int serverPort;
+ protected String serverPath = "/test";
+ protected WebSocketServerService serverService;
+ protected ControllerServiceTestContext serverServiceContext;
+ protected WebSocketClientService clientService;
+ protected ControllerServiceTestContext clientServiceContext;
+
+ protected boolean isSecure() {
+ return false;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ setupServer();
+
+ setupClient();
+ }
+
+ private void setupServer() throws Exception {
+ // Find an open port.
+ try (final ServerSocket serverSocket = new ServerSocket(0)) {
+ serverPort = serverSocket.getLocalPort();
+ }
+ serverService = new JettyWebSocketServer();
+ serverServiceContext = new ControllerServiceTestContext(serverService, "JettyWebSocketServer1");
+ serverServiceContext.setCustomValue(JettyWebSocketServer.LISTEN_PORT, String.valueOf(serverPort));
+
+ customizeServer();
+
+ serverService.initialize(serverServiceContext.getInitializationContext());
+ serverService.startServer(serverServiceContext.getConfigurationContext());
+ }
+
+ protected void customizeServer() {
+ }
+
+ private void setupClient() throws Exception {
+ clientService = new JettyWebSocketClient();
+ clientServiceContext = new ControllerServiceTestContext(clientService, "JettyWebSocketClient1");
+ clientServiceContext.setCustomValue(JettyWebSocketClient.WS_URI, (isSecure() ? "wss" : "ws") + "://localhost:" + serverPort + serverPath);
+
+ customizeClient();
+
+ clientService.initialize(clientServiceContext.getInitializationContext());
+ clientService.startClient(clientServiceContext.getConfigurationContext());
+ }
+
+ protected void customizeClient() {
+ }
+
+ @After
+ public void teardown() throws Exception {
+ clientService.stopClient();
+ serverService.stopServer();
+ }
+
+ protected interface MockWebSocketProcessor extends Processor, ConnectedListener, TextMessageConsumer, BinaryMessageConsumer {
+ }
+
+ private boolean isWindowsEnvironment() {
+ return System.getProperty("os.name").toLowerCase().startsWith("windows");
+ }
+
+ @Test
+ public void testClientServerCommunication() throws Exception {
+ assumeFalse(isWindowsEnvironment());
+ // Expectations.
+ final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
+ final CountDownLatch clientConnectedServer = new CountDownLatch(1);
+ final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1);
+ final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1);
+ final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1);
+ final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1);
+
+ final String textMessageFromClient = "Message from client.";
+ final String textMessageFromServer = "Message from server.";
+
+ final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class);
+ doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
+ final AtomicReference<String> serverSessionIdRef = new AtomicReference<>();
+
+ doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation))
+ .when(serverProcessor).connected(any(WebSocketSessionInfo.class));
+
+ doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation))
+ .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+ doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation))
+ .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
+
+ serverService.registerProcessor(serverPath, serverProcessor);
+
+ final String clientId = "client1";
+
+ final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class);
+ doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
+ final AtomicReference<String> clientSessionIdRef = new AtomicReference<>();
+
+
+ doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation))
+ .when(clientProcessor).connected(any(WebSocketSessionInfo.class));
+
+ doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation))
+ .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+ doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation))
+ .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
+
+ clientService.registerProcessor(clientId, clientProcessor);
+
+ clientService.connect(clientId);
+
+ assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS));
+ assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
+
+ clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient));
+ clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
+
+
+ assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
+ assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
+
+ serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer));
+ serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
+
+ assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
+ assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
+
+ clientService.deregisterProcessor(clientId, clientProcessor);
+ serverService.deregisterProcessor(serverPath, serverProcessor);
+ }
+
+ @Test
+ public void testClientServerCommunicationRecovery() throws Exception {
+ assumeFalse(isWindowsEnvironment());
+ // Expectations.
+ final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
+ final CountDownLatch clientConnectedServer = new CountDownLatch(1);
+ final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1);
+ final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1);
+ final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1);
+ final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1);
+
+ final String textMessageFromClient = "Message from client.";
+ final String textMessageFromServer = "Message from server.";
+
+ final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class);
+ doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
+ final AtomicReference<String> serverSessionIdRef = new AtomicReference<>();
+
+ doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation))
+ .when(serverProcessor).connected(any(WebSocketSessionInfo.class));
+
+ doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation))
+ .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+ doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation))
+ .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
+
+ serverService.registerProcessor(serverPath, serverProcessor);
+
+ final String clientId = "client1";
+
+ final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class);
+ doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
+ final AtomicReference<String> clientSessionIdRef = new AtomicReference<>();
+
+
+ doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation))
+ .when(clientProcessor).connected(any(WebSocketSessionInfo.class));
+
+ doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation))
+ .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+ doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation))
+ .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
+
+ clientService.registerProcessor(clientId, clientProcessor);
+
+ clientService.connect(clientId);
+
+ assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS));
+ assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
+
+ // Nothing happens if maintenance is executed while sessions are alive.
+ ((JettyWebSocketClient) clientService).maintainSessions();
+
+ // Restart server.
+ serverService.stopServer();
+ serverService.startServer(serverServiceContext.getConfigurationContext());
+
+ // Sessions will be recreated with the same session ids.
+ ((JettyWebSocketClient) clientService).maintainSessions();
+
+ clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient));
+ clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
+
+ assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
+ assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
+
+ serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer));
+ serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
+
+ assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
+ assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
+
+ clientService.deregisterProcessor(clientId, clientProcessor);
+ serverService.deregisterProcessor(serverPath, serverProcessor);
+ }
+
+ protected Object assertConnectedEvent(CountDownLatch latch, AtomicReference<String> sessionIdRef, InvocationOnMock invocation) {
+ final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
+ assertNotNull(sessionInfo.getLocalAddress());
+ assertNotNull(sessionInfo.getRemoteAddress());
+ assertNotNull(sessionInfo.getSessionId());
+ assertEquals(isSecure(), sessionInfo.isSecure());
+ sessionIdRef.set(sessionInfo.getSessionId());
+ latch.countDown();
+ return null;
+ }
+
+ protected Object assertConsumeTextMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
+ final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
+ assertNotNull(sessionInfo.getLocalAddress());
+ assertNotNull(sessionInfo.getRemoteAddress());
+ assertNotNull(sessionInfo.getSessionId());
+ assertEquals(isSecure(), sessionInfo.isSecure());
+
+ final String receivedMessage = invocation.getArgumentAt(1, String.class);
+ assertNotNull(receivedMessage);
+ assertEquals(expectedMessage, receivedMessage);
+ latch.countDown();
+ return null;
+ }
+
+ protected Object assertConsumeBinaryMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
+ final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
+ assertNotNull(sessionInfo.getLocalAddress());
+ assertNotNull(sessionInfo.getRemoteAddress());
+ assertNotNull(sessionInfo.getSessionId());
+ assertEquals(isSecure(), sessionInfo.isSecure());
+
+ final byte[] receivedMessage = invocation.getArgumentAt(1, byte[].class);
+ final byte[] expectedBinary = expectedMessage.getBytes();
+ final int offset = invocation.getArgumentAt(2, Integer.class);
+ final int length = invocation.getArgumentAt(3, Integer.class);
+ assertNotNull(receivedMessage);
+ assertEquals(expectedBinary.length, receivedMessage.length);
+ assertEquals(expectedMessage, new String(receivedMessage));
+ assertEquals(0, offset);
+ assertEquals(expectedBinary.length, length);
+ latch.countDown();
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java
new file mode 100644
index 0000000..f5c96c2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.websocket.WebSocketService;
+import org.junit.Test;
+
+
+public class TestJettyWebSocketSecureCommunication extends TestJettyWebSocketCommunication{
+
+ private final StandardSSLContextService sslContextService = new StandardSSLContextService();
+ private final ControllerServiceTestContext sslTestContext = new ControllerServiceTestContext(sslContextService, "SSLContextService");
+
+ public TestJettyWebSocketSecureCommunication() {
+ try {
+ sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE, "src/test/resources/certs/localhost-ks.jks");
+ sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
+ sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_TYPE, "JKS");
+ sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE, "src/test/resources/certs/localhost-ks.jks");
+ sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
+ sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
+
+ sslContextService.initialize(sslTestContext.getInitializationContext());
+ sslContextService.onConfigured(sslTestContext.getConfigurationContext());
+ } catch (InitializationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected boolean isSecure() {
+ return true;
+ }
+
+ @Override
+ protected void customizeServer() {
+ serverServiceContext.getInitializationContext().addControllerService(sslContextService);
+ serverServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
+ }
+
+ @Override
+ protected void customizeClient() {
+ clientServiceContext.getInitializationContext().addControllerService(sslContextService);
+ clientServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
+ }
+
+ @Test
+ public void testClientServerCommunication() throws Exception {
+ super.testClientServerCommunication();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014b47/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketServer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketServer.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketServer.java
new file mode 100644
index 0000000..c056e21
--- /dev/null
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketServer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.components.ValidationResult;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestJettyWebSocketServer {
+
+ @Test
+ public void testValidationRequiredProperties() throws Exception {
+ final JettyWebSocketServer service = new JettyWebSocketServer();
+ final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
+ service.initialize(context.getInitializationContext());
+ final Collection<ValidationResult> results = service.validate(context.getValidationContext());
+ assertEquals(1, results.size());
+ final ValidationResult result = results.iterator().next();
+ assertEquals(JettyWebSocketServer.LISTEN_PORT.getDisplayName(), result.getSubject());
+ }
+
+ @Test
+ public void testValidationSuccess() throws Exception {
+ final JettyWebSocketServer service = new JettyWebSocketServer();
+ final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
+ context.setCustomValue(JettyWebSocketServer.LISTEN_PORT, "9001");
+ service.initialize(context.getInitializationContext());
+ final Collection<ValidationResult> results = service.validate(context.getValidationContext());
+ assertEquals(0, results.size());
+ }
+
+}