You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/24 13:15:35 UTC
[incubator-inlong] branch master updated: [INLONG-4146][Manager] Get DataProxy configuration from inlong_cluster and inlong_group (#4152)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 86313d3ca [INLONG-4146][Manager] Get DataProxy configuration from inlong_cluster and inlong_group (#4152)
86313d3ca is described below
commit 86313d3ca1d71ec190278d469ac723762fff8306
Author: 卢春亮 <94...@qq.com>
AuthorDate: Tue May 24 21:15:29 2022 +0800
[INLONG-4146][Manager] Get DataProxy configuration from inlong_cluster and inlong_group (#4152)
* Get DataProxy configuration data from inlong_cluster and inlong_group.
* Move resultType to manager-common
* Remove unused tables
---
.../common/pojo/dataproxy}/CacheCluster.java | 79 +++-
.../common/pojo/dataproxy/InlongGroupId.java} | 85 ++--
.../common/pojo/dataproxy/InlongStreamId.java} | 85 ++--
.../common/pojo/dataproxy/ProxyCluster.java} | 64 ++-
.../inlong/manager/dao/entity/CacheTopic.java | 82 ----
.../inlong/manager/dao/entity/ClusterSet.java | 215 ---------
.../inlong/manager/dao/entity/FlumeChannel.java | 82 ----
.../inlong/manager/dao/entity/FlumeChannelExt.java | 120 -----
.../inlong/manager/dao/entity/FlumeSink.java | 101 -----
.../inlong/manager/dao/entity/FlumeSinkExt.java | 120 -----
.../inlong/manager/dao/entity/FlumeSource.java | 120 -----
.../inlong/manager/dao/entity/FlumeSourceExt.java | 120 -----
.../inlong/manager/dao/entity/ProxyCluster.java | 82 ----
.../manager/dao/mapper/ClusterSetMapper.java | 45 +-
.../main/resources/mappers/ClusterSetMapper.xml | 103 ++---
.../service/core/DataProxyClusterService.java | 2 +-
.../core/impl/DataProxyClusterServiceImpl.java | 14 +-
.../repository/DataProxyConfigRepository.java | 501 ++++++++-------------
.../manager-web/sql/apache_inlong_manager.sql | 203 ---------
.../controller/openapi/DataProxyController.java | 10 +-
.../src/main/resources/application.properties | 1 +
.../sort/standalone/dispatch/DispatchManager.java | 43 +-
22 files changed, 430 insertions(+), 1847 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheCluster.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/CacheCluster.java
similarity index 51%
rename from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheCluster.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/CacheCluster.java
index 9aec0c6e0..c8281ae1e 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheCluster.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/CacheCluster.java
@@ -15,19 +15,21 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.dao.entity;
+package org.apache.inlong.manager.common.pojo.dataproxy;
/**
* CacheCluster
*/
public class CacheCluster {
+
private String clusterName;
- private String setName;
- private String zone;
+ private String type;
+ private String clusterTag;
+ private String extTag;
+ private String extParams;
/**
* get clusterName
- *
* @return the clusterName
*/
public String getClusterName() {
@@ -36,7 +38,6 @@ public class CacheCluster {
/**
* set clusterName
- *
* @param clusterName the clusterName to set
*/
public void setClusterName(String clusterName) {
@@ -44,39 +45,67 @@ public class CacheCluster {
}
/**
- * get setName
- *
- * @return the setName
+ * get type
+ * @return the type
+ */
+ public String getType() {
+ return type;
+ }
+
+ /**
+ * set type
+ * @param type the type to set
+ */
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ /**
+ * get clusterTag
+ * @return the clusterTag
+ */
+ public String getClusterTag() {
+ return clusterTag;
+ }
+
+ /**
+ * set clusterTag
+ * @param clusterTag the clusterTag to set
+ */
+ public void setClusterTag(String clusterTag) {
+ this.clusterTag = clusterTag;
+ }
+
+ /**
+ * get extTag
+ * @return the extTag
*/
- public String getSetName() {
- return setName;
+ public String getExtTag() {
+ return extTag;
}
/**
- * set setName
- *
- * @param setName the setName to set
+ * set extTag
+ * @param extTag the extTag to set
*/
- public void setSetName(String setName) {
- this.setName = setName;
+ public void setExtTag(String extTag) {
+ this.extTag = extTag;
}
/**
- * get zone
- *
- * @return the zone
+ * get extParams
+ * @return the extParams
*/
- public String getZone() {
- return zone;
+ public String getExtParams() {
+ return extParams;
}
/**
- * set zone
- *
- * @param zone the zone to set
+ * set extParams
+ * @param extParams the extParams to set
*/
- public void setZone(String zone) {
- this.zone = zone;
+ public void setExtParams(String extParams) {
+ this.extParams = extParams;
}
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InLongId.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/InlongGroupId.java
similarity index 54%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InLongId.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/InlongGroupId.java
index 0de815720..c50c97b74 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InLongId.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/InlongGroupId.java
@@ -15,87 +15,80 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.dao.entity;
+package org.apache.inlong.manager.common.pojo.dataproxy;
/**
- * InLongId
+ * InlongGroupId
*/
-public class InLongId {
- private String inlongId;
+public class InlongGroupId {
+
+ private String inlongGroupId;
+ private String clusterTag;
private String topic;
- private String params;
- private String setName;
+ private String extParams;
/**
- * get inlongId
- *
- * @return the inlongId
+ * get inlongGroupId
+ * @return the inlongGroupId
*/
- public String getInlongId() {
- return inlongId;
+ public String getInlongGroupId() {
+ return inlongGroupId;
}
/**
- * set inlongId
- *
- * @param inlongId the inlongId to set
+ * set inlongGroupId
+ * @param inlongGroupId the inlongGroupId to set
*/
- public void setInlongId(String inlongId) {
- this.inlongId = inlongId;
+ public void setInlongGroupId(String inlongGroupId) {
+ this.inlongGroupId = inlongGroupId;
}
/**
- * get topic
- *
- * @return the topic
+ * get clusterTag
+ * @return the clusterTag
*/
- public String getTopic() {
- return topic;
+ public String getClusterTag() {
+ return clusterTag;
}
/**
- * set topic
- *
- * @param topic the topic to set
+ * set clusterTag
+ * @param clusterTag the clusterTag to set
*/
- public void setTopic(String topic) {
- this.topic = topic;
+ public void setClusterTag(String clusterTag) {
+ this.clusterTag = clusterTag;
}
/**
- * get params
- *
- * @return the params
+ * get topic
+ * @return the topic
*/
- public String getParams() {
- return params;
+ public String getTopic() {
+ return topic;
}
/**
- * set params
- *
- * @param params the params to set
+ * set topic
+ * @param topic the topic to set
*/
- public void setParams(String params) {
- this.params = params;
+ public void setTopic(String topic) {
+ this.topic = topic;
}
/**
- * get setName
- *
- * @return the setName
+ * get extParams
+ * @return the extParams
*/
- public String getSetName() {
- return setName;
+ public String getExtParams() {
+ return extParams;
}
/**
- * set setName
- *
- * @param setName the setName to set
+ * set extParams
+ * @param extParams the extParams to set
*/
- public void setSetName(String setName) {
- this.setName = setName;
+ public void setExtParams(String extParams) {
+ this.extParams = extParams;
}
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InLongId.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/InlongStreamId.java
similarity index 53%
rename from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InLongId.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/InlongStreamId.java
index 0de815720..3ac54690f 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InLongId.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/InlongStreamId.java
@@ -15,87 +15,80 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.dao.entity;
+package org.apache.inlong.manager.common.pojo.dataproxy;
/**
- * InLongId
+ * InlongStreamId
*/
-public class InLongId {
- private String inlongId;
+public class InlongStreamId {
+
+ private String inlongGroupId;
+ private String inlongStreamId;
private String topic;
- private String params;
- private String setName;
+ private String extParams;
/**
- * get inlongId
- *
- * @return the inlongId
+ * get inlongGroupId
+ * @return the inlongGroupId
*/
- public String getInlongId() {
- return inlongId;
+ public String getInlongGroupId() {
+ return inlongGroupId;
}
/**
- * set inlongId
- *
- * @param inlongId the inlongId to set
+ * set inlongGroupId
+ * @param inlongGroupId the inlongGroupId to set
*/
- public void setInlongId(String inlongId) {
- this.inlongId = inlongId;
+ public void setInlongGroupId(String inlongGroupId) {
+ this.inlongGroupId = inlongGroupId;
}
/**
- * get topic
- *
- * @return the topic
+ * get inlongStreamId
+ * @return the inlongStreamId
*/
- public String getTopic() {
- return topic;
+ public String getInlongStreamId() {
+ return inlongStreamId;
}
/**
- * set topic
- *
- * @param topic the topic to set
+ * set inlongStreamId
+ * @param inlongStreamId the inlongStreamId to set
*/
- public void setTopic(String topic) {
- this.topic = topic;
+ public void setInlongStreamId(String inlongStreamId) {
+ this.inlongStreamId = inlongStreamId;
}
/**
- * get params
- *
- * @return the params
+ * get topic
+ * @return the topic
*/
- public String getParams() {
- return params;
+ public String getTopic() {
+ return topic;
}
/**
- * set params
- *
- * @param params the params to set
+ * set topic
+ * @param topic the topic to set
*/
- public void setParams(String params) {
- this.params = params;
+ public void setTopic(String topic) {
+ this.topic = topic;
}
/**
- * get setName
- *
- * @return the setName
+ * get extParams
+ * @return the extParams
*/
- public String getSetName() {
- return setName;
+ public String getExtParams() {
+ return extParams;
}
/**
- * set setName
- *
- * @param setName the setName to set
+ * set extParams
+ * @param extParams the extParams to set
*/
- public void setSetName(String setName) {
- this.setName = setName;
+ public void setExtParams(String extParams) {
+ this.extParams = extParams;
}
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheClusterExt.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/ProxyCluster.java
similarity index 57%
rename from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheClusterExt.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/ProxyCluster.java
index 3f728bd79..5bdc069d9 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheClusterExt.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/ProxyCluster.java
@@ -15,21 +15,20 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.dao.entity;
+package org.apache.inlong.manager.common.pojo.dataproxy;
/**
- * CacheClusterExt
+ * ProxyCluster
*/
-public class CacheClusterExt {
+public class ProxyCluster {
private String clusterName;
- private String keyName;
- private String keyValue;
- private Integer isDeleted;
+ private String clusterTag;
+ private String extTag;
+ private String extParams;
/**
* get clusterName
- *
* @return the clusterName
*/
public String getClusterName() {
@@ -38,7 +37,6 @@ public class CacheClusterExt {
/**
* set clusterName
- *
* @param clusterName the clusterName to set
*/
public void setClusterName(String clusterName) {
@@ -46,53 +44,51 @@ public class CacheClusterExt {
}
/**
- * get keyName
- *
- * @return the keyName
+ * get clusterTag
+ * @return the clusterTag
*/
- public String getKeyName() {
- return keyName;
+ public String getClusterTag() {
+ return clusterTag;
}
/**
- * set keyName
- *
- * @param keyName the keyName to set
+ * set clusterTag
+ * @param clusterTag the clusterTag to set
*/
- public void setKeyName(String keyName) {
- this.keyName = keyName;
+ public void setClusterTag(String clusterTag) {
+ this.clusterTag = clusterTag;
}
/**
- * get keyValue
- *
- * @return the keyValue
+ * get extTag
+ * @return the extTag
*/
- public String getKeyValue() {
- return keyValue;
+ public String getExtTag() {
+ return extTag;
}
/**
- * set keyValue
- *
- * @param keyValue the keyValue to set
+ * set extTag
+ * @param extTag the extTag to set
*/
- public void setKeyValue(String keyValue) {
- this.keyValue = keyValue;
+ public void setExtTag(String extTag) {
+ this.extTag = extTag;
}
/**
- * getJIsDeleted
+ * get extParams
+ * @return the extParams
*/
- public Integer getJIsDeleted() {
- return isDeleted;
+ public String getExtParams() {
+ return extParams;
}
/**
- * setIsDeleted
+ * set extParams
+ * @param extParams the extParams to set
*/
- public void setIsDeleted(Integer isDeleted) {
- this.isDeleted = isDeleted;
+ public void setExtParams(String extParams) {
+ this.extParams = extParams;
}
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheTopic.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheTopic.java
deleted file mode 100644
index 26b5bd1ef..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheTopic.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * CacheTopic
- */
-public class CacheTopic {
- private String topicName;
- private String setName;
- private Integer partitionNum;
-
- /**
- * get topicName
- *
- * @return the topicName
- */
- public String getTopicName() {
- return topicName;
- }
-
- /**
- * set topicName
- *
- * @param topicName the topicName to set
- */
- public void setTopicName(String topicName) {
- this.topicName = topicName;
- }
-
- /**
- * get setName
- *
- * @return the setName
- */
- public String getSetName() {
- return setName;
- }
-
- /**
- * set setName
- *
- * @param setName the setName to set
- */
- public void setSetName(String setName) {
- this.setName = setName;
- }
-
- /**
- * get partitionNum
- *
- * @return the partitionNum
- */
- public Integer getPartitionNum() {
- return partitionNum;
- }
-
- /**
- * set partitionNum
- *
- * @param partitionNum the partitionNum to set
- */
- public void setPartitionNum(Integer partitionNum) {
- this.partitionNum = partitionNum;
- }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ClusterSet.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ClusterSet.java
deleted file mode 100644
index 9344ebfd6..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ClusterSet.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * ClusterSet
- */
-public class ClusterSet {
- private String setName;
- private String cnName;
- private String description;
- private String middlewareType;
- private String inCharges;
- private String followers;
- private Integer status;
- private Integer isDeleted;
- private String creator;
- private String modifier;
-
- /**
- * get setName
- *
- * @return the setName
- */
- public String getSetName() {
- return setName;
- }
-
- /**
- * set setName
- *
- * @param setName the setName to set
- */
- public void setSetName(String setName) {
- this.setName = setName;
- }
-
- /**
- * get cnName
- *
- * @return the cnName
- */
- public String getCnName() {
- return cnName;
- }
-
- /**
- * set cnName
- *
- * @param cnName the cnName to set
- */
- public void setCnName(String cnName) {
- this.cnName = cnName;
- }
-
- /**
- * get description
- *
- * @return the description
- */
- public String getDescription() {
- return description;
- }
-
- /**
- * set description
- *
- * @param description the description to set
- */
- public void setDescription(String description) {
- this.description = description;
- }
-
- /**
- * get middlewareType
- *
- * @return the middlewareType
- */
- public String getMiddlewareType() {
- return middlewareType;
- }
-
- /**
- * set middlewareType
- *
- * @param middlewareType the middlewareType to set
- */
- public void setMiddlewareType(String middlewareType) {
- this.middlewareType = middlewareType;
- }
-
- /**
- * get inCharges
- *
- * @return the inCharges
- */
- public String getInCharges() {
- return inCharges;
- }
-
- /**
- * set inCharges
- *
- * @param inCharges the inCharges to set
- */
- public void setInCharges(String inCharges) {
- this.inCharges = inCharges;
- }
-
- /**
- * get followers
- *
- * @return the followers
- */
- public String getFollowers() {
- return followers;
- }
-
- /**
- * set followers
- *
- * @param followers the followers to set
- */
- public void setFollowers(String followers) {
- this.followers = followers;
- }
-
- /**
- * get status
- *
- * @return the status
- */
- public Integer getStatus() {
- return status;
- }
-
- /**
- * set status
- *
- * @param status the status to set
- */
- public void setStatus(Integer status) {
- this.status = status;
- }
-
- /**
- * get isDeleted
- *
- * @return the isDeleted
- */
- public Integer getIsDeleted() {
- return isDeleted;
- }
-
- /**
- * set isDeleted
- *
- * @param isDeleted the isDeleted to set
- */
- public void setIsDeleted(Integer isDeleted) {
- this.isDeleted = isDeleted;
- }
-
- /**
- * get creator
- *
- * @return the creator
- */
- public String getCreator() {
- return creator;
- }
-
- /**
- * set creator
- *
- * @param creator the creator to set
- */
- public void setCreator(String creator) {
- this.creator = creator;
- }
-
- /**
- * get modifier
- *
- * @return the modifier
- */
- public String getModifier() {
- return modifier;
- }
-
- /**
- * set modifier
- *
- * @param modifier the modifier to set
- */
- public void setModifier(String modifier) {
- this.modifier = modifier;
- }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannel.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannel.java
deleted file mode 100644
index 1b19ff816..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannel.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * FlumeChannel
- */
-public class FlumeChannel {
- private String channelName;
- private String setName;
- private String type;
-
- /**
- * get channelName
- *
- * @return the channelName
- */
- public String getChannelName() {
- return channelName;
- }
-
- /**
- * set channelName
- *
- * @param channelName the channelName to set
- */
- public void setChannelName(String channelName) {
- this.channelName = channelName;
- }
-
- /**
- * get setName
- *
- * @return the setName
- */
- public String getSetName() {
- return setName;
- }
-
- /**
- * set setName
- *
- * @param setName the setName to set
- */
- public void setSetName(String setName) {
- this.setName = setName;
- }
-
- /**
- * get type
- *
- * @return the type
- */
- public String getType() {
- return type;
- }
-
- /**
- * set type
- *
- * @param type the type to set
- */
- public void setType(String type) {
- this.type = type;
- }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannelExt.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannelExt.java
deleted file mode 100644
index 4fa5d7d11..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannelExt.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * FlumeChannelExt
- */
-public class FlumeChannelExt {
- private String parentName;
- private String setName;
- private String keyName;
- private String keyValue;
- private Integer isDeleted;
-
- /**
- * get parentName
- *
- * @return the parentName
- */
- public String getParentName() {
- return parentName;
- }
-
- /**
- * set parentName
- *
- * @param parentName the parentName to set
- */
- public void setParentName(String parentName) {
- this.parentName = parentName;
- }
-
- /**
- * get setName
- *
- * @return the setName
- */
- public String getSetName() {
- return setName;
- }
-
- /**
- * set setName
- *
- * @param setName the setName to set
- */
- public void setSetName(String setName) {
- this.setName = setName;
- }
-
- /**
- * get keyName
- *
- * @return the keyName
- */
- public String getKeyName() {
- return keyName;
- }
-
- /**
- * set keyName
- *
- * @param keyName the keyName to set
- */
- public void setKeyName(String keyName) {
- this.keyName = keyName;
- }
-
- /**
- * get keyValue
- *
- * @return the keyValue
- */
- public String getKeyValue() {
- return keyValue;
- }
-
- /**
- * set keyValue
- *
- * @param keyValue the keyValue to set
- */
- public void setKeyValue(String keyValue) {
- this.keyValue = keyValue;
- }
-
- /**
- * get isDeleted
- *
- * @return the isDeleted
- */
- public Integer getIsDeleted() {
- return isDeleted;
- }
-
- /**
- * set isDeleted
- *
- * @param isDeleted the isDeleted to set
- */
- public void setIsDeleted(Integer isDeleted) {
- this.isDeleted = isDeleted;
- }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSink.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSink.java
deleted file mode 100644
index 54c536979..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSink.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * FlumeSink
- */
-public class FlumeSink {
- private String sinkName;
- private String setName;
- private String type;
- private String channel;
-
- /**
- * get sinkName
- *
- * @return the sinkName
- */
- public String getSinkName() {
- return sinkName;
- }
-
- /**
- * set sinkName
- *
- * @param sinkName the sinkName to set
- */
- public void setSinkName(String sinkName) {
- this.sinkName = sinkName;
- }
-
- /**
- * get setName
- *
- * @return the setName
- */
- public String getSetName() {
- return setName;
- }
-
- /**
- * set setName
- *
- * @param setName the setName to set
- */
- public void setSetName(String setName) {
- this.setName = setName;
- }
-
- /**
- * get type
- *
- * @return the type
- */
- public String getType() {
- return type;
- }
-
- /**
- * set type
- *
- * @param type the type to set
- */
- public void setType(String type) {
- this.type = type;
- }
-
- /**
- * get channel
- *
- * @return the channel
- */
- public String getChannel() {
- return channel;
- }
-
- /**
- * set channel
- *
- * @param channel the channel to set
- */
- public void setChannel(String channel) {
- this.channel = channel;
- }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSinkExt.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSinkExt.java
deleted file mode 100644
index 8022da8db..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSinkExt.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * FlumeSinkExt
- */
-public class FlumeSinkExt {
- private String parentName;
- private String setName;
- private String keyName;
- private String keyValue;
- private Integer isDeleted;
-
- /**
- * get parentName
- *
- * @return the parentName
- */
- public String getParentName() {
- return parentName;
- }
-
- /**
- * set parentName
- *
- * @param parentName the parentName to set
- */
- public void setParentName(String parentName) {
- this.parentName = parentName;
- }
-
- /**
- * get setName
- *
- * @return the setName
- */
- public String getSetName() {
- return setName;
- }
-
- /**
- * set setName
- *
- * @param setName the setName to set
- */
- public void setSetName(String setName) {
- this.setName = setName;
- }
-
- /**
- * get keyName
- *
- * @return the keyName
- */
- public String getKeyName() {
- return keyName;
- }
-
- /**
- * set keyName
- *
- * @param keyName the keyName to set
- */
- public void setKeyName(String keyName) {
- this.keyName = keyName;
- }
-
- /**
- * get keyValue
- *
- * @return the keyValue
- */
- public String getKeyValue() {
- return keyValue;
- }
-
- /**
- * set keyValue
- *
- * @param keyValue the keyValue to set
- */
- public void setKeyValue(String keyValue) {
- this.keyValue = keyValue;
- }
-
- /**
- * get isDeleted
- *
- * @return the isDeleted
- */
- public Integer getIsDeleted() {
- return isDeleted;
- }
-
- /**
- * set isDeleted
- *
- * @param isDeleted the isDeleted to set
- */
- public void setIsDeleted(Integer isDeleted) {
- this.isDeleted = isDeleted;
- }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSource.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSource.java
deleted file mode 100644
index 0afb4a798..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSource.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * FlumeSource
- */
-public class FlumeSource {
- private String sourceName;
- private String setName;
- private String type;
- private String channels;
- private String selectorType;
-
- /**
- * get sourceName
- *
- * @return the sourceName
- */
- public String getSourceName() {
- return sourceName;
- }
-
- /**
- * set sourceName
- *
- * @param sourceName the sourceName to set
- */
- public void setSourceName(String sourceName) {
- this.sourceName = sourceName;
- }
-
- /**
- * get setName
- *
- * @return the setName
- */
- public String getSetName() {
- return setName;
- }
-
- /**
- * set setName
- *
- * @param setName the setName to set
- */
- public void setSetName(String setName) {
- this.setName = setName;
- }
-
- /**
- * get type
- *
- * @return the type
- */
- public String getType() {
- return type;
- }
-
- /**
- * set type
- *
- * @param type the type to set
- */
- public void setType(String type) {
- this.type = type;
- }
-
- /**
- * get channels
- *
- * @return the channels
- */
- public String getChannels() {
- return channels;
- }
-
- /**
- * set channels
- *
- * @param channels the channels to set
- */
- public void setChannels(String channels) {
- this.channels = channels;
- }
-
- /**
- * get selectorType
- *
- * @return the selectorType
- */
- public String getSelectorType() {
- return selectorType;
- }
-
- /**
- * set selectorType
- *
- * @param selectorType the selectorType to set
- */
- public void setSelectorType(String selectorType) {
- this.selectorType = selectorType;
- }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSourceExt.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSourceExt.java
deleted file mode 100644
index 5891e00f6..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSourceExt.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * FlumeSourceExt
- */
-public class FlumeSourceExt {
- private String parentName;
- private String setName;
- private String keyName;
- private String keyValue;
- private Integer isDeleted;
-
- /**
- * get parentName
- *
- * @return the parentName
- */
- public String getParentName() {
- return parentName;
- }
-
- /**
- * set parentName
- *
- * @param parentName the parentName to set
- */
- public void setParentName(String parentName) {
- this.parentName = parentName;
- }
-
- /**
- * get setName
- *
- * @return the setName
- */
- public String getSetName() {
- return setName;
- }
-
- /**
- * set setName
- *
- * @param setName the setName to set
- */
- public void setSetName(String setName) {
- this.setName = setName;
- }
-
- /**
- * get keyName
- *
- * @return the keyName
- */
- public String getKeyName() {
- return keyName;
- }
-
- /**
- * set keyName
- *
- * @param keyName the keyName to set
- */
- public void setKeyName(String keyName) {
- this.keyName = keyName;
- }
-
- /**
- * get keyValue
- *
- * @return the keyValue
- */
- public String getKeyValue() {
- return keyValue;
- }
-
- /**
- * set keyValue
- *
- * @param keyValue the keyValue to set
- */
- public void setKeyValue(String keyValue) {
- this.keyValue = keyValue;
- }
-
- /**
- * get isDeleted
- *
- * @return the isDeleted
- */
- public Integer getIsDeleted() {
- return isDeleted;
- }
-
- /**
- * set isDeleted
- *
- * @param isDeleted the isDeleted to set
- */
- public void setIsDeleted(Integer isDeleted) {
- this.isDeleted = isDeleted;
- }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ProxyCluster.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ProxyCluster.java
deleted file mode 100644
index f1c0660ab..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ProxyCluster.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-/**
- * ProxyCluster
- */
-public class ProxyCluster {
- private String clusterName;
- private String setName;
- private String zone;
-
- /**
- * get clusterName
- *
- * @return the clusterName
- */
- public String getClusterName() {
- return clusterName;
- }
-
- /**
- * set clusterName
- *
- * @param clusterName the clusterName to set
- */
- public void setClusterName(String clusterName) {
- this.clusterName = clusterName;
- }
-
- /**
- * get setName
- *
- * @return the setName
- */
- public String getSetName() {
- return setName;
- }
-
- /**
- * set setName
- *
- * @param setName the setName to set
- */
- public void setSetName(String setName) {
- this.setName = setName;
- }
-
- /**
- * get zone
- *
- * @return the zone
- */
- public String getZone() {
- return zone;
- }
-
- /**
- * set zone
- *
- * @param zone the zone to set
- */
- public void setZone(String zone) {
- this.zone = zone;
- }
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java
index 74ec7153a..090affdc6 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java
@@ -17,52 +17,25 @@
package org.apache.inlong.manager.dao.mapper;
-import java.util.List;
-
-import org.apache.inlong.manager.dao.entity.CacheCluster;
-import org.apache.inlong.manager.dao.entity.CacheClusterExt;
-import org.apache.inlong.manager.dao.entity.CacheTopic;
-import org.apache.inlong.manager.dao.entity.ClusterSet;
-import org.apache.inlong.manager.dao.entity.FlumeChannel;
-import org.apache.inlong.manager.dao.entity.FlumeChannelExt;
-import org.apache.inlong.manager.dao.entity.FlumeSink;
-import org.apache.inlong.manager.dao.entity.FlumeSinkExt;
-import org.apache.inlong.manager.dao.entity.FlumeSource;
-import org.apache.inlong.manager.dao.entity.FlumeSourceExt;
-import org.apache.inlong.manager.dao.entity.InLongId;
-import org.apache.inlong.manager.dao.entity.ProxyCluster;
-import org.apache.inlong.manager.dao.entity.ProxyClusterToCacheCluster;
+import org.apache.inlong.manager.common.pojo.dataproxy.CacheCluster;
+import org.apache.inlong.manager.common.pojo.dataproxy.InlongGroupId;
+import org.apache.inlong.manager.common.pojo.dataproxy.InlongStreamId;
+import org.apache.inlong.manager.common.pojo.dataproxy.ProxyCluster;
import org.springframework.stereotype.Repository;
+import java.util.List;
+
/**
* ClusterSetMapper
*/
@Repository
public interface ClusterSetMapper {
- List<ClusterSet> selectClusterSet();
-
- List<InLongId> selectInlongId();
-
- List<CacheCluster> selectCacheCluster();
-
- List<CacheClusterExt> selectCacheClusterExt();
-
- List<CacheTopic> selectCacheTopic();
List<ProxyCluster> selectProxyCluster();
- List<ProxyClusterToCacheCluster> selectProxyClusterToCacheCluster();
-
- List<FlumeSource> selectFlumeSource();
-
- List<FlumeSourceExt> selectFlumeSourceExt();
-
- List<FlumeChannel> selectFlumeChannel();
-
- List<FlumeChannelExt> selectFlumeChannelExt();
-
- List<FlumeSink> selectFlumeSink();
+ List<CacheCluster> selectCacheCluster();
- List<FlumeSinkExt> selectFlumeSinkExt();
+ List<InlongGroupId> selectInlongGroupId();
+ List<InlongStreamId> selectInlongStreamId();
}
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
index 2eb1b176e..a52f7dadc 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
@@ -19,79 +19,40 @@
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="org.apache.inlong.manager.dao.mapper.ClusterSetMapper">
- <select id="selectClusterSet" resultType="org.apache.inlong.manager.dao.entity.ClusterSet">
- select set_name,
- cn_name,
- description,
- middleware_type,
- in_charges,
- followers,
- status,
- is_deleted,
- creator,
- modifier
- from cluster_set
+<mapper
+ namespace="org.apache.inlong.manager.dao.mapper.ClusterSetMapper">
+ <select id="selectProxyCluster"
+ resultType="org.apache.inlong.manager.common.pojo.dataproxy.ProxyCluster">
+ select name as cluster_name, cluster_tag, ext_tag, ext_params
+ from inlong_cluster
+ where type='DATA_PROXY'
+ and is_deleted='0'
+ </select>
+ <select id="selectCacheCluster"
+ resultType="org.apache.inlong.manager.common.pojo.dataproxy.CacheCluster">
+ select name as cluster_name, type, cluster_tag, ext_tag,
+ ext_params
+ from inlong_cluster
+ where type in ('PULSAR','KAFKA','TUBE')
+ and is_deleted='0'
+ </select>
+ <select id="selectInlongGroupId"
+ resultType="org.apache.inlong.manager.common.pojo.dataproxy.InlongGroupId">
+ select inlong_group_id,
+ inlong_cluster_tag as
+ cluster_tag,
+ mq_resource as topic,
+ ext_params
+ from inlong_group
where is_deleted = 0
</select>
- <select id="selectInlongId" resultType="org.apache.inlong.manager.dao.entity.InLongId">
- select stream.inlong_stream_id as inlong_id,
- stream.mq_resource as topic,
- concat('fieldDelimiter=', stream.data_separator) as params,
- c.set_name as set_name
- from inlong_stream stream,
- cluster_set_inlongid c
- where stream.is_deleted = 0
- and stream.inlong_group_id = c.inlong_group_id
- </select>
- <select id="selectCacheCluster" resultType="org.apache.inlong.manager.dao.entity.CacheCluster">
- select cluster_name, set_name, zone
- from cache_cluster
- </select>
- <select id="selectCacheClusterExt" resultType="org.apache.inlong.manager.dao.entity.CacheClusterExt">
- select cluster_name, key_name, key_value, is_deleted
- from cache_cluster_ext
- where is_deleted = 0
- </select>
-
- <select id="selectCacheTopic" resultType="org.apache.inlong.manager.dao.entity.CacheTopic">
- select topic_name, set_name, partition_num
- from cache_topic
- </select>
- <select id="selectProxyCluster" resultType="org.apache.inlong.manager.dao.entity.ProxyCluster">
- select cluster_name, set_name, zone
- from proxy_cluster
- </select>
- <select id="selectProxyClusterToCacheCluster"
- resultType="org.apache.inlong.manager.dao.entity.ProxyClusterToCacheCluster">
- select proxy_cluster_name, cache_cluster_name
- from proxy_cluster_to_cache_cluster
- </select>
- <select id="selectFlumeSource" resultType="org.apache.inlong.manager.dao.entity.FlumeSource">
- select source_name, set_name, type, channels, selector_type
- from flume_source
- </select>
- <select id="selectFlumeSourceExt" resultType="org.apache.inlong.manager.dao.entity.FlumeSourceExt">
- select parent_name, set_name, key_name, key_value, is_deleted
- from flume_source_ext
- where is_deleted = 0
- </select>
- <select id="selectFlumeChannel" resultType="org.apache.inlong.manager.dao.entity.FlumeChannel">
- select channel_name, set_name, type
- from flume_channel
- </select>
- <select id="selectFlumeChannelExt" resultType="org.apache.inlong.manager.dao.entity.FlumeChannelExt">
- select parent_name, set_name, key_name, key_value, is_deleted
- from flume_channel_ext
- where is_deleted = 0
- </select>
- <select id="selectFlumeSink" resultType="org.apache.inlong.manager.dao.entity.FlumeSink">
- select sink_name, set_name, type, channel
- from flume_sink
- </select>
- <select id="selectFlumeSinkExt" resultType="org.apache.inlong.manager.dao.entity.FlumeSinkExt">
- select parent_name, set_name, key_name, key_value, is_deleted
- from flume_sink_ext
+ <select id="selectInlongStreamId"
+ resultType="org.apache.inlong.manager.common.pojo.dataproxy.InlongStreamId">
+ select inlong_group_id,
+ inlong_stream_id,
+ mq_resource as topic,
+ ext_params
+ from inlong_stream
where is_deleted = 0
</select>
</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataProxyClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataProxyClusterService.java
index c235ff343..edb74c764 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataProxyClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataProxyClusterService.java
@@ -27,6 +27,6 @@ public interface DataProxyClusterService {
*
* @return data proxy config
*/
- String getAllConfig(String clusterName, String setName, String md5);
+ String getAllConfig(String clusterName, String md5);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
index 69b0147d4..bd7d801a3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
@@ -18,16 +18,16 @@
package org.apache.inlong.manager.service.core.impl;
import com.google.gson.Gson;
-import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
-import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyClusterSet;
import org.apache.inlong.manager.service.core.DataProxyClusterService;
import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import lombok.extern.slf4j.Slf4j;
+
/**
* DataProxy cluster service layer implementation class
*/
@@ -43,12 +43,8 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService {
*
* @return data proxy config
*/
- public String getAllConfig(String clusterName, String setName, String md5) {
- DataProxyClusterSet setObj = proxyRepository.getDataProxyClusterSet(setName);
- if (setObj == null) {
- return this.getErrorAllConfig();
- }
- String configMd5 = setObj.getMd5Map().get(clusterName);
+ public String getAllConfig(String clusterName, String md5) {
+ String configMd5 = proxyRepository.getProxyMd5(clusterName);
if (configMd5 == null) {
return this.getErrorAllConfig();
}
@@ -62,7 +58,7 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService {
Gson gson = new Gson();
return gson.toJson(response);
}
- String configJson = setObj.getProxyConfigJson().get(clusterName);
+ String configJson = proxyRepository.getProxyConfigJson(clusterName);
if (configJson == null) {
return this.getErrorAllConfig();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index 41299c362..60c4da500 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -19,33 +19,21 @@ package org.apache.inlong.manager.service.repository;
import com.google.common.base.Splitter;
import com.google.gson.Gson;
+
import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject;
-import org.apache.inlong.common.pojo.dataproxy.CacheTopicObject;
import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
import org.apache.inlong.common.pojo.dataproxy.IRepository;
import org.apache.inlong.common.pojo.dataproxy.InLongIdObject;
-import org.apache.inlong.common.pojo.dataproxy.ProxyChannel;
import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
-import org.apache.inlong.common.pojo.dataproxy.ProxySink;
-import org.apache.inlong.common.pojo.dataproxy.ProxySource;
import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
-import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyClusterSet;
-import org.apache.inlong.manager.dao.entity.CacheCluster;
-import org.apache.inlong.manager.dao.entity.CacheClusterExt;
-import org.apache.inlong.manager.dao.entity.CacheTopic;
-import org.apache.inlong.manager.dao.entity.ClusterSet;
-import org.apache.inlong.manager.dao.entity.FlumeChannel;
-import org.apache.inlong.manager.dao.entity.FlumeChannelExt;
-import org.apache.inlong.manager.dao.entity.FlumeSink;
-import org.apache.inlong.manager.dao.entity.FlumeSinkExt;
-import org.apache.inlong.manager.dao.entity.FlumeSource;
-import org.apache.inlong.manager.dao.entity.FlumeSourceExt;
-import org.apache.inlong.manager.dao.entity.InLongId;
-import org.apache.inlong.manager.dao.entity.ProxyCluster;
-import org.apache.inlong.manager.dao.entity.ProxyClusterToCacheCluster;
+import org.apache.inlong.manager.common.pojo.dataproxy.CacheCluster;
+import org.apache.inlong.manager.common.pojo.dataproxy.InlongGroupId;
+import org.apache.inlong.manager.common.pojo.dataproxy.InlongStreamId;
+import org.apache.inlong.manager.common.pojo.dataproxy.ProxyCluster;
import org.apache.inlong.manager.dao.mapper.ClusterSetMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,31 +41,35 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
-import javax.annotation.PostConstruct;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.annotation.PostConstruct;
/**
* DataProxyConfigRepository
*/
-@SuppressWarnings("UnstableApiUsage")
@Repository(value = "dataProxyConfigRepository")
public class DataProxyConfigRepository implements IRepository {
public static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on(SEPARATOR).trimResults()
.withKeyValueSeparator(KEY_VALUE_SEPARATOR);
private static final Logger LOGGER = LoggerFactory.getLogger(DataProxyConfigRepository.class);
+ public static final String CACHE_CLUSTER_PRODUCER_TAG = "producer";
+ public static final String CACHE_CLUSTER_CONSUMER_TAG = "consumer";
private static final Gson gson = new Gson();
- private final Map<String, ProxyClusterObject> proxyClusterMap = new HashMap<>();
- private final Map<String, CacheClusterObject> cacheClusterMap = new HashMap<>();
- private Map<String, DataProxyClusterSet> clusterSets = new HashMap<>();
+ // key: proxyClusterName, value: jsonString
+ private Map<String, String> proxyConfigJson = new ConcurrentHashMap<>();
+ // key: proxyClusterName, value: md5
+ private Map<String, String> proxyMd5Map = new ConcurrentHashMap<>();
+
private long reloadInterval;
@Autowired
@@ -102,348 +94,231 @@ public class DataProxyConfigRepository implements IRepository {
@Transactional(rollbackFor = Exception.class)
public void reload() {
LOGGER.info("start to reload config.");
- List<ClusterSet> setList = clusterSetMapper.selectClusterSet();
- if (setList.size() == 0) {
+ Map<String, ProxyClusterObject> proxyClusterMap = this.reloadProxyCluster();
+ if (proxyClusterMap.size() == 0) {
return;
}
-
- Map<String, DataProxyClusterSet> newClusterSets = new HashMap<>();
- for (ClusterSet set : setList) {
- String setName = set.getSetName();
- DataProxyClusterSet setObj = new DataProxyClusterSet();
- setObj.setSetName(setName);
- setObj.getCacheClusterSet().setSetName(setName);
- setObj.getCacheClusterSet().setType(set.getMiddlewareType());
- newClusterSets.put(setName, setObj);
+ Map<String, Map<String, List<CacheCluster>>> cacheClusterMap = this.reloadCacheCluster();
+ Map<String, List<InLongIdObject>> inlongIdMap = this.reloadInlongId();
+ // mapping inlongIdMap
+ for (Entry<String, ProxyClusterObject> entry : proxyClusterMap.entrySet()) {
+ String clusterTag = entry.getValue().getSetName();
+ List<InLongIdObject> inlongIds = inlongIdMap.get(clusterTag);
+ if (inlongIds != null) {
+ entry.getValue().setInlongIds(inlongIds);
+ }
}
- this.proxyClusterMap.clear();
- this.cacheClusterMap.clear();
- this.reloadCacheCluster(newClusterSets);
- this.reloadCacheClusterExt(newClusterSets);
- this.reloadCacheTopic(newClusterSets);
- this.reloadFlumeChannel(newClusterSets);
- this.reloadFlumeChannelExt(newClusterSets);
- this.reloadFlumeSource(newClusterSets);
- this.reloadFlumeSourceExt(newClusterSets);
- this.reloadFlumeSink(newClusterSets);
- this.reloadFlumeSinkExt(newClusterSets);
- // reload inlongid
- this.reloadInlongId(newClusterSets);
- this.reloadProxyCluster(newClusterSets);
- this.reloadProxy2Cache(newClusterSets);
- this.generateClusterJson(newClusterSets);
- // replace
- this.clusterSets = newClusterSets;
+ // generateClusterJson
+ this.generateClusterJson(proxyClusterMap, cacheClusterMap);
LOGGER.info("end to reload config.");
}
- /**
- * setReloadTimer
- */
- private void setReloadTimer() {
- Timer reloadTimer = new Timer(true);
- TimerTask task = new RepositoryTimerTask<DataProxyConfigRepository>(this);
- reloadTimer.scheduleAtFixedRate(task, reloadInterval, reloadInterval);
- }
-
- /**
- * get clusterSets
- *
- * @return the clusterSets
- */
- public Map<String, DataProxyClusterSet> getClusterSets() {
- return clusterSets;
- }
-
- /**
- * reloadCacheCluster
- *
- * @param newClusterSets
- */
- private void reloadCacheCluster(Map<String, DataProxyClusterSet> newClusterSets) {
- for (CacheCluster cacheCluster : clusterSetMapper.selectCacheCluster()) {
- CacheClusterObject obj = new CacheClusterObject();
- obj.setName(cacheCluster.getClusterName());
- obj.setZone(cacheCluster.getZone());
- cacheClusterMap.put(obj.getName(), obj);
- String setName = cacheCluster.getSetName();
- DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
- setObj.getCacheClusterSet().getCacheClusters().add(obj);
- }
- }
-
- /**
- * getOrCreateDataProxyClusterSet
- *
- * @param clusterSets
- * @param setName
- * @return
- */
- private DataProxyClusterSet getOrCreateDataProxyClusterSet(Map<String, DataProxyClusterSet> clusterSets,
- String setName) {
- DataProxyClusterSet setObj = clusterSets.get(setName);
- if (setObj == null) {
- setObj = new DataProxyClusterSet();
- setObj.setSetName(setName);
- clusterSets.put(setName, setObj);
- }
- return setObj;
- }
-
- /**
- * reloadCacheClusterExt
- *
- * @param newClusterSets
- */
- private void reloadCacheClusterExt(Map<String, DataProxyClusterSet> newClusterSets) {
- for (CacheClusterExt ext : clusterSetMapper.selectCacheClusterExt()) {
- String clusterName = ext.getClusterName();
- CacheClusterObject cacheClusterObject = cacheClusterMap.get(clusterName);
- if (cacheClusterObject != null) {
- cacheClusterObject.getParams().put(ext.getKeyName(), ext.getKeyValue());
- }
- }
- }
-
- /**
- * reloadCacheTopic
- *
- * @param newClusterSets
- */
- private void reloadCacheTopic(Map<String, DataProxyClusterSet> newClusterSets) {
- for (CacheTopic cacheTopic : clusterSetMapper.selectCacheTopic()) {
- CacheTopicObject obj = new CacheTopicObject();
- obj.setTopic(cacheTopic.getTopicName());
- obj.setPartitionNum(cacheTopic.getPartitionNum());
- String setName = cacheTopic.getSetName();
- DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
- setObj.getCacheClusterSet().getTopics().add(obj);
- }
- }
-
/**
* reloadProxyCluster
- *
- * @param newClusterSets
*/
- private void reloadProxyCluster(Map<String, DataProxyClusterSet> newClusterSets) {
+ private Map<String, ProxyClusterObject> reloadProxyCluster() {
+ Map<String, ProxyClusterObject> proxyClusterMap = new HashMap<>();
for (ProxyCluster proxyCluster : clusterSetMapper.selectProxyCluster()) {
- String setName = proxyCluster.getSetName();
ProxyClusterObject obj = new ProxyClusterObject();
obj.setName(proxyCluster.getClusterName());
- obj.setSetName(setName);
- obj.setZone(proxyCluster.getZone());
- DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
- setObj.getProxyClusterList().add(obj);
- this.proxyClusterMap.put(obj.getName(), obj);
- // channels
- obj.getChannels().addAll(setObj.getProxyChannelMap().values());
- // inlongids
- obj.getInlongIds().addAll(setObj.getInlongIds());
- // sinks
- obj.getSinks().addAll(setObj.getProxySinkMap().values());
- // sources
- obj.getSources().addAll(setObj.getProxySourceMap().values());
+ obj.setSetName(proxyCluster.getClusterTag());
+ obj.setZone(proxyCluster.getExtTag());
+ proxyClusterMap.put(obj.getName(), obj);
}
+ return proxyClusterMap;
}
/**
- * reloadFlumeChannel
- *
- * @param newClusterSets
- */
- private void reloadFlumeChannel(Map<String, DataProxyClusterSet> newClusterSets) {
- for (FlumeChannel flumeChannel : clusterSetMapper.selectFlumeChannel()) {
- ProxyChannel obj = new ProxyChannel();
- obj.setName(flumeChannel.getChannelName());
- obj.setType(flumeChannel.getType());
- String setName = flumeChannel.getSetName();
- DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
- setObj.getProxyChannelMap().put(obj.getName(), obj);
- }
- }
-
- /**
- * reloadFlumeChannelExt
- *
- * @param newClusterSets
+ * reloadCacheCluster
*/
- private void reloadFlumeChannelExt(Map<String, DataProxyClusterSet> newClusterSets) {
- for (FlumeChannelExt ext : clusterSetMapper.selectFlumeChannelExt()) {
- String setName = ext.getSetName();
- DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
- ProxyChannel obj = setObj.getProxyChannelMap().get(ext.getParentName());
- if (obj != null) {
- obj.getParams().put(ext.getKeyName(), ext.getKeyValue());
+ private Map<String, Map<String, List<CacheCluster>>> reloadCacheCluster() {
+ Map<String, Map<String, List<CacheCluster>>> cacheClusterMap = new HashMap<>();
+ for (CacheCluster cacheCluster : clusterSetMapper.selectCacheCluster()) {
+ Map<String, String> tagMap = MAP_SPLITTER.split(cacheCluster.getExtTag());
+ String producerTag = tagMap.getOrDefault(CACHE_CLUSTER_PRODUCER_TAG, Boolean.TRUE.toString());
+ if (StringUtils.equalsIgnoreCase(producerTag, Boolean.TRUE.toString())) {
+ cacheClusterMap.computeIfAbsent(cacheCluster.getClusterTag(), k -> new HashMap<>())
+ .computeIfAbsent(cacheCluster.getExtTag(), k -> new ArrayList<>()).add(cacheCluster);
}
}
+ return cacheClusterMap;
}
/**
- * reloadFlumeSource
- *
- * @param newClusterSets
- */
- private void reloadFlumeSource(Map<String, DataProxyClusterSet> newClusterSets) {
- for (FlumeSource flumeSource : clusterSetMapper.selectFlumeSource()) {
- ProxySource obj = new ProxySource();
- obj.setName(flumeSource.getSourceName());
- obj.setSelectorType(flumeSource.getSelectorType());
- obj.setType(flumeSource.getType());
- String channels = flumeSource.getChannels();
- obj.getChannels().addAll(Arrays.asList(channels.split("\\s+")));
- String setName = flumeSource.getSetName();
- DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
- setObj.getProxySourceMap().put(obj.getName(), obj);
- }
- }
-
- /**
- * reloadFlumeSourceExt
- *
- * @param newClusterSets
+ * reloadInlongId
*/
- private void reloadFlumeSourceExt(Map<String, DataProxyClusterSet> newClusterSets) {
- for (FlumeSourceExt ext : clusterSetMapper.selectFlumeSourceExt()) {
- String setName = ext.getSetName();
- DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
- ProxySource obj = setObj.getProxySourceMap().get(ext.getParentName());
- if (obj != null) {
- obj.getParams().put(ext.getKeyName(), ext.getKeyValue());
+ @SuppressWarnings("unchecked")
+ private Map<String, List<InLongIdObject>> reloadInlongId() {
+ // index the selected groups by inlongGroupId so that each stream can look up its parent group
+ Map<String, InlongGroupId> groupIdMap = new HashMap<>();
+ clusterSetMapper.selectInlongGroupId().forEach(value -> groupIdMap.put(value.getInlongGroupId(), value));
+ // build one InLongIdObject per stream; the result is keyed by the parent group's cluster tag
+ Map<String, List<InLongIdObject>> inlongIdMap = new HashMap<>();
+ for (InlongStreamId streamIdObj : clusterSetMapper.selectInlongStreamId()) {
+ String groupId = streamIdObj.getInlongGroupId();
+ InlongGroupId groupIdObj = groupIdMap.get(groupId);
+ if (groupId == null || groupIdObj == null) {
+ continue; // no usable parent group for this stream: skip it, groupIdObj is dereferenced below
+ }
+ // choose topic: group topic only, stream topic only, or "group/stream" when both are set
+ String groupTopic = groupIdObj.getTopic();
+ String streamTopic = streamIdObj.getTopic();
+ String finalTopic = null;
+ if (StringUtils.isEmpty(groupTopic)) {
+ // neither the group nor the stream has a topic: ignore this stream
+ if (StringUtils.isEmpty(streamTopic)) {
+ continue;
+ } else {
+ finalTopic = streamTopic;
+ }
+ } else {
+ if (StringUtils.isEmpty(streamTopic)) {
+ finalTopic = groupTopic;
+ } else {
+ // Pulsar: namespace+topic
+ finalTopic = groupTopic + "/" + streamTopic;
+ }
+ }
+ // the inlongId is "<groupId>.<streamId>"
+ InLongIdObject obj = new InLongIdObject();
+ String inlongId = groupId + "." + streamIdObj.getInlongStreamId();
+ obj.setInlongId(inlongId);
+ obj.setTopic(finalTopic);
+ Map<String, String> params = new HashMap<>();
+ obj.setParams(params);
+ // group ext params (JSON) are applied first; a parse failure is logged and the params are skipped
+ if (!StringUtils.isEmpty(groupIdObj.getExtParams())) {
+ try {
+ Map<String, String> groupParams = gson.fromJson(groupIdObj.getExtParams(), Map.class);
+ params.putAll(groupParams);
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
- }
+ // stream ext params (JSON) are applied second, so they override group values on duplicate keys
+ if (!StringUtils.isEmpty(streamIdObj.getExtParams())) {
+ try {
+ Map<String, String> streamParams = gson.fromJson(streamIdObj.getExtParams(), Map.class);
+ params.putAll(streamParams);
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new ArrayList<>()).add(obj);
}
+ return inlongIdMap;
}
/**
- * reloadFlumeSink
- *
- * @param newClusterSets
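+ * Starts a daemon Timer that runs a RepositoryTimerTask for this repository at a fixed rate,
+ * using reloadInterval both as the initial delay and as the period.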
*/
- private void reloadFlumeSink(Map<String, DataProxyClusterSet> newClusterSets) {
- for (FlumeSink flumeSink : clusterSetMapper.selectFlumeSink()) {
- ProxySink obj = new ProxySink();
- obj.setName(flumeSink.getSinkName());
- obj.setType(flumeSink.getType());
- obj.setChannel(flumeSink.getChannel());
- String setName = flumeSink.getSetName();
- DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
- setObj.getProxySinkMap().put(obj.getName(), obj);
- }
+ private void setReloadTimer() {
+ // Timer(true) creates a daemon timer thread
+ final Timer daemonTimer = new Timer(true);
+ final TimerTask reloadTask = new RepositoryTimerTask<DataProxyConfigRepository>(this);
+ // reloadInterval is used as the initial delay and as the repeat period
+ daemonTimer.scheduleAtFixedRate(reloadTask, reloadInterval, reloadInterval);
}
/**
- * reloadFlumeSinkExt
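+ * Builds the JSON configuration response and its MD5 for every proxy cluster and then replaces
+ * the proxyConfigJson and proxyMd5Map fields with the new results.
*
- * @param newClusterSets
+ * @param proxyClusterMap the proxy clusters; one response is generated per map value
+ * @param cacheClusterMap cache clusters grouped by cluster tag, then by tag string (parsed with MAP_SPLITTER)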
*/
- private void reloadFlumeSinkExt(Map<String, DataProxyClusterSet> newClusterSets) {
- for (FlumeSinkExt ext : clusterSetMapper.selectFlumeSinkExt()) {
- String setName = ext.getSetName();
- DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
- ProxySink obj = setObj.getProxySinkMap().get(ext.getParentName());
- if (obj != null) {
- obj.getParams().put(ext.getKeyName(), ext.getKeyValue());
+ @SuppressWarnings("unchecked")
+ private void generateClusterJson(Map<String, ProxyClusterObject> proxyClusterMap,
+ Map<String, Map<String, List<CacheCluster>>> cacheClusterMap) {
+ // results are collected in fresh maps and only assigned to the fields at the very end
+ Map<String, String> newProxyConfigJson = new ConcurrentHashMap<>();
+ Map<String, String> newProxyMd5Map = new ConcurrentHashMap<>();
+ // memoizes the MAP_SPLITTER result of each tag string seen during this run
+ Map<String, Map<String, String>> tagCache = new HashMap<>();
+ for (Entry<String, ProxyClusterObject> entry : proxyClusterMap.entrySet()) {
+ ProxyClusterObject proxyObj = entry.getValue();
+ // proxy
+ DataProxyCluster clusterObj = new DataProxyCluster();
+ clusterObj.setProxyCluster(proxyObj);
+ // cache
+ String clusterTag = proxyObj.getSetName();
+ String extTag = proxyObj.getZone();
+ Map<String, List<CacheCluster>> cacheClusterZoneMap = cacheClusterMap.get(clusterTag);
+ if (cacheClusterZoneMap != null) {
+ Map<String, String> subTagMap = tagCache.computeIfAbsent(extTag, k -> MAP_SPLITTER.split(extTag));
+ for (Entry<String, List<CacheCluster>> cacheEntry : cacheClusterZoneMap.entrySet()) {
+ if (cacheEntry.getValue().size() == 0) {
+ continue;
+ }
+ Map<String, String> wholeTagMap = tagCache.computeIfAbsent(cacheEntry.getKey(),
+ k -> MAP_SPLITTER.split(cacheEntry.getKey()));
+ // only cache clusters whose tags contain all tags of the proxy are attached
+ if (isSubTag(wholeTagMap, subTagMap)) {
+ CacheClusterSetObject cacheSet = clusterObj.getCacheClusterSet();
+ cacheSet.setSetName(clusterTag);
+ List<CacheCluster> cacheClusterList = cacheEntry.getValue();
+ cacheSet.setType(cacheClusterList.get(0).getType());
+ List<CacheClusterObject> cacheClusters = new ArrayList<>(cacheClusterList.size());
+ // NOTE(review): setCacheClusters is called again for every matching entry, so a later
+ // match replaces the list set by an earlier one - confirm that one match is expected
+ cacheSet.setCacheClusters(cacheClusters);
+ for (CacheCluster cacheCluster : cacheClusterList) {
+ CacheClusterObject obj = new CacheClusterObject();
+ obj.setName(cacheCluster.getClusterName());
+ obj.setZone(cacheCluster.getExtTag());
+ try {
+ Map<String, String> params = gson.fromJson(cacheCluster.getExtParams(), Map.class);
+ obj.setParams(params);
+ } catch (Exception e) {
+ // a parse failure is only logged; the cluster is still added below
+ LOGGER.error(e.getMessage(), e);
+ }
+ cacheClusters.add(obj);
+ }
+ }
+ }
}
+ // json
+ String jsonDataProxyCluster = gson.toJson(clusterObj);
+ // the MD5 covers the cluster JSON only, not the response wrapper built below
+ String md5 = DigestUtils.md5Hex(jsonDataProxyCluster);
+ DataProxyConfigResponse response = new DataProxyConfigResponse();
+ response.setResult(true);
+ response.setErrCode(DataProxyConfigResponse.SUCC);
+ response.setMd5(md5);
+ response.setData(clusterObj);
+ String jsonResponse = gson.toJson(response);
+ // both result maps are keyed by the proxy cluster name
+ newProxyConfigJson.put(proxyObj.getName(), jsonResponse);
+ newProxyMd5Map.put(proxyObj.getName(), md5);
}
- }
- /**
- * reloadInlongId
- *
- * @param newClusterSets
- */
- private void reloadInlongId(Map<String, DataProxyClusterSet> newClusterSets) {
- for (InLongId inlongId : clusterSetMapper.selectInlongId()) {
- InLongIdObject obj = new InLongIdObject();
- obj.setInlongId(inlongId.getInlongId());
- obj.setTopic(inlongId.getTopic());
- if (inlongId.getParams() != null) {
- Map<String, String> params = MAP_SPLITTER.split(inlongId.getParams());
- obj.getParams().putAll(params);
- }
- String setName = inlongId.getSetName();
- DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
- setObj.getInlongIds().add(obj);
- }
+ // replace: assign the freshly built maps to the fields read by getProxyMd5/getProxyConfigJson
+ this.proxyConfigJson = newProxyConfigJson;
+ this.proxyMd5Map = newProxyMd5Map;
}
/**
- * reloadInlongId
- *
- * @param newClusterSets
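+ * Checks whether every entry of subTagMap is present in wholeTagMap with an equal value.
+ *
+ * @param wholeTagMap the tag map that must contain all entries of subTagMap
+ * @param subTagMap the tag entries to look for
+ * @return true if all entries of subTagMap match (also when subTagMap is empty), false otherwise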
*/
- private void reloadProxy2Cache(Map<String, DataProxyClusterSet> newClusterSets) {
- for (ProxyClusterToCacheCluster proxy2Cache : clusterSetMapper.selectProxyClusterToCacheCluster()) {
- String proxyClusterName = proxy2Cache.getProxyClusterName();
- String cacheClusterName = proxy2Cache.getCacheClusterName();
- ProxyClusterObject proxyObj = this.proxyClusterMap.get(proxyClusterName);
- if (proxyObj == null) {
- continue;
- }
- CacheClusterObject cacheObj = this.cacheClusterMap.get(cacheClusterName);
- if (cacheObj == null) {
- continue;
+ private boolean isSubTag(Map<String, String> wholeTagMap, Map<String, String> subTagMap) {
+ // every required tag has to be present in the whole map with the same value
+ for (Entry<String, String> required : subTagMap.entrySet()) {
+ final String actual = wholeTagMap.get(required.getKey());
+ final boolean matched = actual != null && actual.equals(required.getValue());
+ if (!matched) {
+ return false;
}
- String setName = proxyObj.getSetName();
- DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName);
- setObj.addProxy2Cache(proxyClusterName, cacheClusterName);
}
+ // nothing mismatched (an empty subTagMap ends up here as well)
+ return true;
}
/**
- * generateClusterJson
- *
- * @param newClusterSets
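+ * Returns the MD5 recorded for the configuration of a proxy cluster.
+ *
+ * @param clusterName name of the proxy cluster
+ * @return the MD5 hex string, or null when no configuration is stored for clusterName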
*/
- private void generateClusterJson(Map<String, DataProxyClusterSet> newClusterSets) {
- for (Entry<String, DataProxyClusterSet> entry : newClusterSets.entrySet()) {
- for (ProxyClusterObject proxyObj : entry.getValue().getProxyClusterList()) {
- // proxy
- DataProxyCluster clusterObj = new DataProxyCluster();
- clusterObj.setProxyCluster(proxyObj);
- // cache
- CacheClusterSetObject allCacheCluster = entry.getValue().getCacheClusterSet();
- CacheClusterSetObject proxyCacheClusterSet = clusterObj.getCacheClusterSet();
- proxyCacheClusterSet.setSetName(allCacheCluster.getSetName());
- proxyCacheClusterSet.setType(allCacheCluster.getType());
- proxyCacheClusterSet.setTopics(allCacheCluster.getTopics());
- // cacheCluster
- Set<String> cacheClusterNameSet = entry.getValue().getProxy2Cache().get(proxyObj.getName());
- if (cacheClusterNameSet != null) {
- for (String cacheClusterName : cacheClusterNameSet) {
- CacheClusterObject cacheObj = this.cacheClusterMap.get(cacheClusterName);
- if (cacheObj == null) {
- continue;
- }
- proxyCacheClusterSet.getCacheClusters().add(cacheObj);
- }
- }
- //
- String jsonDataProxyCluster = gson.toJson(clusterObj);
- String md5 = DigestUtils.md5Hex(jsonDataProxyCluster);
- DataProxyConfigResponse response = new DataProxyConfigResponse();
- response.setResult(true);
- response.setErrCode(DataProxyConfigResponse.SUCC);
- response.setMd5(md5);
- response.setData(clusterObj);
- String jsonResponse = gson.toJson(response);
- entry.getValue().getProxyConfigJson().put(proxyObj.getName(), jsonResponse);
- entry.getValue().getMd5Map().put(proxyObj.getName(), md5);
- entry.getValue().setDefaultConfigJson(jsonResponse);
- }
- }
+ public String getProxyMd5(String clusterName) {
+ // plain lookup in the MD5 map; the result is null when the cluster name has no entry
+ final String md5 = this.proxyMd5Map.get(clusterName);
+ return md5;
}
/**
- * getDataProxyClusterSet
- *
- * @param setName
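+ * Returns the stored JSON configuration response of a proxy cluster, or null when the
+ * cluster name has no entry.
+ *
+ * @param clusterName name of the proxy cluster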
* @return
*/
- public DataProxyClusterSet getDataProxyClusterSet(String setName) {
- return this.clusterSets.get(setName);
+ public String getProxyConfigJson(String clusterName) {
+ // plain lookup in the JSON map; the result is null when the cluster name has no entry
+ final String configJson = this.proxyConfigJson.get(clusterName);
+ return configJson;
}
-
}
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 52adb2658..72d352272 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -838,209 +838,6 @@ CREATE TABLE IF NOT EXISTS `workflow_task`
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Workflow task table';
--- ----------------------------
--- Table structure for cluster_set
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `cluster_set`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
- `cn_name` varchar(256) COMMENT 'Chinese display name',
- `description` varchar(256) COMMENT 'ClusterSet Introduction',
- `middleware_type` varchar(10) DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
- `in_charges` varchar(512) COMMENT 'Name of responsible person, separated by commas',
- `followers` varchar(512) COMMENT 'Name of followers, separated by commas',
- `status` int(4) DEFAULT '21' COMMENT 'ClusterSet status',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) NULL COMMENT 'Modifier name',
- `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- PRIMARY KEY (`id`),
- UNIQUE KEY `unique_cluster_set` (`set_name`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='ClusterSet table';
-
--- ----------------------------
--- Table structure for cluster_set_inlongid
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `cluster_set_inlongid`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `set_name` varchar(256) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
- `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id, filled in by the user, undeleted ones cannot be repeated',
- PRIMARY KEY (`id`),
- UNIQUE KEY `unique_cluster_set_inlongid` (`set_name`, `inlong_group_id`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='InlongId table';
-
--- ----------------------------
--- Table structure for cache_cluster
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `cache_cluster`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore',
- `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
- `zone` varchar(128) NOT NULL COMMENT 'Zone, sz/sh/tj',
- PRIMARY KEY (`id`),
- UNIQUE KEY `unique_cache_cluster` (`cluster_name`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='CacheCluster table';
-
--- ----------------------------
--- Table structure for cache_cluster_ext
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `cache_cluster_ext`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore',
- `key_name` varchar(256) NOT NULL COMMENT 'Configuration item name',
- `key_value` text NULL COMMENT 'The value of the configuration item',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- PRIMARY KEY (`id`),
- KEY `index_cache_cluster` (`cluster_name`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='CacheCluster extension table';
-
--- ----------------------------
--- Table structure for cache_topic
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `cache_topic`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `topic_name` varchar(128) NOT NULL COMMENT 'Topic name, English, numbers and underscore',
- `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
- `partition_num` int(11) NOT NULL COMMENT 'Partition number',
- PRIMARY KEY (`id`),
- UNIQUE KEY `unique_cache_topic` (`topic_name`, `set_name`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='CacheTopic table';
-
--- ----------------------------
--- Table structure for proxy_cluster
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `proxy_cluster`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `cluster_name` varchar(128) NOT NULL COMMENT 'ProxyCluster name, English, numbers and underscore',
- `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
- `zone` varchar(128) NOT NULL COMMENT 'Zone, sz/sh/tj',
- PRIMARY KEY (`id`),
- UNIQUE KEY `unique_proxy_cluster` (`cluster_name`, `set_name`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='ProxyCluster table';
-
--- ----------------------------
--- Table structure for proxy_cluster_to_cache_cluster
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `proxy_cluster_to_cache_cluster`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `proxy_cluster_name` varchar(128) NOT NULL COMMENT 'ProxyCluster name, English, numbers and underscore',
- `cache_cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore',
- PRIMARY KEY (`id`),
- UNIQUE KEY `unique_proxy_cluster_to_cache_cluster` (`proxy_cluster_name`, `cache_cluster_name`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='The relation table of ProxyCluster and CacheCluster';
-
--- ----------------------------
--- Table structure for flume_source
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `flume_source`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `source_name` varchar(128) NOT NULL COMMENT 'FlumeSource name, English, numbers and underscore',
- `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
- `type` varchar(128) NOT NULL COMMENT 'FlumeSource classname',
- `channels` varchar(128) NOT NULL COMMENT 'The channels of FlumeSource, separated by space',
- `selector_type` varchar(128) NOT NULL COMMENT 'FlumeSource channel selector classname',
- PRIMARY KEY (`id`),
- UNIQUE KEY `unique_flume_source` (`source_name`, `set_name`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeSource table';
-
--- ----------------------------
--- Table structure for flume_source_ext
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `flume_source_ext`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `parent_name` varchar(128) NOT NULL COMMENT 'FlumeSource name, English, numbers and underscore',
- `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
- `key_name` varchar(256) NOT NULL COMMENT 'Configuration item name',
- `key_value` text NULL COMMENT 'The value of the configuration item',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- PRIMARY KEY (`id`),
- KEY `index_flume_source_ext` (`parent_name`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeSource extension table';
-
--- ----------------------------
--- Table structure for flume_channel
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `flume_channel`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `channel_name` varchar(128) NOT NULL COMMENT 'FlumeChannel name, English, numbers and underscore',
- `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
- `type` varchar(128) NOT NULL COMMENT 'FlumeChannel classname',
- PRIMARY KEY (`id`),
- UNIQUE KEY `unique_flume_channel` (`channel_name`, `set_name`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeChannel table';
-
--- ----------------------------
--- Table structure for flume_channel_ext
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `flume_channel_ext`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `parent_name` varchar(128) NOT NULL COMMENT 'FlumeChannel name, English, numbers and underscore',
- `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
- `key_name` varchar(256) NOT NULL COMMENT 'Configuration item name',
- `key_value` text NULL COMMENT 'The value of the configuration item',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- PRIMARY KEY (`id`),
- KEY `index_flume_channel_ext` (`parent_name`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeChannel extension table';
-
--- ----------------------------
--- Table structure for flume_sink
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `flume_sink`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `sink_name` varchar(128) NOT NULL COMMENT 'FlumeSink name, English, numbers and underscore',
- `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
- `type` varchar(128) NOT NULL COMMENT 'FlumeSink classname',
- `channel` varchar(128) NOT NULL COMMENT 'FlumeSink channel',
- PRIMARY KEY (`id`),
- UNIQUE KEY `unique_flume_sink` (`sink_name`, `set_name`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeSink table';
-
--- ----------------------------
--- Table structure for flume_sink_ext
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `flume_sink_ext`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `parent_name` varchar(128) NOT NULL COMMENT 'FlumeSink name, English, numbers and underscore',
- `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
- `key_name` varchar(256) NOT NULL COMMENT 'Configuration item name',
- `key_value` text NULL COMMENT 'The value of the configuration item',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- PRIMARY KEY (`id`),
- KEY `index_flume_sink_ext` (`parent_name`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeSink extension table';
-
-- ----------------------------
-- Table structure for db_collector_detail_task
-- ----------------------------
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
index 5ee5986ff..ef0a93021 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
@@ -17,8 +17,6 @@
package org.apache.inlong.manager.web.controller.openapi;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
import org.apache.inlong.manager.common.beans.Response;
@@ -31,8 +29,12 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
+
import java.util.List;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+
/**
* Data proxy controller.
*/
@@ -70,9 +72,9 @@ public class DataProxyController {
@GetMapping("/getAllConfig")
@ApiOperation(value = "Get all proxy config")
- public String getAllConfig(@RequestParam("clusterName") String clusterName, @RequestParam("setName") String setName,
+ public String getAllConfig(@RequestParam("clusterName") String clusterName,
@RequestParam(value = "md5", required = false) String md5) {
- return dataProxyClusterService.getAllConfig(clusterName, setName, md5);
+ return dataProxyClusterService.getAllConfig(clusterName, md5);
}
}
diff --git a/inlong-manager/manager-web/src/main/resources/application.properties b/inlong-manager/manager-web/src/main/resources/application.properties
index ed6f02848..731a80d70 100644
--- a/inlong-manager/manager-web/src/main/resources/application.properties
+++ b/inlong-manager/manager-web/src/main/resources/application.properties
@@ -25,6 +25,7 @@ server.servlet.context-path=/api/inlong/manager
spring.application.name=InLong-Manager-Web
spring.profiles.active=dev
+spring.main.allow-circular-references=true
spring.mvc.pathmatch.matching-strategy=ANT_PATH_MATCHER
# Serialize the Date type to a timestamp
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java
index a60f45fd4..d70501063 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java
@@ -17,25 +17,25 @@
package org.apache.inlong.sort.standalone.dispatch;
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.flume.Context;
-import org.apache.inlong.sort.standalone.channel.BufferQueueChannel;
-import org.apache.inlong.sort.standalone.channel.ProfileEvent;
-import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-import org.slf4j.Logger;
+import java.util.concurrent.atomic.AtomicLong;
/**
* DispatchManager
*/
public class DispatchManager {
- public static final Logger LOG = InlongLoggerFactory.getLogger(BufferQueueChannel.class);
+ public static final Logger LOG = InlongLoggerFactory.getLogger(DispatchManager.class);
public static final String KEY_DISPATCH_TIMEOUT = "dispatchTimeout";
public static final String KEY_DISPATCH_MAX_PACKCOUNT = "dispatchMaxPackCount";
public static final String KEY_DISPATCH_MAX_PACKSIZE = "dispatchMaxPackSize";
@@ -44,13 +44,15 @@ public class DispatchManager {
public static final long DEFAULT_DISPATCH_MAX_PACKSIZE = 327680;
public static final long MINUTE_MS = 60L * 1000;
- private LinkedBlockingQueue<DispatchProfile> dispatchQueue;
private final long dispatchTimeout;
private final long maxPackCount;
private final long maxPackSize;
+ private LinkedBlockingQueue<DispatchProfile> dispatchQueue;
private ConcurrentHashMap<String, DispatchProfile> profileCache = new ConcurrentHashMap<>();
- //
+ // flag indicating that the manager needs to output overtime data.
private AtomicBoolean needOutputOvertimeData = new AtomicBoolean(false);
+ private AtomicLong inCounter = new AtomicLong(0);
+ private AtomicLong outCounter = new AtomicLong(0);
/**
* Constructor
@@ -71,10 +73,6 @@ public class DispatchManager {
* @param event
*/
public void addEvent(ProfileEvent event) {
- if (needOutputOvertimeData.get()) {
- this.outputOvertimeData();
- this.needOutputOvertimeData.set(false);
- }
// parse
String eventUid = event.getUid();
long dispatchTime = event.getRawLogTime() - event.getRawLogTime() % MINUTE_MS;
@@ -93,8 +91,10 @@ public class DispatchManager {
event.getInlongStreamId(), dispatchTime);
DispatchProfile oldDispatchProfile = this.profileCache.put(dispatchKey, newDispatchProfile);
this.dispatchQueue.offer(oldDispatchProfile);
+ outCounter.addAndGet(dispatchProfile.getCount());
newDispatchProfile.addEvent(event, maxPackCount, maxPackSize);
}
+ inCounter.incrementAndGet();
}
/**
@@ -103,6 +103,9 @@ public class DispatchManager {
* @return
*/
public void outputOvertimeData() {
+ if (!needOutputOvertimeData.getAndSet(false)) {
+ return;
+ }
LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
profileCache.size(), dispatchQueue.size());
long currentTime = System.currentTimeMillis();
@@ -119,10 +122,16 @@ public class DispatchManager {
}
// output
removeKeys.forEach((key) -> {
- dispatchQueue.offer(this.profileCache.remove(key));
+ DispatchProfile dispatchProfile = this.profileCache.remove(key);
+ if (dispatchProfile != null) {
+ dispatchQueue.offer(dispatchProfile);
+ outCounter.addAndGet(dispatchProfile.getCount());
+ }
});
- LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{}",
- profileCache.size(), dispatchQueue.size(), eventCount);
+ LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
+ + "inCounter:{},outCounter:{}",
+ profileCache.size(), dispatchQueue.size(), eventCount,
+ inCounter.getAndSet(0), outCounter.getAndSet(0));
}
/**
@@ -156,6 +165,6 @@ public class DispatchManager {
* setNeedOutputOvertimeData
*/
public void setNeedOutputOvertimeData() {
- this.needOutputOvertimeData.set(true);
+ this.needOutputOvertimeData.getAndSet(true);
}
}