You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/09/13 09:06:20 UTC
[incubator-inlong] branch master updated: [INLONG-1538] TubeMQ
reports the error "Topic xxx not publish" when producing data (#1539)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3d96894 [INLONG-1538] TubeMQ reports the error "Topic xxx not publish" when producing data (#1539)
3d96894 is described below
commit 3d96894511597c1d6f0218ed3c9a6879bf2e9db2
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Sep 13 17:05:30 2021 +0800
[INLONG-1538] TubeMQ reports the error "Topic xxx not publish" when producing data (#1539)
---
.../tubemq/server/master/metamanage/MetaDataManager.java | 8 ++++++++
.../master/metamanage/metastore/MetaStoreService.java | 3 +++
.../master/nodemanage/nodebroker/DefBrokerRunManager.java | 14 +++++++++++++-
3 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
index 8cd9f5a..58d630a 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
@@ -47,6 +47,7 @@ import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
import org.apache.inlong.tubemq.server.master.MasterConfig;
import org.apache.inlong.tubemq.server.master.TMaster;
import org.apache.inlong.tubemq.server.master.bdbstore.MasterGroupStatus;
+import org.apache.inlong.tubemq.server.master.metamanage.keepalive.AliveObserver;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.BdbMetaStoreServiceImpl;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.MetaStoreService;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
@@ -148,6 +149,9 @@ public class MetaDataManager implements Server {
logger.info("BrokerConfManager StoreService stopped");
}
+ public void registerObserver(AliveObserver eventObserver) {
+ metaStoreService.registerObserver(eventObserver);
+ }
/**
* If this node is the master role
@@ -475,6 +479,10 @@ public class MetaDataManager implements Server {
return new BrokerProcessResult(entity.getBrokerId(), entity.getBrokerIp(), result);
}
+ public Map<Integer, BrokerConfEntity> getBrokerConfInfo(BrokerConfEntity qryEntity) {
+ return metaStoreService.getBrokerConfInfo(qryEntity);
+ }
+
/**
* Get broker configure information
*
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
index 052d014..2da923f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
@@ -23,6 +23,7 @@ import java.util.Set;
import org.apache.inlong.tubemq.server.Server;
import org.apache.inlong.tubemq.server.common.utils.ProcessResult;
+import org.apache.inlong.tubemq.server.master.metamanage.keepalive.AliveObserver;
import org.apache.inlong.tubemq.server.master.metamanage.keepalive.KeepAlive;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
@@ -319,6 +320,8 @@ public interface MetaStoreService extends KeepAlive, Server {
StringBuilder strBuffer,
ProcessResult result);
+ void registerObserver(AliveObserver eventObserver);
+
boolean isTopicNameInUsed(String topicName);
boolean hasGroupConsumeCtrlConf(String groupName);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
index 6ccd228..f50f046 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -43,6 +43,7 @@ import org.apache.inlong.tubemq.server.common.utils.SerialIdUtils;
import org.apache.inlong.tubemq.server.master.MasterConfig;
import org.apache.inlong.tubemq.server.master.TMaster;
import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
+import org.apache.inlong.tubemq.server.master.metamanage.keepalive.AliveObserver;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ import org.slf4j.LoggerFactory;
/*
* Broker run manager
*/
-public class DefBrokerRunManager implements BrokerRunManager {
+public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
private static final Logger logger =
LoggerFactory.getLogger(DefBrokerRunManager.class);
// meta data manager
@@ -98,6 +99,17 @@ public class DefBrokerRunManager implements BrokerRunManager {
nodeInfo.getSecondKey(), true);
}
});
+ this.metaDataManager.registerObserver(this);
+ }
+
+ @Override
+ public void clearCacheData() {
+ // cached data does not need to be cleared
+ }
+
+ @Override
+ public void reloadCacheData() {
+ updBrokerStaticInfo(metaDataManager.getBrokerConfInfo(null));
}
@Override