You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2021/04/01 16:22:34 UTC
[storm] branch 2.2.x-branch updated: [STORM-3763] send initial message to remote client only after authentication completes (#3390)
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch 2.2.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/2.2.x-branch by this push:
new 7543a13 [STORM-3763] send initial message to remote client only after authentication completes (#3390)
7543a13 is described below
commit 7543a13e021570b6b4d7e583e00823c0b3106a3b
Author: Meng (Ethan) Li <et...@gmail.com>
AuthorDate: Thu Apr 1 11:19:52 2021 -0500
[STORM-3763] send initial message to remote client only after authentication completes (#3390)
---
.../src/jvm/org/apache/storm/messaging/IContext.java | 3 ++-
.../jvm/org/apache/storm/messaging/netty/Server.java | 18 +++++++++++++++---
2 files changed, 17 insertions(+), 4 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
index ac56a8a..aadd007 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -43,7 +43,8 @@ public interface IContext {
* @param stormId topology ID
* @param port port #
* @param cb The callback to deliver received messages to
- * @param newConnectionResponse Supplier of the initial message to send to new client connections
+ * @param newConnectionResponse Supplier of the initial message to send to new client connections. If authentication
+ * is required, the message will be sent after authentication is complete.
* @return server side connection
*/
IConnection bind(String stormId, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse);
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
index 8a37d1e..26ea178 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
@@ -65,16 +65,19 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
private final IConnectionCallback cb;
private final Supplier<Object> newConnectionResponse;
private volatile boolean closing = false;
+ private final boolean isNettyAuthRequired;
/**
* Starts Netty at the given port.
* @param topoConf The topology config
* @param port The port to start Netty at
* @param cb The callback to deliver incoming messages to
- * @param newConnectionResponse The response to send to clients when they connect. Can be null.
+ * @param newConnectionResponse The response to send to clients when they connect. Can be null. If authentication
+ * is required, the message will be sent after authentication is complete.
*/
Server(Map<String, Object> topoConf, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse) {
this.topoConf = topoConf;
+ this.isNettyAuthRequired = (Boolean) topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
this.port = port;
ser = new KryoValuesSerializer(topoConf);
this.cb = cb;
@@ -252,8 +255,9 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
**/
@Override
public void channelActive(Channel c) {
- if (newConnectionResponse != null) {
- c.writeAndFlush(newConnectionResponse.get(), c.voidPromise());
+ if (!isNettyAuthRequired) {
+ //if authentication is not required, treat it as authenticated.
+ authenticated(c);
}
allChannels.add(c);
}
@@ -276,6 +280,14 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
@Override
public void authenticated(Channel c) {
+ if (isNettyAuthRequired) {
+ LOG.debug("The channel {} is active and authenticated", c);
+ } else {
+ LOG.debug("The channel {} is active", c);
+ }
+ if (newConnectionResponse != null) {
+ c.writeAndFlush(newConnectionResponse.get(), c.voidPromise());
+ }
}
@Override