Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/10 02:57:03 UTC

[inlong] branch branch-1.4 updated (287644e7c -> 4634918b5)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from 287644e7c [INLONG-6406][DataProxy] Should support creating sink dynamically after started (addendum) (#6488)
     new f31a9d6da [INLONG-6491][Manager] Support getting backup info in getAllConfig (#6492)
     new 41d285463 [INLONG-6481][Dashboard] Supports management of SQLServer source (#6490)
     new 4634918b5 [INLONG-6484][Sort] Bugfix: fix schema update Circular dependency error in multiple sink iceberg scenes (#6486)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 inlong-dashboard/src/locales/cn.json               |  10 +
 inlong-dashboard/src/locales/en.json               |  10 +
 .../sources/defaults/{Oracle.ts => SQLServer.ts}   |  72 ++---
 .../src/metas/sources/defaults/index.ts            |   5 +
 .../repository/DataProxyConfigRepository.java      | 340 +++++++++++----------
 .../sink/multiple/DynamicSchemaHandleOperator.java |   1 +
 6 files changed, 228 insertions(+), 210 deletions(-)
 copy inlong-dashboard/src/metas/sources/defaults/{Oracle.ts => SQLServer.ts} (81%)


[inlong] 02/03: [INLONG-6481][Dashboard] Supports management of SQLServer source (#6490)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 41d2854632f0c4b9fc236a41327bdaa363163e58
Author: Lizhen <88...@users.noreply.github.com>
AuthorDate: Thu Nov 10 10:55:18 2022 +0800

    [INLONG-6481][Dashboard] Supports management of SQLServer source (#6490)
---
 inlong-dashboard/src/locales/cn.json               |  10 ++
 inlong-dashboard/src/locales/en.json               |  10 ++
 .../src/metas/sources/defaults/SQLServer.ts        | 151 +++++++++++++++++++++
 .../src/metas/sources/defaults/index.ts            |   5 +
 4 files changed, 176 insertions(+)

diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index 44ad7e605..029da9daf 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -58,6 +58,16 @@
   "meta.Sources.PostgreSQL.TableName": "表格名称",
   "meta.Sources.PostgreSQL.decodingPluginName": "解码插件名称",
   "meta.Sources.PostgreSQL.PrimaryKey": "主键",
+  "meta.Sources.SQLServer.Hostname": "服务器主机",
+  "meta.Sources.SQLServer.Port": "端口",
+  "meta.Sources.SQLServer.Username": "用户",
+  "meta.Sources.SQLServer.Password": "密码",
+  "meta.Sources.SQLServer.Database": "数据库名",
+  "meta.Sources.SQLServer.SchemaName": "架构名称",
+  "meta.Sources.SQLServer.TableName": "表格名称",
+  "meta.Sources.SQLServer.AllMigration": "是否整库迁移",
+  "meta.Sources.SQLServer.ServerTimezone": "服务器时区",
+  "meta.Sources.SQLServer.PrimaryKey": "主键",
   "meta.Sinks.SinkName": "名称",
   "meta.Sinks.SinkNameRule": "以英文字母开头,只能包含英文字母、数字、中划线、下划线",
   "meta.Sinks.SinkType": "类型",
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index ecdfcfa24..70e2e536e 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -58,6 +58,16 @@
   "meta.Sources.PostgreSQL.TableName": "TableName",
   "meta.Sources.PostgreSQL.decodingPluginName": "Decoding Plugin Name",
   "meta.Sources.PostgreSQL.PrimaryKey": "PrimaryKey",
+  "meta.Sources.SQLServer.Hostname": "Hostname",
+  "meta.Sources.SQLServer.Port": "Port",
+  "meta.Sources.SQLServer.Username": "Username",
+  "meta.Sources.SQLServer.Password": "Password",
+  "meta.Sources.SQLServer.Database": "Database",
+  "meta.Sources.SQLServer.SchemaName": "SchemaName",
+  "meta.Sources.SQLServer.TableName": "TableName",
+  "meta.Sources.SQLServer.AllMigration": "AllMigration",
+  "meta.Sources.SQLServer.ServerTimezone": "Timezone",
+  "meta.Sources.SQLServer.PrimaryKey": "PrimaryKey",
   "meta.Sinks.SinkName": "Name",
   "meta.Sinks.SinkNameRule": "At the beginning of English letters, only English letters, numbers, minus, and underscores",
   "meta.Sinks.SinkType": "Type",
diff --git a/inlong-dashboard/src/metas/sources/defaults/SQLServer.ts b/inlong-dashboard/src/metas/sources/defaults/SQLServer.ts
new file mode 100644
index 000000000..0bd6e40a8
--- /dev/null
+++ b/inlong-dashboard/src/metas/sources/defaults/SQLServer.ts
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { DataWithBackend } from '@/metas/DataWithBackend';
+import { RenderRow } from '@/metas/RenderRow';
+import { RenderList } from '@/metas/RenderList';
+import { SourceInfo } from '../common/SourceInfo';
+import i18n from '@/i18n';
+
+const { I18n } = DataWithBackend;
+const { FieldDecorator } = RenderRow;
+const { ColumnDecorator } = RenderList;
+
+export default class SQLServerSource
+  extends SourceInfo
+  implements DataWithBackend, RenderRow, RenderList
+{
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @ColumnDecorator()
+  @I18n('meta.Sources.SQLServer.Hostname')
+  hostname: string;
+
+  @FieldDecorator({
+    type: 'inputnumber',
+    rules: [{ required: true }],
+    initialValue: 1433,
+    props: values => ({
+      min: 1,
+      max: 65535,
+      disabled: values?.status === 101,
+    }),
+  })
+  @ColumnDecorator()
+  @I18n('meta.Sources.SQLServer.Port')
+  port: number;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @ColumnDecorator()
+  @I18n('meta.Sources.SQLServer.Username')
+  username: string;
+
+  @FieldDecorator({
+    type: 'password',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.SQLServer.Password')
+  password: string;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.SQLServer.Database')
+  database: string;
+
+  @FieldDecorator({
+    type: 'radio',
+    rules: [{ required: true }],
+    initialValue: false,
+    props: values => ({
+      disabled: values?.status === 101,
+      options: [
+        {
+          label: i18n.t('basic.Yes'),
+          value: true,
+        },
+        {
+          label: i18n.t('basic.No'),
+          value: false,
+        },
+      ],
+    }),
+  })
+  @I18n('meta.Sources.SQLServer.AllMigration')
+  allMigration: boolean;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    initialValue: 'UTC',
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.SQLServer.ServerTimezone')
+  serverTimezone: string;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.SQLServer.SchemaName')
+  schemaName: string;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.SQLServer.TableName')
+  tableName: string;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.SQLServer.PrimaryKey')
+  primaryKey: string;
+}
diff --git a/inlong-dashboard/src/metas/sources/defaults/index.ts b/inlong-dashboard/src/metas/sources/defaults/index.ts
index 15e30fcc7..f5f0d1d16 100644
--- a/inlong-dashboard/src/metas/sources/defaults/index.ts
+++ b/inlong-dashboard/src/metas/sources/defaults/index.ts
@@ -56,4 +56,9 @@ export const allDefaultSources: MetaExportWithBackendList<SourceMetaType> = [
     value: 'POSTGRESQL',
     LoadEntity: () => import('./PostgreSQL'),
   },
+  {
+    label: 'SQLServer',
+    value: 'SQLSERVER',
+    LoadEntity: () => import('./SQLServer'),
+  },
 ];


[inlong] 01/03: [INLONG-6491][Manager] Support getting backup info in getAllConfig (#6492)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit f31a9d6da85f0251005115c610c698d35d93fd71
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Nov 10 10:00:09 2022 +0800

    [INLONG-6491][Manager] Support getting backup info in getAllConfig (#6492)
---
 .../repository/DataProxyConfigRepository.java      | 340 +++++++++++----------
 1 file changed, 171 insertions(+), 169 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index d49eb9a34..55a5e7c85 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -19,10 +19,13 @@ package org.apache.inlong.manager.service.repository;
 
 import com.google.common.base.Splitter;
 import com.google.gson.Gson;
+import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
+
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.constant.ClusterSwitch;
 import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
 import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
@@ -34,20 +37,23 @@ import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
-import org.apache.inlong.manager.pojo.dataproxy.CacheCluster;
-import org.apache.inlong.manager.pojo.dataproxy.InlongGroupId;
-import org.apache.inlong.manager.pojo.dataproxy.InlongStreamId;
-import org.apache.inlong.manager.pojo.dataproxy.ProxyCluster;
-import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.mapper.ClusterSetMapper;
 import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
+import org.apache.inlong.manager.pojo.dataproxy.CacheCluster;
+import org.apache.inlong.manager.pojo.dataproxy.InlongGroupId;
+import org.apache.inlong.manager.pojo.dataproxy.InlongStreamId;
+import org.apache.inlong.manager.pojo.dataproxy.ProxyCluster;
+import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
+import org.apache.inlong.manager.service.core.SortConfigLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -55,7 +61,6 @@ import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Repository;
 import org.springframework.transaction.annotation.Transactional;
 
-import javax.annotation.PostConstruct;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Date;
@@ -63,10 +68,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.annotation.PostConstruct;
+
 /**
  * DataProxyConfigRepository
  */
@@ -76,6 +84,7 @@ public class DataProxyConfigRepository implements IRepository {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(DataProxyConfigRepository.class);
 
+    public static final String KEY_NAMESPACE = "namespace";
     public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
     public static final String KEY_BACKUP_TOPIC = "backup_topic";
     public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
@@ -104,6 +113,8 @@ public class DataProxyConfigRepository implements IRepository {
     private InlongGroupEntityMapper inlongGroupMapper;
     @Autowired
     private StreamSinkEntityMapper streamSinkMapper;
+    @Autowired
+    private SortConfigLoader sortConfigLoader;
 
     @PostConstruct
     public void initialize() {
@@ -195,47 +206,44 @@ public class DataProxyConfigRepository implements IRepository {
     @Override
     @Transactional(rollbackFor = Exception.class)
     public void reload() {
-        LOGGER.info("start to reload config.");
-        Map<String, ProxyClusterObject> proxyClusterMap = this.reloadProxyCluster();
+        LOGGER.info("start to reload config:" + this.getClass().getSimpleName());
+        // reload proxy cluster
+        Map<String, DataProxyCluster> proxyClusterMap = new HashMap<>();
+        this.reloadProxyCluster(proxyClusterMap);
         if (proxyClusterMap.size() == 0) {
             return;
         }
-        Map<String, Map<String, List<CacheCluster>>> cacheClusterMap = this.reloadCacheCluster();
-        Map<String, List<InLongIdObject>> inlongIdMap = this.reloadInlongId();
-        // mapping inlongIdMap
-        for (Entry<String, ProxyClusterObject> entry : proxyClusterMap.entrySet()) {
-            String clusterTag = entry.getValue().getSetName();
-            List<InLongIdObject> inlongIds = inlongIdMap.get(clusterTag);
-            if (inlongIds != null) {
-                entry.getValue().setInlongIds(inlongIds);
-            }
-        }
+        // reload cache cluster
+        this.reloadCacheCluster(proxyClusterMap);
+        // reload inlong group id and inlong stream id
+        this.reloadInlongId(proxyClusterMap);
 
         // generateClusterJson
-        this.generateClusterJson(proxyClusterMap, cacheClusterMap);
+        this.generateClusterJson(proxyClusterMap);
 
-        LOGGER.info("end to reload config.");
+        LOGGER.info("end to reload config:" + this.getClass().getSimpleName());
     }
 
     /**
      * reloadProxyCluster
      */
-    private Map<String, ProxyClusterObject> reloadProxyCluster() {
-        Map<String, ProxyClusterObject> proxyClusterMap = new HashMap<>();
+    private void reloadProxyCluster(Map<String, DataProxyCluster> proxyClusterMap) {
         for (ProxyCluster proxyCluster : clusterSetMapper.selectProxyCluster()) {
             ProxyClusterObject obj = new ProxyClusterObject();
             obj.setName(proxyCluster.getClusterName());
             obj.setSetName(proxyCluster.getClusterTag());
             obj.setZone(proxyCluster.getExtTag());
-            proxyClusterMap.put(obj.getName(), obj);
+            DataProxyCluster clusterObj = new DataProxyCluster();
+            clusterObj.setProxyCluster(obj);
+            proxyClusterMap.put(obj.getName(), clusterObj);
         }
-        return proxyClusterMap;
     }
 
     /**
      * reloadCacheCluster
      */
-    private Map<String, Map<String, List<CacheCluster>>> reloadCacheCluster() {
+    private void reloadCacheCluster(Map<String, DataProxyCluster> proxyClusterMap) {
+        // reload cache cluster
         Map<String, Map<String, List<CacheCluster>>> cacheClusterMap = new HashMap<>();
         for (CacheCluster cacheCluster : clusterSetMapper.selectCacheCluster()) {
             if (StringUtils.isEmpty(cacheCluster.getExtTag())) {
@@ -248,126 +256,160 @@ public class DataProxyConfigRepository implements IRepository {
                         .computeIfAbsent(cacheCluster.getExtTag(), k -> new ArrayList<>()).add(cacheCluster);
             }
         }
-        return cacheClusterMap;
+        // mark cache cluster to proxy cluster
+        Map<String, Map<String, String>> tagCache = new HashMap<>();
+        for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) {
+            DataProxyCluster clusterObj = entry.getValue();
+            ProxyClusterObject proxyObj = clusterObj.getProxyCluster();
+            // cache
+            String clusterTag = proxyObj.getSetName();
+            String extTag = proxyObj.getZone();
+            Map<String, List<CacheCluster>> cacheClusterZoneMap = cacheClusterMap.get(clusterTag);
+            if (cacheClusterZoneMap != null) {
+                Map<String, String> subTagMap = tagCache.computeIfAbsent(extTag, k -> MAP_SPLITTER.split(extTag));
+                for (Entry<String, List<CacheCluster>> cacheEntry : cacheClusterZoneMap.entrySet()) {
+                    if (cacheEntry.getValue().size() == 0) {
+                        continue;
+                    }
+                    Map<String, String> wholeTagMap = tagCache.computeIfAbsent(cacheEntry.getKey(),
+                            k -> MAP_SPLITTER.split(cacheEntry.getKey()));
+                    if (isSubTag(wholeTagMap, subTagMap)) {
+                        CacheClusterSetObject cacheSet = clusterObj.getCacheClusterSet();
+                        cacheSet.setSetName(clusterTag);
+                        List<CacheCluster> cacheClusterList = cacheEntry.getValue();
+                        cacheSet.setType(cacheClusterList.get(0).getType());
+                        List<CacheClusterObject> cacheClusters = cacheSet.getCacheClusters();
+                        for (CacheCluster cacheCluster : cacheClusterList) {
+                            CacheClusterObject obj = new CacheClusterObject();
+                            obj.setName(cacheCluster.getClusterName());
+                            obj.setZone(cacheCluster.getExtTag());
+                            obj.setParams(fromJson(cacheCluster.getExtParams()));
+                            cacheClusters.add(obj);
+                        }
+                    }
+                }
+            }
+        }
     }
 
     /**
-     * reloadInlongId
+     * fromJson
      */
-    private Map<String, List<InLongIdObject>> reloadInlongId() {
-        // parse group
-        Map<String, InlongGroupId> groupIdMap = new HashMap<>();
-        clusterSetMapper.selectInlongGroupId().forEach(value -> groupIdMap.put(value.getInlongGroupId(), value));
-        // parse stream
-        Map<String, List<InLongIdObject>> inlongIdMap = new HashMap<>();
-        for (InlongStreamId streamIdObj : clusterSetMapper.selectInlongStreamId()) {
-            String groupId = streamIdObj.getInlongGroupId();
-            InlongGroupId groupIdObj = groupIdMap.get(groupId);
-            if (groupId == null) {
-                continue;
+    private Map<String, String> fromJson(String jsonString) {
+        Map<String, String> mapObj = new HashMap<>();
+        try {
+            JsonObject obj = gson.fromJson(jsonString, JsonObject.class);
+            for (String key : obj.keySet()) {
+                JsonElement child = obj.get(key);
+                if (child.isJsonPrimitive()) {
+                    mapObj.put(key, child.getAsString());
+                } else {
+                    mapObj.put(key, child.toString());
+                }
             }
-            Map<String, String> groupParams = this.getExtParams(groupIdObj.getExtParams());
-            Map<String, String> streamParams = this.getExtParams(streamIdObj.getExtParams());
-            this.parseMasterTopic(groupIdObj, streamIdObj, groupParams, streamParams, inlongIdMap);
-            this.parseBackupTopic(groupIdObj, streamIdObj, groupParams, streamParams, inlongIdMap);
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage(), e);
         }
-        return inlongIdMap;
+        return mapObj;
     }
 
     /**
-     * getExtParams
+     * reloadInlongId
      */
-    @SuppressWarnings("unchecked")
-    private Map<String, String> getExtParams(String extParams) {
-        // parse extparams
-        if (!StringUtils.isEmpty(extParams)) {
-            try {
-                Map<String, String> groupParams = gson.fromJson(extParams, HashMap.class);
-                return groupParams;
-            } catch (Exception e) {
-                LOGGER.error("Fail to parse ext error:{},params:{}", e.getMessage(), extParams, e);
+    private void reloadInlongId(Map<String, DataProxyCluster> proxyClusterMap) {
+        // reload inlong group id
+        Map<String, InlongGroupId> groupIdMap = new HashMap<>();
+        clusterSetMapper.selectInlongGroupId().forEach(value -> groupIdMap.put(value.getInlongGroupId(), value));
+        // reload inlong group ext params
+        Map<String, Map<String, String>> groupParams = new HashMap<>();
+        groupIdMap.forEach((k, v) -> groupParams.put(k, fromJson(v.getExtParams())));
+        // reload inlong group ext
+        List<InlongGroupExtEntity> groupExtCursor = sortConfigLoader
+                .loadGroupBackupInfo(ClusterSwitch.BACKUP_CLUSTER_TAG);
+        groupExtCursor.forEach(v -> groupParams.computeIfAbsent(v.getInlongGroupId(), k -> new HashMap<>())
+                .put(ClusterSwitch.BACKUP_CLUSTER_TAG, v.getKeyValue()));
+        // reload inlong stream id
+        Map<String, InlongStreamId> streamIdMap = new HashMap<>();
+        clusterSetMapper.selectInlongStreamId()
+                .forEach(v -> streamIdMap.put(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), v));
+        // reload inlong stream ext params
+        Map<String, Map<String, String>> streamParams = new HashMap<>();
+        streamIdMap.forEach((k, v) -> streamParams.put(k, fromJson(v.getExtParams())));
+        // reload inlong stream ext
+        List<InlongStreamExtEntity> streamExtCursor = sortConfigLoader
+                .loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE);
+        streamExtCursor.forEach(v -> streamParams
+                .computeIfAbsent(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), k -> new HashMap<>())
+                .put(ClusterSwitch.BACKUP_MQ_RESOURCE, v.getKeyValue()));
+
+        // build Map<clusterTag, List<InlongIdObject>>
+        Map<String, List<InLongIdObject>> inlongIdMap = this.parseInlongId(groupIdMap, groupParams, streamIdMap,
+                streamParams);
+        // mark inlong id to proxy cluster
+        for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) {
+            String clusterTag = entry.getValue().getProxyCluster().getSetName();
+            List<InLongIdObject> inlongIds = inlongIdMap.get(clusterTag);
+            if (inlongIds != null) {
+                entry.getValue().getProxyCluster().getInlongIds().addAll(inlongIds);
             }
         }
-        return new HashMap<>();
     }
 
     /**
-     * parseMasterTopic
+     * parseInlongId
      */
-    private void parseMasterTopic(InlongGroupId groupIdObj, InlongStreamId streamIdObj,
-            Map<String, String> groupParams, Map<String, String> streamParams,
-            Map<String, List<InLongIdObject>> inlongIdMap) {
-        // choose topic
-        String groupTopic = groupIdObj.getTopic();
-        String streamTopic = streamIdObj.getTopic();
-        String finalTopic = null;
-        if (StringUtils.isEmpty(groupTopic)) {
-            // both empty then ignore
-            if (StringUtils.isEmpty(streamTopic)) {
-                return;
-            } else {
-                finalTopic = streamTopic;
+    private Map<String, List<InLongIdObject>> parseInlongId(Map<String, InlongGroupId> groupIdMap,
+            Map<String, Map<String, String>> groupParams, Map<String, InlongStreamId> streamIdMap,
+            Map<String, Map<String, String>> streamParams) {
+        Map<String, List<InLongIdObject>> inlongIdMap = new HashMap<>();
+        for (Entry<String, InlongStreamId> entry : streamIdMap.entrySet()) {
+            InlongStreamId streamIdObj = entry.getValue();
+            String groupId = streamIdObj.getInlongGroupId();
+            InlongGroupId groupIdObj = groupIdMap.get(groupId);
+            if (groupId == null) {
+                continue;
             }
-        } else {
-            if (StringUtils.isEmpty(streamTopic)) {
-                finalTopic = groupTopic;
+            // master
+            InLongIdObject obj = new InLongIdObject();
+            String inlongId = entry.getKey();
+            obj.setInlongId(inlongId);
+            Optional.ofNullable(groupParams.get(groupId)).ifPresent(v -> obj.getParams().putAll(v));
+            Optional.ofNullable(streamParams.get(inlongId)).ifPresent(v -> obj.getParams().putAll(v));
+            if (StringUtils.isBlank(streamIdObj.getTopic())) {
+                obj.setTopic(groupIdObj.getTopic());
             } else {
-                // Pulsar: namespace+topic
-                finalTopic = groupTopic + "/" + streamTopic;
+                obj.setTopic(streamIdObj.getTopic());
+                obj.getParams().put(KEY_NAMESPACE, groupIdObj.getTopic());
+            }
+            inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new ArrayList<>()).add(obj);
+            // backup
+            InLongIdObject backupObj = new InLongIdObject();
+            backupObj.setInlongId(inlongId);
+            backupObj.getParams().putAll(obj.getParams());
+            Map<String, String> groupParam = groupParams.get(groupId);
+            if (groupParam != null && groupParam.containsKey(ClusterSwitch.BACKUP_CLUSTER_TAG)
+                    && groupParam.containsKey(ClusterSwitch.BACKUP_MQ_RESOURCE)) {
+                String clusterTag = groupParam.get(ClusterSwitch.BACKUP_CLUSTER_TAG);
+                String groupMqResource = groupParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE);
+
+                Map<String, String> streamParam = streamParams.get(inlongId);
+                if (streamParam != null && !StringUtils.isBlank(streamParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE))) {
+                    obj.setTopic(streamParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE));
+                    backupObj.getParams().put(KEY_NAMESPACE, groupMqResource);
+                } else {
+                    obj.setTopic(groupMqResource);
+                }
+                inlongIdMap.computeIfAbsent(clusterTag, k -> new ArrayList<>()).add(obj);
             }
         }
-        // concat id
-        InLongIdObject obj = new InLongIdObject();
-        String inlongId = streamIdObj.getInlongGroupId() + "." + streamIdObj.getInlongStreamId();
-        obj.setInlongId(inlongId);
-        obj.setTopic(finalTopic);
-        Map<String, String> params = new HashMap<>();
-        params.putAll(groupParams);
-        params.putAll(streamParams);
-        obj.setParams(params);
-        inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new ArrayList<>()).add(obj);
+        return inlongIdMap;
     }
 
     /**
-     * parseBackupTopic
+     * getInlongId
      */
-    private void parseBackupTopic(InlongGroupId groupIdObj, InlongStreamId streamIdObj,
-            Map<String, String> groupParams, Map<String, String> streamParams,
-            Map<String, List<InLongIdObject>> inlongIdMap) {
-        Map<String, String> params = new HashMap<>();
-        params.putAll(groupParams);
-        params.putAll(streamParams);
-        // find backup cluster tag
-        String clusterTag = params.get(KEY_BACKUP_CLUSTER_TAG);
-        if (StringUtils.isEmpty(clusterTag)) {
-            return;
-        }
-        // find backup topic
-        String groupTopic = groupParams.get(KEY_BACKUP_TOPIC);
-        String streamTopic = streamParams.get(KEY_BACKUP_TOPIC);
-        String finalTopic = null;
-        if (StringUtils.isEmpty(groupTopic)) {
-            // both empty then ignore
-            if (StringUtils.isEmpty(streamTopic)) {
-                return;
-            } else {
-                finalTopic = streamTopic;
-            }
-        } else {
-            if (StringUtils.isEmpty(streamTopic)) {
-                finalTopic = groupTopic;
-            } else {
-                // Pulsar: namespace+topic
-                finalTopic = groupTopic + "/" + streamTopic;
-            }
-        }
-        // concat id
-        InLongIdObject obj = new InLongIdObject();
-        String inlongId = streamIdObj.getInlongGroupId() + "." + streamIdObj.getInlongStreamId();
-        obj.setInlongId(inlongId);
-        obj.setTopic(finalTopic);
-        obj.setParams(params);
-        inlongIdMap.computeIfAbsent(clusterTag, k -> new ArrayList<>()).add(obj);
+    private String getInlongId(String inlongGroupId, String inlongStreamId) {
+        return inlongGroupId + "." + inlongStreamId;
     }
 
     /**
@@ -382,62 +424,22 @@ public class DataProxyConfigRepository implements IRepository {
     /**
      * generateClusterJson
      */
-    @SuppressWarnings("unchecked")
-    private void generateClusterJson(Map<String, ProxyClusterObject> proxyClusterMap,
-            Map<String, Map<String, List<CacheCluster>>> cacheClusterMap) {
+    private void generateClusterJson(Map<String, DataProxyCluster> proxyClusterMap) {
         Map<String, String> newProxyConfigJson = new ConcurrentHashMap<>();
         Map<String, String> newProxyMd5Map = new ConcurrentHashMap<>();
-        Map<String, Map<String, String>> tagCache = new HashMap<>();
-        for (Entry<String, ProxyClusterObject> entry : proxyClusterMap.entrySet()) {
-            ProxyClusterObject proxyObj = entry.getValue();
-            // proxy
-            DataProxyCluster clusterObj = new DataProxyCluster();
-            clusterObj.setProxyCluster(proxyObj);
-            // cache
-            String clusterTag = proxyObj.getSetName();
-            String extTag = proxyObj.getZone();
-            Map<String, List<CacheCluster>> cacheClusterZoneMap = cacheClusterMap.get(clusterTag);
-            if (cacheClusterZoneMap != null) {
-                Map<String, String> subTagMap = tagCache.computeIfAbsent(extTag, k -> MAP_SPLITTER.split(extTag));
-                for (Entry<String, List<CacheCluster>> cacheEntry : cacheClusterZoneMap.entrySet()) {
-                    if (cacheEntry.getValue().size() == 0) {
-                        continue;
-                    }
-                    Map<String, String> wholeTagMap = tagCache.computeIfAbsent(cacheEntry.getKey(),
-                            k -> MAP_SPLITTER.split(cacheEntry.getKey()));
-                    if (isSubTag(wholeTagMap, subTagMap)) {
-                        CacheClusterSetObject cacheSet = clusterObj.getCacheClusterSet();
-                        cacheSet.setSetName(clusterTag);
-                        List<CacheCluster> cacheClusterList = cacheEntry.getValue();
-                        cacheSet.setType(cacheClusterList.get(0).getType());
-                        List<CacheClusterObject> cacheClusters = new ArrayList<>(cacheClusterList.size());
-                        cacheSet.setCacheClusters(cacheClusters);
-                        for (CacheCluster cacheCluster : cacheClusterList) {
-                            CacheClusterObject obj = new CacheClusterObject();
-                            obj.setName(cacheCluster.getClusterName());
-                            obj.setZone(cacheCluster.getExtTag());
-                            try {
-                                Map<String, String> params = gson.fromJson(cacheCluster.getExtParams(), Map.class);
-                                obj.setParams(params);
-                            } catch (Exception e) {
-                                LOGGER.error(e.getMessage(), e);
-                            }
-                            cacheClusters.add(obj);
-                        }
-                    }
-                }
-            }
+        for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) {
+            DataProxyCluster proxyObj = entry.getValue();
             // json
-            String jsonDataProxyCluster = gson.toJson(clusterObj);
+            String jsonDataProxyCluster = gson.toJson(proxyObj);
             String md5 = DigestUtils.md5Hex(jsonDataProxyCluster);
             DataProxyConfigResponse response = new DataProxyConfigResponse();
             response.setResult(true);
             response.setErrCode(DataProxyConfigResponse.SUCC);
             response.setMd5(md5);
-            response.setData(clusterObj);
+            response.setData(proxyObj);
             String jsonResponse = gson.toJson(response);
-            newProxyConfigJson.put(proxyObj.getName(), jsonResponse);
-            newProxyMd5Map.put(proxyObj.getName(), md5);
+            newProxyConfigJson.put(entry.getKey(), jsonResponse);
+            newProxyMd5Map.put(entry.getKey(), md5);
         }
 
         // replace

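For readers following the Manager change above: the reworked reload path now emits an inlong id twice when backup parameters are configured, once under its primary cluster tag and once under the backup cluster tag, so getAllConfig can return backup routing info in the same pass. The sketch below distills that idea only; the class and key names (IdObject, route, backup_cluster_tag, backup_mq_resource) are illustrative stand-ins, not the Manager's actual API.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BackupRoutingSketch {

        // illustrative stand-ins for the ClusterSwitch backup keys
        static final String BACKUP_CLUSTER_TAG = "backup_cluster_tag";
        static final String BACKUP_MQ_RESOURCE = "backup_mq_resource";

        /** Illustrative stand-in for InLongIdObject. */
        static class IdObject {
            String inlongId;
            String topic;
            final Map<String, String> params = new HashMap<>();
        }

        /**
         * Route one inlong id: always under its primary cluster tag, and
         * additionally under the backup tag when backup params are configured.
         */
        static Map<String, List<IdObject>> route(String inlongId, String primaryTag,
                String primaryTopic, Map<String, String> groupParams,
                Map<String, String> streamParams) {
            Map<String, List<IdObject>> byTag = new HashMap<>();

            IdObject primary = new IdObject();
            primary.inlongId = inlongId;
            primary.topic = primaryTopic;
            primary.params.putAll(groupParams);
            primary.params.putAll(streamParams); // stream-level params override group-level ones
            byTag.computeIfAbsent(primaryTag, k -> new ArrayList<>()).add(primary);

            String backupTag = groupParams.get(BACKUP_CLUSTER_TAG);
            String backupTopic = groupParams.get(BACKUP_MQ_RESOURCE);
            if (backupTag != null && backupTopic != null) {
                IdObject backup = new IdObject();
                backup.inlongId = inlongId;
                backup.params.putAll(primary.params);
                // a stream-level backup resource, when present, wins over the group-level one
                backup.topic = streamParams.getOrDefault(BACKUP_MQ_RESOURCE, backupTopic);
                byTag.computeIfAbsent(backupTag, k -> new ArrayList<>()).add(backup);
            }
            return byTag;
        }
    }

Merging stream params over group params, and preferring a stream-level backup resource over the group-level one, mirrors the override order visible in the repository diff above.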

[inlong] 03/03: [INLONG-6484][Sort] Bugfix: fix schema update Circular dependency error in multiple sink iceberg scenes (#6486)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 4634918b513ea84db6d06fa15d8b7200a521fd4b
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Thu Nov 10 10:55:39 2022 +0800

    [INLONG-6484][Sort] Bugfix: fix schema update Circular dependency error in multiple sink iceberg scenes (#6486)
    
    Co-authored-by: thesumery <15...@qq.com>
---
 .../inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java   | 1 +
 1 file changed, 1 insertion(+)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index 378f040d9..ec65fbae7 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -186,6 +186,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
                 output.collect(new StreamRecord<>(recordWithSchema));
             } else {
                 handldAlterSchemaEventFromOperator(tableId, latestSchema, dataSchema);
+                break;
             }
         }
     }
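The one-line fix above is easiest to read as a queue-drain loop: records are emitted while their schema matches the table's latest schema, and on the first mismatch the operator hands off to the alter-schema path. Without a break, the loop keeps re-handling the same mismatching record that the alter path will process again, which is the circular dependency named in the commit title. Below is a minimal, self-contained sketch of that pattern; all names are illustrative and none of the Sort connector's real classes are used.

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class SchemaDrainSketch {

        /** Illustrative stand-in for a record carrying its write schema. */
        static class Rec {
            final String payload;
            final int schemaVersion;
            Rec(String payload, int schemaVersion) {
                this.payload = payload;
                this.schemaVersion = schemaVersion;
            }
        }

        private final Deque<Rec> pending = new ArrayDeque<>();
        private int latestSchemaVersion = 1;

        /** Emit records while their schema matches; stop at the first mismatch. */
        void drain() {
            while (!pending.isEmpty()) {
                Rec head = pending.peek();
                if (head.schemaVersion == latestSchemaVersion) {
                    emit(pending.poll());
                } else {
                    requestSchemaUpdate(head.schemaVersion);
                    break; // the fix: without this, the unchanged head is handed
                           // off again on the next iteration and the loop never ends
                }
            }
        }

        /** Called back once the table schema has been altered. */
        void onSchemaUpdated(int newVersion) {
            latestSchemaVersion = newVersion;
            drain(); // resume draining with the updated schema
        }

        private void emit(Rec r) {
            // forward downstream, e.g. output.collect(...) in the real operator
        }

        private void requestSchemaUpdate(int version) {
            // asynchronously alter the table schema in the real operator
        }
    }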