You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/09/13 09:06:20 UTC

[incubator-inlong] branch master updated: [INLONG-1538] TubeMQ reports the error "Topic xxx not publish" when producing data (#1539)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d96894  [INLONG-1538] TubeMQ reports the error "Topic xxx not publish" when producing data (#1539)
3d96894 is described below

commit 3d96894511597c1d6f0218ed3c9a6879bf2e9db2
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Sep 13 17:05:30 2021 +0800

    [INLONG-1538] TubeMQ reports the error "Topic xxx not publish" when producing data (#1539)
---
 .../tubemq/server/master/metamanage/MetaDataManager.java   |  8 ++++++++
 .../master/metamanage/metastore/MetaStoreService.java      |  3 +++
 .../master/nodemanage/nodebroker/DefBrokerRunManager.java  | 14 +++++++++++++-
 3 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
index 8cd9f5a..58d630a 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
@@ -47,6 +47,7 @@ import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
 import org.apache.inlong.tubemq.server.master.MasterConfig;
 import org.apache.inlong.tubemq.server.master.TMaster;
 import org.apache.inlong.tubemq.server.master.bdbstore.MasterGroupStatus;
+import org.apache.inlong.tubemq.server.master.metamanage.keepalive.AliveObserver;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.BdbMetaStoreServiceImpl;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.MetaStoreService;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
@@ -148,6 +149,9 @@ public class MetaDataManager implements Server {
         logger.info("BrokerConfManager StoreService stopped");
     }
 
+    public void registerObserver(AliveObserver eventObserver) {
+        metaStoreService.registerObserver(eventObserver);
+    }
 
     /**
      * If this node is the master role
@@ -475,6 +479,10 @@ public class MetaDataManager implements Server {
         return new BrokerProcessResult(entity.getBrokerId(), entity.getBrokerIp(), result);
     }
 
+    public Map<Integer, BrokerConfEntity> getBrokerConfInfo(BrokerConfEntity qryEntity) {
+        return metaStoreService.getBrokerConfInfo(qryEntity);
+    }
+
     /**
      * Get broker configure information
      *
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
index 052d014..2da923f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
@@ -23,6 +23,7 @@ import java.util.Set;
 
 import org.apache.inlong.tubemq.server.Server;
 import org.apache.inlong.tubemq.server.common.utils.ProcessResult;
+import org.apache.inlong.tubemq.server.master.metamanage.keepalive.AliveObserver;
 import org.apache.inlong.tubemq.server.master.metamanage.keepalive.KeepAlive;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
@@ -319,6 +320,8 @@ public interface MetaStoreService extends KeepAlive, Server {
                                     StringBuilder strBuffer,
                                     ProcessResult result);
 
+    void registerObserver(AliveObserver eventObserver);
+
     boolean isTopicNameInUsed(String topicName);
 
     boolean hasGroupConsumeCtrlConf(String groupName);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
index 6ccd228..f50f046 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -43,6 +43,7 @@ import org.apache.inlong.tubemq.server.common.utils.SerialIdUtils;
 import org.apache.inlong.tubemq.server.master.MasterConfig;
 import org.apache.inlong.tubemq.server.master.TMaster;
 import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
+import org.apache.inlong.tubemq.server.master.metamanage.keepalive.AliveObserver;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ import org.slf4j.LoggerFactory;
 /*
  * Broker run manager
  */
-public class DefBrokerRunManager implements BrokerRunManager {
+public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
     private static final Logger logger =
             LoggerFactory.getLogger(DefBrokerRunManager.class);
     // meta data manager
@@ -98,6 +99,17 @@ public class DefBrokerRunManager implements BrokerRunManager {
                                 nodeInfo.getSecondKey(), true);
                     }
                 });
+        this.metaDataManager.registerObserver(this);
+    }
+
+    @Override
+    public void clearCacheData() {
+        // cache data not need clear
+    }
+
+    @Override
+    public void reloadCacheData() {
+        updBrokerStaticInfo(metaDataManager.getBrokerConfInfo(null));
     }
 
     @Override