You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/09/01 05:47:03 UTC

[rocketmq-connect] branch master updated: fixed null pointer exception #294 (#296)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a8749c3 fixed null pointer exception #294 (#296)
2a8749c3 is described below

commit 2a8749c3ac8400cf3a19e95d5fe832218e8d0171
Author: xiaoyi <su...@163.com>
AuthorDate: Thu Sep 1 13:46:59 2022 +0800

    fixed null pointer exception #294 (#296)
---
 .../service/StateManagementServiceImpl.java        | 23 +++++++++++++++++++---
 1 file changed, 20 insertions(+), 3 deletions(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
index 9c20f7eb..0c20296e 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
@@ -135,20 +135,28 @@ public class StateManagementServiceImpl implements StateManagementService {
         /**connector status map*/
         Map<String, ConnectorStatus> connectorStatusMap = connectorStatusStore.getKVMap();
         connectorStatusMap.forEach((connectorName, connectorStatus) -> {
+            if (connectorStatus == null){
+                return;
+            }
             // send status
             put(connectorStatus);
         });
 
         /** task status map */
         Map<String, List<TaskStatus>> taskStatusMap = taskStatusStore.getKVMap();
+        if (taskStatusMap.isEmpty()){
+            return;
+        }
         taskStatusMap.forEach((connectorName, taskStatusList) -> {
+            if (taskStatusList == null || taskStatusList.isEmpty()){
+                return;
+            }
             taskStatusList.forEach(taskStatus -> {
                 // send status
                 put(taskStatus);
             });
         });
     }
-
     /**
      * Stop dependent services (if needed)
      */
@@ -165,12 +173,21 @@ public class StateManagementServiceImpl implements StateManagementService {
      * pre persist
      */
     private void prePersist() {
-        connAndTaskStatus.getConnectors().forEach((connectName, connectorStatus) -> {
+        Map<String, ConnAndTaskStatus.CacheEntry<ConnectorStatus>> connectors = connAndTaskStatus.getConnectors();
+        if (connectors.isEmpty()){
+            return;
+        }
+        connectors.forEach((connectName, connectorStatus) -> {
             connectorStatusStore.put(connectName, connectorStatus.get());
             Map<Integer, ConnAndTaskStatus.CacheEntry<TaskStatus>> cacheTaskStatus = connAndTaskStatus.getTasks().row(connectName);
+            if (cacheTaskStatus == null){
+                return;
+            }
             taskStatusStore.put(connectName, new ArrayList<>());
             cacheTaskStatus.forEach((taskId, taskStatus) -> {
-                taskStatusStore.get(connectName).add(taskStatus.get());
+                if (taskStatus != null){
+                    taskStatusStore.get(connectName).add(taskStatus.get());
+                }
             });
         });
     }