You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2021/04/01 16:22:34 UTC

[storm] branch 2.2.x-branch updated: [STORM-3763] send initial message to remote client only after authentication completes (#3390)

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch 2.2.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/2.2.x-branch by this push:
     new 7543a13  [STORM-3763] send initial message to remote client only after authentication completes (#3390)
7543a13 is described below

commit 7543a13e021570b6b4d7e583e00823c0b3106a3b
Author: Meng (Ethan) Li <et...@gmail.com>
AuthorDate: Thu Apr 1 11:19:52 2021 -0500

    [STORM-3763] send initial message to remote client only after authentication completes (#3390)
---
 .../src/jvm/org/apache/storm/messaging/IContext.java   |  3 ++-
 .../jvm/org/apache/storm/messaging/netty/Server.java   | 18 +++++++++++++++---
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
index ac56a8a..aadd007 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -43,7 +43,8 @@ public interface IContext {
      * @param stormId topology ID
      * @param port     port #
      * @param cb The callback to deliver received messages to
-     * @param newConnectionResponse Supplier of the initial message to send to new client connections
+     * @param newConnectionResponse Supplier of the initial message to send to new client connections. If authentication
+     *                              is required, the message will be sent after authentication is complete.
      * @return server side connection
      */
     IConnection bind(String stormId, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse);
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
index 8a37d1e..26ea178 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
@@ -65,16 +65,19 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     private final IConnectionCallback cb;
     private final Supplier<Object> newConnectionResponse;
     private volatile boolean closing = false;
+    private final boolean isNettyAuthRequired;
 
     /**
      * Starts Netty at the given port.
      * @param topoConf The topology config
      * @param port The port to start Netty at
      * @param cb The callback to deliver incoming messages to
-     * @param newConnectionResponse The response to send to clients when they connect. Can be null.
+     * @param newConnectionResponse The response to send to clients when they connect. Can be null. If authentication
+     *                              is required, the message will be sent after authentication is complete.
      */
     Server(Map<String, Object> topoConf, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse) {
         this.topoConf = topoConf;
+        this.isNettyAuthRequired = (Boolean) topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
         this.port = port;
         ser = new KryoValuesSerializer(topoConf);
         this.cb = cb;
@@ -252,8 +255,9 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
      **/
     @Override
     public void channelActive(Channel c) {
-        if (newConnectionResponse != null) {
-            c.writeAndFlush(newConnectionResponse.get(), c.voidPromise());
+        if (!isNettyAuthRequired) {
+            //if authentication is not required, treat it as authenticated.
+            authenticated(c);
         }
         allChannels.add(c);
     }
@@ -276,6 +280,14 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
 
     @Override
     public void authenticated(Channel c) {
+        if (isNettyAuthRequired) {
+            LOG.debug("The channel {} is active and authenticated", c);
+        } else {
+            LOG.debug("The channel {} is active", c);
+        }
+        if (newConnectionResponse != null) {
+            c.writeAndFlush(newConnectionResponse.get(), c.voidPromise());
+        }
     }
 
     @Override