You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/09/01 05:47:03 UTC
[rocketmq-connect] branch master updated: fixed null pointer exception #294 (#296)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 2a8749c3 fixed null pointer exception #294 (#296)
2a8749c3 is described below
commit 2a8749c3ac8400cf3a19e95d5fe832218e8d0171
Author: xiaoyi <su...@163.com>
AuthorDate: Thu Sep 1 13:46:59 2022 +0800
fixed null pointer exception #294 (#296)
---
.../service/StateManagementServiceImpl.java | 23 +++++++++++++++++++---
1 file changed, 20 insertions(+), 3 deletions(-)
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
index 9c20f7eb..0c20296e 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
@@ -135,20 +135,28 @@ public class StateManagementServiceImpl implements StateManagementService {
/**connector status map*/
Map<String, ConnectorStatus> connectorStatusMap = connectorStatusStore.getKVMap();
connectorStatusMap.forEach((connectorName, connectorStatus) -> {
+ if (connectorStatus == null){
+ return;
+ }
// send status
put(connectorStatus);
});
/** task status map */
Map<String, List<TaskStatus>> taskStatusMap = taskStatusStore.getKVMap();
+ if (taskStatusMap.isEmpty()){
+ return;
+ }
taskStatusMap.forEach((connectorName, taskStatusList) -> {
+ if (taskStatusList == null || taskStatusList.isEmpty()){
+ return;
+ }
taskStatusList.forEach(taskStatus -> {
// send status
put(taskStatus);
});
});
}
-
/**
* Stop dependent services (if needed)
*/
@@ -165,12 +173,21 @@ public class StateManagementServiceImpl implements StateManagementService {
* pre persist
*/
private void prePersist() {
- connAndTaskStatus.getConnectors().forEach((connectName, connectorStatus) -> {
+ Map<String, ConnAndTaskStatus.CacheEntry<ConnectorStatus>> connectors = connAndTaskStatus.getConnectors();
+ if (connectors.isEmpty()){
+ return;
+ }
+ connectors.forEach((connectName, connectorStatus) -> {
connectorStatusStore.put(connectName, connectorStatus.get());
Map<Integer, ConnAndTaskStatus.CacheEntry<TaskStatus>> cacheTaskStatus = connAndTaskStatus.getTasks().row(connectName);
+ if (cacheTaskStatus == null){
+ return;
+ }
taskStatusStore.put(connectName, new ArrayList<>());
cacheTaskStatus.forEach((taskId, taskStatus) -> {
- taskStatusStore.get(connectName).add(taskStatus.get());
+ if (taskStatus != null){
+ taskStatusStore.get(connectName).add(taskStatus.get());
+ }
});
});
}