You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2017/04/20 23:22:20 UTC

nifi git commit: Support for websocket multiplexing to all existing websocket connections (Think chat to all clients instead of individual person). The core change was a change in WebSocketMessageRouter.java where if a sessionId is not present the messag

Repository: nifi
Updated Branches:
  refs/heads/master 816034bd0 -> 769e87467


Support for websocket multiplexing to all existing websocket
connections (Think chat to all clients instead of individual person).
The core change was a change in WebSocketMessageRouter.java where if a
sessionId is not present the message is sent to all connected clients.
So the key is leaving the sessionId to empty or null to send to all
clients. If the sessionId is specified the message will be sent just to
that session specified.

This closes #1649.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/769e8746
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/769e8746
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/769e8746

Branch: refs/heads/master
Commit: 769e874677ee5cfe1ee483c6a9b49f8db89ed93f
Parents: 816034b
Author: Jeremy Dyer <je...@apache.org>
Authored: Tue Apr 4 14:46:17 2017 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Fri Apr 21 08:21:56 2017 +0900

----------------------------------------------------------------------
 .../nifi/processors/websocket/PutWebSocket.java | 68 +++++++++++---------
 .../processors/websocket/TestPutWebSocket.java  | 44 ++++++-------
 .../nifi/websocket/WebSocketMessageRouter.java  | 25 +++++--
 3 files changed, 80 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/769e8746/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java
index beef839..5a438b5 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java
@@ -17,6 +17,26 @@
 
 package org.apache.nifi.processors.websocket;
 
+import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID;
+import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID;
+import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_FAILURE_DETAIL;
+import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_LOCAL_ADDRESS;
+import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE;
+import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS;
+import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID;
+import static org.apache.nifi.websocket.WebSocketMessage.CHARSET_NAME;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
@@ -38,25 +58,6 @@ import org.apache.nifi.websocket.WebSocketConfigurationException;
 import org.apache.nifi.websocket.WebSocketMessage;
 import org.apache.nifi.websocket.WebSocketService;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID;
-import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID;
-import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_FAILURE_DETAIL;
-import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_LOCAL_ADDRESS;
-import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE;
-import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS;
-import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID;
-import static org.apache.nifi.websocket.WebSocketMessage.CHARSET_NAME;
-
 @Tags({"WebSocket", "publish", "send"})
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @TriggerSerially
@@ -76,7 +77,8 @@ public class PutWebSocket extends AbstractProcessor {
     public static final PropertyDescriptor PROP_WS_SESSION_ID = new PropertyDescriptor.Builder()
             .name("websocket-session-id")
             .displayName("WebSocket Session Id")
-            .description("A NiFi Expression to retrieve the session id.")
+            .description("A NiFi Expression to retrieve the session id. If not specified, a message will be " +
+                    "sent to all connected WebSocket peers for the WebSocket controller service endpoint.")
             .required(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .expressionLanguageSupported(true)
@@ -166,8 +168,11 @@ public class PutWebSocket extends AbstractProcessor {
                 .evaluateAttributeExpressions(flowfile).getValue();
         final WebSocketMessage.Type messageType = WebSocketMessage.Type.valueOf(messageTypeStr);
 
-        if (StringUtils.isEmpty(sessionId)
-                || StringUtils.isEmpty(webSocketServiceId)
+        if (StringUtils.isEmpty(sessionId)) {
+            getLogger().debug("Specific SessionID not specified. Message will be broadcast to all connected clients.");
+        }
+
+        if (StringUtils.isEmpty(webSocketServiceId)
                 || StringUtils.isEmpty(webSocketServiceEndpoint)) {
             transferToFailure(processSession, flowfile, "Required WebSocket attribute was not found.");
             return;
@@ -187,9 +192,14 @@ public class PutWebSocket extends AbstractProcessor {
         final byte[] messageContent = new byte[(int) flowfile.getSize()];
         final long startSending = System.currentTimeMillis();
 
+        final AtomicReference<String> transitUri = new AtomicReference<>();
         final Map<String, String> attrs = new HashMap<>();
         attrs.put(ATTR_WS_CS_ID, webSocketService.getIdentifier());
-        attrs.put(ATTR_WS_SESSION_ID, sessionId);
+
+        if (!StringUtils.isEmpty(sessionId)) {
+            attrs.put(ATTR_WS_SESSION_ID, sessionId);
+        }
+
         attrs.put(ATTR_WS_ENDPOINT_ID, webSocketServiceEndpoint);
         attrs.put(ATTR_WS_MESSAGE_TYPE, messageTypeStr);
 
@@ -211,13 +221,14 @@ public class PutWebSocket extends AbstractProcessor {
 
                 attrs.put(ATTR_WS_LOCAL_ADDRESS, sender.getLocalAddress().toString());
                 attrs.put(ATTR_WS_REMOTE_ADDRESS, sender.getRemoteAddress().toString());
+                transitUri.set(sender.getTransitUri());
+            });
 
-                final FlowFile updatedFlowFile = processSession.putAllAttributes(flowfile, attrs);
-                final long transmissionMillis = System.currentTimeMillis() - startSending;
-                processSession.getProvenanceReporter().send(updatedFlowFile, sender.getTransitUri(), transmissionMillis);
+            final FlowFile updatedFlowFile = processSession.putAllAttributes(flowfile, attrs);
+            final long transmissionMillis = System.currentTimeMillis() - startSending;
+            processSession.getProvenanceReporter().send(updatedFlowFile, transitUri.get(), transmissionMillis);
 
-                processSession.transfer(updatedFlowFile, REL_SUCCESS);
-            });
+            processSession.transfer(updatedFlowFile, REL_SUCCESS);
 
         } catch (WebSocketConfigurationException|IllegalStateException|IOException e) {
             // WebSocketConfigurationException: If the corresponding WebSocketGatewayProcessor has been stopped.
@@ -235,5 +246,4 @@ public class PutWebSocket extends AbstractProcessor {
         return flowfile;
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/769e8746/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java
index a987fe4..52f6f2a 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java
@@ -16,24 +16,6 @@
  */
 package org.apache.nifi.processors.websocket;
 
-import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.apache.nifi.websocket.AbstractWebSocketSession;
-import org.apache.nifi.websocket.SendMessage;
-import org.apache.nifi.websocket.WebSocketMessage;
-import org.apache.nifi.websocket.WebSocketService;
-import org.apache.nifi.websocket.WebSocketSession;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID;
 import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID;
 import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_FAILURE_DETAIL;
@@ -50,6 +32,24 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.websocket.AbstractWebSocketSession;
+import org.apache.nifi.websocket.SendMessage;
+import org.apache.nifi.websocket.WebSocketMessage;
+import org.apache.nifi.websocket.WebSocketService;
+import org.apache.nifi.websocket.WebSocketSession;
+import org.junit.Test;
+
 
 public class TestPutWebSocket {
 
@@ -92,12 +92,12 @@ public class TestPutWebSocket {
         runner.run();
 
         final List<MockFlowFile> succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS);
-        assertEquals(0, succeededFlowFiles.size());
+        //assertEquals(0, succeededFlowFiles.size());   //No longer valid test after NIFI-3318 since not specifying sessionid will send to all clients
+        assertEquals(1, succeededFlowFiles.size());
 
         final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE);
-        assertEquals(1, failedFlowFiles.size());
-        final MockFlowFile failedFlowFile = failedFlowFiles.iterator().next();
-        assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL));
+        //assertEquals(1, failedFlowFiles.size());      //No longer valid test after NIFI-3318
+        assertEquals(0, failedFlowFiles.size());
 
         final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
         assertEquals(0, provenanceEvents.size());

http://git-wip-us.apache.org/repos/asf/nifi/blob/769e8746/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
index 057b33d..e5034e1 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
@@ -16,14 +16,15 @@
  */
 package org.apache.nifi.websocket;
 
-import org.apache.nifi.processor.Processor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processor.Processor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class WebSocketMessageRouter {
     private static final Logger logger = LoggerFactory.getLogger(WebSocketMessageRouter.class);
     private final String endpointId;
@@ -101,8 +102,20 @@ public class WebSocketMessageRouter {
     }
 
     public void sendMessage(final String sessionId, final SendMessage sendMessage) throws IOException {
-        final WebSocketSession session = getSessionOrFail(sessionId);
-        sendMessage.send(session);
+        if (!StringUtils.isEmpty(sessionId)) {
+            final WebSocketSession session = getSessionOrFail(sessionId);
+            sendMessage.send(session);
+        } else {
+            //The sessionID is not specified so broadcast the message to all connected client sessions.
+            sessions.keySet().forEach(itrSessionId -> {
+                try {
+                    final WebSocketSession session = getSessionOrFail(itrSessionId);
+                    sendMessage.send(session);
+                } catch (IOException e) {
+                    logger.warn("Failed to send message to session {} due to {}", itrSessionId, e, e);
+                }
+            });
+        }
     }
 
     public void disconnect(final String sessionId, final String reason) throws IOException {