You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by "healchow (via GitHub)" <gi...@apache.org> on 2023/03/01 07:05:16 UTC

[GitHub] [inlong] healchow commented on a diff in pull request #7474: [INLONG-7473][Manager] Fix StreamSource in the TO_BE_ISSUED_DELETE state cannot be issued properly

healchow commented on code in PR #7474:
URL: https://github.com/apache/inlong/pull/7474#discussion_r1121243136


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java:
##########
@@ -471,70 +471,68 @@ private DataConfig getDataConfig(StreamSourceEntity entity, int op) {
         dataConfig.setInlongStreamId(streamId);
 
         InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
-        if (groupEntity == null) {
-            throw new BusinessException(String.format("inlong group not found for groupId=%s", groupId));
-        }
         InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
-        if (streamEntity == null) {
-            throw new BusinessException(
-                    String.format("inlong stream not found for groupId=%s streamId=%s", groupId, streamId));
-        }
-
-        String extParams = entity.getExtParams();
-        dataConfig.setSyncSend(streamEntity.getSyncSend());
-        if (SourceType.FILE.equalsIgnoreCase(streamEntity.getDataType())) {
-            String dataSeparator = streamEntity.getDataSeparator();
-            extParams = (null != dataSeparator ? getExtParams(extParams, dataSeparator) : extParams);
-        }
-        dataConfig.setExtParams(extParams);
-
-        int dataReportType = groupEntity.getDataReportType();
-        dataConfig.setDataReportType(dataReportType);
-        if (InlongConstants.REPORT_TO_MQ_RECEIVED == dataReportType) {
-            // add mq cluster setting
-            List<MQClusterInfo> mqSet = new ArrayList<>();
-            List<String> clusterTagList = Collections.singletonList(groupEntity.getInlongClusterTag());
-            List<String> typeList = Arrays.asList(ClusterType.TUBEMQ, ClusterType.PULSAR);
-            ClusterPageRequest pageRequest = ClusterPageRequest.builder()
-                    .typeList(typeList)
-                    .clusterTagList(clusterTagList)
-                    .build();
-            List<InlongClusterEntity> mqClusterList = clusterMapper.selectByCondition(pageRequest);
-            for (InlongClusterEntity cluster : mqClusterList) {
-                MQClusterInfo clusterInfo = new MQClusterInfo();
-                clusterInfo.setUrl(cluster.getUrl());
-                clusterInfo.setToken(cluster.getToken());
-                clusterInfo.setMqType(cluster.getType());
-                clusterInfo.setParams(JsonUtils.parseObject(cluster.getExtParams(), HashMap.class));
-                mqSet.add(clusterInfo);
+        if (groupEntity != null && streamEntity != null) {
+            String extParams = entity.getExtParams();
+            dataConfig.setSyncSend(streamEntity.getSyncSend());
+            if (SourceType.FILE.equalsIgnoreCase(streamEntity.getDataType())) {
+                String dataSeparator = streamEntity.getDataSeparator();
+                extParams = (null != dataSeparator ? getExtParams(extParams, dataSeparator) : extParams);
             }
-            dataConfig.setMqClusters(mqSet);
-
-            // add topic setting
-            String mqResource = groupEntity.getMqResource();
-            String mqType = groupEntity.getMqType();
-            if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
-                // first get the tenant from the InlongGroup, and then get it from the PulsarCluster.
-                InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
-                String tenant = pulsarDTO.getTenant();
-                if (StringUtils.isBlank(tenant)) {
-                    // If there are multiple Pulsar clusters, take the first one.
-                    // Note that the tenants in multiple Pulsar clusters must be identical.
-                    PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(mqClusterList.get(0).getExtParams());
-                    tenant = pulsarCluster.getTenant();
+            dataConfig.setExtParams(extParams);
+
+            int dataReportType = groupEntity.getDataReportType();
+            dataConfig.setDataReportType(dataReportType);
+            if (InlongConstants.REPORT_TO_MQ_RECEIVED == dataReportType) {
+                // add mq cluster setting
+                List<MQClusterInfo> mqSet = new ArrayList<>();
+                List<String> clusterTagList = Collections.singletonList(groupEntity.getInlongClusterTag());
+                List<String> typeList = Arrays.asList(ClusterType.TUBEMQ, ClusterType.PULSAR);
+                ClusterPageRequest pageRequest = ClusterPageRequest.builder()
+                        .typeList(typeList)
+                        .clusterTagList(clusterTagList)
+                        .build();
+                List<InlongClusterEntity> mqClusterList = clusterMapper.selectByCondition(pageRequest);
+                for (InlongClusterEntity cluster : mqClusterList) {
+                    MQClusterInfo clusterInfo = new MQClusterInfo();
+                    clusterInfo.setUrl(cluster.getUrl());
+                    clusterInfo.setToken(cluster.getToken());
+                    clusterInfo.setMqType(cluster.getType());
+                    clusterInfo.setParams(JsonUtils.parseObject(cluster.getExtParams(), HashMap.class));
+                    mqSet.add(clusterInfo);
                 }
-
-                String topic = String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
-                        tenant, mqResource, streamEntity.getMqResource());
-                DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
-                topicConfig.setInlongGroupId(groupId + "/" + streamId);
-                topicConfig.setTopic(topic);
-                dataConfig.setTopicInfo(topicConfig);
-            } else if (MQType.TUBEMQ.equals(mqType)) {
-                DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
-                topicConfig.setInlongGroupId(groupId);
-                topicConfig.setTopic(mqResource);
-                dataConfig.setTopicInfo(topicConfig);
+                dataConfig.setMqClusters(mqSet);
+
+                // add topic setting
+                String mqResource = groupEntity.getMqResource();
+                String mqType = groupEntity.getMqType();
+                if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
+                    // first get the tenant from the InlongGroup, and then get it from the PulsarCluster.
+                    InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
+                    String tenant = pulsarDTO.getTenant();
+                    if (StringUtils.isBlank(tenant)) {
+                        // If there are multiple Pulsar clusters, take the first one.
+                        // Note that the tenants in multiple Pulsar clusters must be identical.
+                        PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(
+                                mqClusterList.get(0).getExtParams());
+                        tenant = pulsarCluster.getTenant();
+                    }
+
+                    String topic = String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
+                            tenant, mqResource, streamEntity.getMqResource());
+                    DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+                    topicConfig.setInlongGroupId(groupId + "/" + streamId);
+                    topicConfig.setTopic(topic);
+                    dataConfig.setTopicInfo(topicConfig);
+                } else if (MQType.TUBEMQ.equals(mqType)) {
+                    DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+                    topicConfig.setInlongGroupId(groupId);
+                    topicConfig.setTopic(mqResource);
+                    dataConfig.setTopicInfo(topicConfig);
+                }
+            } else {
+                dataConfig.setSyncSend(0);

Review Comment:
   Why is syncSend set to 0 when inlong_group or inlong_stream is null?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org