You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2021/09/28 20:25:42 UTC

[storm] 01/02: [STORM-3767] fix NPE in getComponentPendingProfileActions

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch 2.2.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git

commit 33e3934de1cf35c32ec87d6a7b79ea3cf32be2f2
Author: Meng (Ethan) Li <et...@gmail.com>
AuthorDate: Wed Sep 15 23:52:32 2021 -0500

    [STORM-3767] fix NPE in getComponentPendingProfileActions
---
 conf/defaults.yaml                                     |  2 +-
 docs/STORM-UI-REST-API.md                              |  4 ++--
 docs/Serialization.md                                  |  4 ++--
 storm-client/src/jvm/org/apache/storm/Config.java      |  2 +-
 .../apache/storm/messaging/netty/MessageDecoder.java   | 11 ++++++++++-
 .../src/jvm/org/apache/storm/utils/ShellUtils.java     | 18 ++----------------
 .../java/org/apache/storm/daemon/nimbus/Nimbus.java    | 12 +++++++-----
 7 files changed, 25 insertions(+), 28 deletions(-)

diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 4d2f2f7..e325a1d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -266,7 +266,7 @@ topology.max.spout.pending: null    # ideally should be larger than topology.pro
 topology.state.synchronization.timeout.secs: 60
 topology.stats.sample.rate: 0.05
 topology.builtin.metrics.bucket.size.secs: 60
-topology.fall.back.on.java.serialization: true
+topology.fall.back.on.java.serialization: false
 topology.worker.childopts: null
 topology.worker.logwriter.childopts: "-Xmx64m"
 topology.tick.tuple.freq.secs: null
diff --git a/docs/STORM-UI-REST-API.md b/docs/STORM-UI-REST-API.md
index 469855b..1a1b63a 100644
--- a/docs/STORM-UI-REST-API.md
+++ b/docs/STORM-UI-REST-API.md
@@ -63,7 +63,7 @@ Sample response (does not include all the data fields):
     "dev.zookeeper.path": "/tmp/dev-storm-zookeeper",
     "topology.tick.tuple.freq.secs": null,
     "topology.builtin.metrics.bucket.size.secs": 60,
-    "topology.fall.back.on.java.serialization": true,
+    "topology.fall.back.on.java.serialization": false,
     "topology.max.error.report.per.interval": 5,
     "zmq.linger.millis": 5000,
     "topology.skip.missing.kryo.registrations": false,
@@ -728,7 +728,7 @@ Sample response:
         "dev.zookeeper.path": "/tmp/dev-storm-zookeeper",
         "topology.tick.tuple.freq.secs": null,
         "topology.builtin.metrics.bucket.size.secs": 60,
-        "topology.fall.back.on.java.serialization": true,
+        "topology.fall.back.on.java.serialization": false,
         "topology.max.error.report.per.interval": 5,
         "zmq.linger.millis": 5000,
         "topology.skip.missing.kryo.registrations": false,
diff --git a/docs/Serialization.md b/docs/Serialization.md
index e35a0f9..47f0606 100644
--- a/docs/Serialization.md
+++ b/docs/Serialization.md
@@ -53,11 +53,11 @@ You may use this like any other service loader and storm will register the bindi
 
 ### Java serialization
 
-If Storm encounters a type for which it doesn't have a serialization registered, it will use Java serialization if possible. If the object can't be serialized with Java serialization, then Storm will throw an error.
+When `Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION` is set to true and Storm encounters a type for which it doesn't have a serialization registered, it will use Java serialization if possible. If the object can't be serialized with Java serialization, then Storm will throw an error.
 
 Beware that Java serialization is extremely expensive, both in terms of CPU cost as well as the size of the serialized object. It is highly recommended that you register custom serializers when you put the topology in production. The Java serialization behavior is there so that it's easy to prototype new topologies.
 
-You can turn off the behavior to fall back on Java serialization by setting the `Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION` config to false.
+You can turn the fallback to Java serialization on or off by setting the `Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION` config to true or false. The default value is false for security reasons.
 
 ### Component-specific serialization registrations
 
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index c27b468..85419c6 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -536,7 +536,7 @@ public class Config extends HashMap<String, Object> {
     @IsInteger
     public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS = "topology.builtin.metrics.bucket.size.secs";
     /**
-     * Whether or not to use Java serialization in a topology.
+     * Whether or not to fall back on Java serialization in a topology. The default is false for security reasons.
      */
     @IsBoolean
     public static final String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION = "topology.fall.back.on.java.serialization";
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
index bced87c..a3b282a 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
@@ -19,9 +19,12 @@ import org.apache.storm.serialization.KryoValuesDeserializer;
 import org.apache.storm.shade.io.netty.buffer.ByteBuf;
 import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
 import org.apache.storm.shade.io.netty.handler.codec.ByteToMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MessageDecoder extends ByteToMessageDecoder {
 
+    private static final Logger LOG = LoggerFactory.getLogger(MessageDecoder.class);
     private final KryoValuesDeserializer deser;
 
     public MessageDecoder(KryoValuesDeserializer deser) {
@@ -164,4 +167,10 @@ public class MessageDecoder extends ByteToMessageDecoder {
             out.add(ret);
         }
     }
-}
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        LOG.error("Exception thrown while decoding messages in channel {}; exception: ", ctx.channel(), cause);
+        ctx.close();
+    }
+}
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java
index 96b3a02..86e34ea 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java
@@ -103,28 +103,14 @@ public abstract class ShellUtils {
     }
 
     /**
-     * a Unix command to get the current user's groups list.
-     */
-    public static String[] getGroupsCommand() {
-        if (WINDOWS) {
-            throw new UnsupportedOperationException("Getting user groups is not supported on Windows");
-        }
-        return new String[]{ "bash", "-c", "groups" };
-    }
-
-    /**
-     * a Unix command to get a given user's groups list. If the OS is not WINDOWS, the command will get the user's primary group first and
-     * finally get the groups list which includes the primary group. i.e. the user's primary group will be included twice.
+     * a Unix command to get a given user's groups list. Windows is not supported.
      */
     public static String[] getGroupsForUserCommand(final String user) {
         if (WINDOWS) {
             throw new UnsupportedOperationException("Getting user groups is not supported on Windows");
         }
         //'groups username' command return is non-consistent across different unixes
-        return new String[]{
-            "bash", "-c", "id -gn " + user
-                          + "&& id -Gn " + user
-        };
+        return new String[]{"id", "-Gn", user};
     }
 
     private static void joinThread(Thread t) {
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 83be964..5177d5a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -3537,12 +3537,14 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             getComponentPendingProfileActionsCalls.mark();
             CommonTopoInfo info = getCommonTopoInfo(id, "getComponentPendingProfileActions");
-            Map<String, String> nodeToHost = info.assignment.get_node_host();
             Map<List<? extends Number>, List<Object>> exec2hostPort = new HashMap<>();
-            for (Entry<List<Long>, NodeInfo> entry : info.assignment.get_executor_node_port().entrySet()) {
-                NodeInfo ni = entry.getValue();
-                List<Object> hostPort = Arrays.asList(nodeToHost.get(ni.get_node()), ni.get_port_iterator().next().intValue());
-                exec2hostPort.put(entry.getKey(), hostPort);
+            if (info.assignment != null) {
+                Map<String, String> nodeToHost = info.assignment.get_node_host();
+                for (Entry<List<Long>, NodeInfo> entry : info.assignment.get_executor_node_port().entrySet()) {
+                    NodeInfo ni = entry.getValue();
+                    List<Object> hostPort = Arrays.asList(nodeToHost.get(ni.get_node()), ni.get_port_iterator().next().intValue());
+                    exec2hostPort.put(entry.getKey(), hostPort);
+                }
             }
             List<Map<String, Object>> nodeInfos =
                 StatsUtil.extractNodeInfosFromHbForComp(exec2hostPort, info.taskToComponent, false, componentId);