You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/24 07:59:53 UTC
[incubator-inlong] branch master updated: [INLONG-4299][Manager] Add InlongStreamExtensionInfo (#4330)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 77b08eebb [INLONG-4299][Manager] Add InlongStreamExtensionInfo (#4330)
77b08eebb is described below
commit 77b08eebb607d4243b7fde5bb46d02ef81efea06
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Tue May 24 15:59:48 2022 +0800
[INLONG-4299][Manager] Add InlongStreamExtensionInfo (#4330)
* Add InlongStreamExtensionInfo
Co-authored-by: healchow <he...@gmail.com>
---
.../common/pojo/stream/InlongStreamExtInfo.java} | 41 ++---
.../common/pojo/stream/InlongStreamInfo.java | 3 +
.../common/pojo/stream/InlongStreamRequest.java | 2 +
.../common/pojo/stream/InlongStreamResponse.java | 3 +
.../manager/dao/entity/InlongStreamExtEntity.java} | 36 ++--
.../dao/mapper/InlongStreamExtEntityMapper.java | 72 ++++++++
.../src/main/resources/generatorConfig.xml | 2 +-
.../mappers/InlongStreamExtEntityMapper.xml | 203 +++++++++++++++++++++
.../manager/plugin/FlinkSortProcessPlugin.java | 12 ++
.../plugin/eventselect/DeleteProcessSelector.java | 4 +-
...cessSelector.java => DeleteStreamSelector.java} | 20 +-
.../plugin/eventselect/RestartProcessSelector.java | 8 +-
...essSelector.java => RestartStreamSelector.java} | 20 +-
.../plugin/eventselect/StartupProcessSelector.java | 10 +-
...essSelector.java => StartupStreamSelector.java} | 22 +--
.../plugin/eventselect/SuspendProcessSelector.java | 4 +-
...essSelector.java => SuspendStreamSelector.java} | 22 +--
.../inlong/manager/plugin/flink/FlinkService.java | 16 +-
.../inlong/manager/plugin/flink/dto/FlinkInfo.java | 2 -
...SortListener.java => DeleteStreamListener.java} | 75 +++-----
.../plugin/listener/RestartSortListener.java | 20 +-
...ortListener.java => RestartStreamListener.java} | 65 +++----
.../plugin/listener/StartupSortListener.java | 32 +---
...ortListener.java => StartupStreamListener.java} | 84 +++------
...ortListener.java => SuspendStreamListener.java} | 75 +++-----
.../service/core/impl/InlongStreamServiceImpl.java | 87 ++++++---
.../service/group/InlongGroupServiceImpl.java | 11 +-
.../service/sort/CreateSortConfigListenerV2.java | 9 +-
.../sort/CreateStreamSortConfigListener.java | 108 ++++++++---
.../main/resources/sql/apache_inlong_manager.sql | 17 ++
.../manager-web/sql/apache_inlong_manager.sql | 18 ++
31 files changed, 702 insertions(+), 401 deletions(-)
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamExtInfo.java
similarity index 55%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamExtInfo.java
index b3af2b3d0..fa3893638 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamExtInfo.java
@@ -15,40 +15,31 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.plugin.flink.dto;
+package org.apache.inlong.manager.common.pojo.stream;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-
-import java.util.List;
/**
- * Flink infomation, including end point, job name, source type, etc.
+ * Inlong stream extension information
*/
@Data
-public class FlinkInfo {
-
- private String endpoint;
-
- private String jobName;
-
- private List<InlongStreamInfo> inlongStreamInfoList;
-
- private String localJarPath;
-
- private String localConfPath;
-
- private String sourceType;
-
- private String sinkType;
-
- private String jobId;
+@ApiModel("Inlong stream extension information")
+public class InlongStreamExtInfo {
- private String savepointPath;
+ @ApiModelProperty(value = "id")
+ private Integer id;
- private boolean isException = false;
+ @ApiModelProperty(value = "inlong group id", required = true)
+ private String inlongGroupId;
- private String exceptionMsg;
+ @ApiModelProperty(value = "inlong stream id", required = true)
+ private String inlongStreamId;
+ @ApiModelProperty(value = "property name")
+ private String keyName;
+ @ApiModelProperty(value = "property value")
+ private String keyValue;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
index 380b4c246..9826f8d52 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
@@ -109,6 +109,9 @@ public class InlongStreamInfo {
@ApiModelProperty(value = "Field list")
private List<InlongStreamFieldInfo> fieldList;
+ @ApiModelProperty(value = "Inlong stream Extension properties")
+ private List<InlongStreamExtInfo> extList;
+
public InlongStreamResponse genResponse() {
return CommonBeanUtils.copyProperties(this, InlongStreamResponse::new);
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
index 4fe9f923d..a972f74db 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
@@ -87,4 +87,6 @@ public class InlongStreamRequest {
@ApiModelProperty(value = "Field list")
private List<InlongStreamFieldInfo> fieldList;
+ @ApiModelProperty(value = "Inlong stream Extension properties")
+ private List<InlongStreamExtInfo> extList;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
index 9dc694fa7..fc875b98a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
@@ -115,4 +115,7 @@ public class InlongStreamResponse {
@ApiModelProperty(value = "Field list")
private List<InlongStreamFieldInfo> fieldList;
+ @ApiModelProperty(value = "Inlong stream Extension properties")
+ private List<InlongStreamExtInfo> extList;
+
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamExtEntity.java
similarity index 57%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamExtEntity.java
index b3af2b3d0..4bc648a6d 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamExtEntity.java
@@ -15,40 +15,32 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.plugin.flink.dto;
+package org.apache.inlong.manager.dao.entity;
import lombok.Data;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import java.util.List;
+import java.io.Serializable;
+import java.util.Date;
/**
- * Flink infomation, including end point, job name, source type, etc.
+ * InlongStreamExtEntity, including key name, key value, etc.
*/
@Data
-public class FlinkInfo {
+public class InlongStreamExtEntity implements Serializable {
- private String endpoint;
+ private static final long serialVersionUID = 1L;
- private String jobName;
+ private Integer id;
- private List<InlongStreamInfo> inlongStreamInfoList;
+ private String inlongGroupId;
- private String localJarPath;
+ private String inlongStreamId;
- private String localConfPath;
+ private String keyName;
- private String sourceType;
+ private String keyValue;
- private String sinkType;
+ private Integer isDeleted;
- private String jobId;
-
- private String savepointPath;
-
- private boolean isException = false;
-
- private String exceptionMsg;
-
-
-}
+ private Date modifyTime;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
new file mode 100644
index 000000000..68ef19457
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.mapper;
+
+import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+
+@Repository
+public interface InlongStreamExtEntityMapper {
+
+ int deleteByPrimaryKey(Integer id);
+
+ int insert(InlongStreamExtEntity record);
+
+ int insertSelective(InlongStreamExtEntity record);
+
+ InlongStreamExtEntity selectByPrimaryKey(Integer id);
+
+ int updateByPrimaryKeySelective(InlongStreamExtEntity record);
+
+ int updateByPrimaryKeyWithBLOBs(InlongStreamExtEntity record);
+
+ int updateByPrimaryKey(InlongStreamExtEntity record);
+
+ List<InlongStreamExtEntity> selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
+
+ /**
+ * Insert data in batches
+ *
+ * @param extList need to insert data
+ */
+ int insertAll(@Param("extList") List<InlongStreamExtEntity> extList);
+
+ /**
+ * Insert data in batches, update if it exists, create new if it does not exist
+ *
+ * @param extList need to insertOnDuplicateUpdate data
+ */
+ int insertOnDuplicateKeyUpdate(@Param("extList") List<InlongStreamExtEntity> extList);
+
+ /**
+ * Physically delete all extension fields based on group id and stream id
+ *
+ * @return rows deleted
+ */
+ int deleteAllByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
+
+ /**
+ * Logically delete all extended fields based on group id and stream id
+ *
+ * @return rows updated
+ */
+ int logicDeleteAllByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
index d875c8b7f..a1a977bb2 100644
--- a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
+++ b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
@@ -67,7 +67,7 @@
</javaClientGenerator>
<!-- Which entities to generate -->
- <table tableName="data_node" domainObjectName="DataNodeEntity"
+ <table tableName="inlong_stream_ext" domainObjectName="InlongStreamExtEntity"
enableInsert="true" enableSelectByPrimaryKey="true"
enableUpdateByPrimaryKey="true" enableDeleteByPrimaryKey="true"
enableCountByExample="false" enableDeleteByExample="false"
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
new file mode 100644
index 000000000..747701721
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
@@ -0,0 +1,203 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper">
+ <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+ <id column="id" jdbcType="INTEGER" property="id" />
+ <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
+ <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId" />
+ <result column="key_name" jdbcType="VARCHAR" property="keyName" />
+ <result column="is_deleted" jdbcType="INTEGER" property="isDeleted" />
+ <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
+ </resultMap>
+ <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+ <result column="key_value" jdbcType="LONGVARCHAR" property="keyValue" />
+ </resultMap>
+ <sql id="Base_Column_List">
+ id, inlong_group_id, inlong_stream_id, key_name, is_deleted, modify_time
+ </sql>
+ <sql id="Blob_Column_List">
+ key_value
+ </sql>
+ <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs">
+ select
+ <include refid="Base_Column_List" />
+ ,
+ <include refid="Blob_Column_List" />
+ from inlong_stream_ext
+ where id = #{id,jdbcType=INTEGER}
+ </select>
+ <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+ delete from inlong_stream_ext
+ where id = #{id,jdbcType=INTEGER}
+ </delete>
+ <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+ insert into inlong_stream_ext (id, inlong_group_id, inlong_stream_id,
+ key_name, is_deleted, modify_time,
+ key_value)
+ values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
+ #{keyName,jdbcType=VARCHAR}, #{isDeleted,jdbcType=INTEGER}, #{modifyTime,jdbcType=TIMESTAMP},
+ #{keyValue,jdbcType=LONGVARCHAR})
+ </insert>
+ <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+ insert into inlong_stream_ext
+ <trim prefix="(" suffix=")" suffixOverrides=",">
+ <if test="id != null">
+ id,
+ </if>
+ <if test="inlongGroupId != null">
+ inlong_group_id,
+ </if>
+ <if test="inlongStreamId != null">
+ inlong_stream_id,
+ </if>
+ <if test="keyName != null">
+ key_name,
+ </if>
+ <if test="isDeleted != null">
+ is_deleted,
+ </if>
+ <if test="modifyTime != null">
+ modify_time,
+ </if>
+ <if test="keyValue != null">
+ key_value,
+ </if>
+ </trim>
+ <trim prefix="values (" suffix=")" suffixOverrides=",">
+ <if test="id != null">
+ #{id,jdbcType=INTEGER},
+ </if>
+ <if test="inlongGroupId != null">
+ #{inlongGroupId,jdbcType=VARCHAR},
+ </if>
+ <if test="inlongStreamId != null">
+ #{inlongStreamId,jdbcType=VARCHAR},
+ </if>
+ <if test="keyName != null">
+ #{keyName,jdbcType=VARCHAR},
+ </if>
+ <if test="isDeleted != null">
+ #{isDeleted,jdbcType=INTEGER},
+ </if>
+ <if test="modifyTime != null">
+ #{modifyTime,jdbcType=TIMESTAMP},
+ </if>
+ <if test="keyValue != null">
+ #{keyValue,jdbcType=LONGVARCHAR},
+ </if>
+ </trim>
+ </insert>
+ <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+ update inlong_stream_ext
+ <set>
+ <if test="inlongGroupId != null">
+ inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+ </if>
+ <if test="inlongStreamId != null">
+ inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+ </if>
+ <if test="keyName != null">
+ key_name = #{keyName,jdbcType=VARCHAR},
+ </if>
+ <if test="isDeleted != null">
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ </if>
+ <if test="modifyTime != null">
+ modify_time = #{modifyTime,jdbcType=TIMESTAMP},
+ </if>
+ <if test="keyValue != null">
+ key_value = #{keyValue,jdbcType=LONGVARCHAR},
+ </if>
+ </set>
+ where id = #{id,jdbcType=INTEGER}
+ </update>
+ <update id="updateByPrimaryKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+ update inlong_stream_ext
+ set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+ inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+ key_name = #{keyName,jdbcType=VARCHAR},
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ modify_time = #{modifyTime,jdbcType=TIMESTAMP},
+ key_value = #{keyValue,jdbcType=LONGVARCHAR}
+ where id = #{id,jdbcType=INTEGER}
+ </update>
+ <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+ update inlong_stream_ext
+ set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+ inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+ key_name = #{keyName,jdbcType=VARCHAR},
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ modify_time = #{modifyTime,jdbcType=TIMESTAMP}
+ where id = #{id,jdbcType=INTEGER}
+ </update>
+ <select id="selectByRelatedId" resultType="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from inlong_stream_ext
+ where inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ <if test="streamId != null and streamId != ''">
+ and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+ </if>
+ and is_deleted = 0
+ </select>
+ <!-- Bulk insert-->
+ <insert id="insertAll" parameterType="java.util.List">
+ insert into inlong_stream_ext
+ (id, inlong_group_id, inlong_stream_id, key_name, key_value, is_deleted)
+ values
+ <foreach collection="extList" separator="," index="index" item="item">
+ (#{item.id}, #{item.inlongGroupId}, #{item.inlongStreamId}, #{item.keyName}, #{item.keyValue}, #{item.isDeleted})
+ </foreach>
+ </insert>
+ <!-- Bulk insert,update if exists-->
+ <insert id="insertOnDuplicateKeyUpdate" parameterType="java.util.List">
+ insert into inlong_stream_ext
+ (id, inlong_group_id, inlong_stream_id, key_name, key_value, is_deleted)
+ values
+ <foreach collection="extList" separator="," index="index" item="item">
+ (#{item.id}, #{item.inlongGroupId}, #{item.inlongStreamId}, #{item.keyName}, #{item.keyValue}, #{item.isDeleted})
+ </foreach>
+ ON DUPLICATE KEY UPDATE
+ inlong_group_id = VALUES(inlong_group_id),
+ inlong_stream_id = VALUES(inlong_stream_id),
+ key_name = VALUES(key_name),
+ key_value = VALUES(key_value),
+ is_deleted = VALUES(is_deleted)
+ </insert>
+ <delete id="deleteAllByRelatedId">
+ delete
+ from inlong_stream_ext
+ where inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ <if test="streamId != null and streamId != ''">
+ and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+ </if>
+ </delete>
+ <update id="logicDeleteAllByRelatedId">
+ update inlong_stream_ext
+ set is_deleted = 1
+ where is_deleted = 0
+ and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ <if test="streamId != null and streamId != ''">
+ and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+ </if>
+ </update>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
index 04407ab04..45cf2b1e2 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
@@ -19,13 +19,21 @@ package org.apache.inlong.manager.plugin;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.plugin.eventselect.DeleteProcessSelector;
+import org.apache.inlong.manager.plugin.eventselect.DeleteStreamSelector;
import org.apache.inlong.manager.plugin.eventselect.RestartProcessSelector;
+import org.apache.inlong.manager.plugin.eventselect.RestartStreamSelector;
import org.apache.inlong.manager.plugin.eventselect.StartupProcessSelector;
+import org.apache.inlong.manager.plugin.eventselect.StartupStreamSelector;
import org.apache.inlong.manager.plugin.eventselect.SuspendProcessSelector;
+import org.apache.inlong.manager.plugin.eventselect.SuspendStreamSelector;
import org.apache.inlong.manager.plugin.listener.DeleteSortListener;
+import org.apache.inlong.manager.plugin.listener.DeleteStreamListener;
import org.apache.inlong.manager.plugin.listener.RestartSortListener;
+import org.apache.inlong.manager.plugin.listener.RestartStreamListener;
import org.apache.inlong.manager.plugin.listener.StartupSortListener;
+import org.apache.inlong.manager.plugin.listener.StartupStreamListener;
import org.apache.inlong.manager.plugin.listener.SuspendSortListener;
+import org.apache.inlong.manager.plugin.listener.SuspendStreamListener;
import org.apache.inlong.manager.workflow.event.EventSelector;
import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -52,6 +60,10 @@ public class FlinkSortProcessPlugin implements ProcessPlugin {
listeners.put(new RestartSortListener(), new RestartProcessSelector());
listeners.put(new SuspendSortListener(), new SuspendProcessSelector());
listeners.put(new StartupSortListener(), new StartupProcessSelector());
+ listeners.put(new DeleteStreamListener(), new DeleteStreamSelector());
+ listeners.put(new RestartStreamListener(), new RestartStreamSelector());
+ listeners.put(new SuspendStreamListener(), new SuspendStreamSelector());
+ listeners.put(new StartupStreamListener(), new StartupStreamSelector());
return listeners;
}
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
index d7a532cd8..470b707dc 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
@@ -37,7 +37,7 @@ public class DeleteProcessSelector implements EventSelector {
ProcessForm processForm = context.getProcessForm();
String groupId = processForm.getInlongGroupId();
if (!(processForm instanceof GroupResourceProcessForm)) {
- log.info("not add deleteProcess listener as the form was not GroupResourceProcessForm for groupId [{}]",
+ log.info("not add deleteProcess listener, as the form was not GroupResourceProcessForm for groupId [{}]",
groupId);
return false;
}
@@ -45,7 +45,7 @@ public class DeleteProcessSelector implements EventSelector {
GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
boolean flag = groupResourceProcessForm.getGroupOperateType() == GroupOperateType.DELETE;
if (!flag) {
- log.info("not add deleteProcess listener as the operate was not DELETE for groupId [{}]", groupId);
+ log.info("not add deleteProcess listener, as the operate was not DELETE for groupId [{}]", groupId);
return false;
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteStreamSelector.java
similarity index 62%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteStreamSelector.java
index d7a532cd8..c0b738c0a 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteStreamSelector.java
@@ -20,36 +20,38 @@ package org.apache.inlong.manager.plugin.eventselect;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.EventSelector;
/**
- * Selector of delete process event.
+ * Selector of delete stream event.
*/
@Slf4j
-public class DeleteProcessSelector implements EventSelector {
+public class DeleteStreamSelector implements EventSelector {
@SneakyThrows
@Override
public boolean accept(WorkflowContext context) {
ProcessForm processForm = context.getProcessForm();
String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- log.info("not add deleteProcess listener as the form was not GroupResourceProcessForm for groupId [{}]",
+ if (!(processForm instanceof StreamResourceProcessForm)) {
+ log.info("not add deleteStream listener, as the form was not StreamResourceProcessForm for groupId [{}]",
groupId);
return false;
}
- GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
- boolean flag = groupResourceProcessForm.getGroupOperateType() == GroupOperateType.DELETE;
+ StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
+ String streamId = streamResourceProcessForm.getStreamInfo().getInlongStreamId();
+ boolean flag = streamResourceProcessForm.getGroupOperateType() == GroupOperateType.DELETE;
if (!flag) {
- log.info("not add deleteProcess listener as the operate was not DELETE for groupId [{}]", groupId);
+ log.info("not add deleteStream listener, as the operate was not DELETE for groupId [{}] and streamId [{}]",
+ groupId, streamId);
return false;
}
- log.info("add deleteProcess listener for groupId [{}]", groupId);
+ log.info("add deleteStream listener for groupId [{}] and streamId [{}]", groupId, streamId);
return true;
}
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
index ebc727ca0..49f697aa7 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
@@ -37,15 +37,15 @@ public class RestartProcessSelector implements EventSelector {
ProcessForm processForm = workflowContext.getProcessForm();
String groupId = processForm.getInlongGroupId();
if (!(processForm instanceof GroupResourceProcessForm)) {
- log.info("not add restartProcess listener as the form was not GroupResourceProcessForm for groupId [{}]",
+ log.info("not add restartProcess listener, as the form was not GroupResourceProcessForm for groupId [{}]",
groupId);
return false;
}
- GroupResourceProcessForm updateProcessForm = (GroupResourceProcessForm) processForm;
- boolean flag = updateProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
+ GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm;
+ boolean flag = groupProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
if (!flag) {
- log.info("not add restartProcess listener as the operate was not RESTART for groupId [{}]", groupId);
+ log.info("\"not add restartProcess listener, as the operate was not RESTART for groupId [{}]", groupId);
return false;
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartStreamSelector.java
similarity index 64%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartStreamSelector.java
index ebc727ca0..b9fb24aa9 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartStreamSelector.java
@@ -20,36 +20,38 @@ package org.apache.inlong.manager.plugin.eventselect;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.EventSelector;
/**
- * Selector of restart process event.
+ * Selector of restart stream event.
*/
@Slf4j
-public class RestartProcessSelector implements EventSelector {
+public class RestartStreamSelector implements EventSelector {
@SneakyThrows
@Override
public boolean accept(WorkflowContext workflowContext) {
ProcessForm processForm = workflowContext.getProcessForm();
String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- log.info("not add restartProcess listener as the form was not GroupResourceProcessForm for groupId [{}]",
+ if (!(processForm instanceof StreamResourceProcessForm)) {
+ log.info("not add restartStream listener, as the form was not StreamResourceProcessForm for groupId [{}]",
groupId);
return false;
}
- GroupResourceProcessForm updateProcessForm = (GroupResourceProcessForm) processForm;
- boolean flag = updateProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
+ StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
+ String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
+ boolean flag = streamProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
if (!flag) {
- log.info("not add restartProcess listener as the operate was not RESTART for groupId [{}]", groupId);
+ log.info("not add restartStream listener, as the operate was not RESTART for groupId [{}] streamId [{}]",
+ groupId, streamId);
return false;
}
- log.info("add restartProcess listener for groupId [{}]", groupId);
+ log.info("add restartStream listener for groupId [{}] and streamId [{}]", groupId, streamId);
return true;
}
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java
index 49980eaa0..c4ea4293b 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.plugin.eventselect;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -36,7 +37,14 @@ public class StartupProcessSelector implements EventSelector {
ProcessForm processForm = workflowContext.getProcessForm();
String groupId = processForm.getInlongGroupId();
if (!(processForm instanceof GroupResourceProcessForm)) {
- log.info("not add startupProcess listener as the form was not GroupResource for groupId [{}]", groupId);
+ log.info("not add startupProcess listener, as the form was not GroupResourceProcessForm for groupId [{}]",
+ groupId);
+ return false;
+ }
+ GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm;
+ boolean flag = groupProcessForm.getGroupOperateType() == GroupOperateType.INIT;
+ if (!flag) {
+ log.info("not add startupProcess listener, as the operate was not INIT for groupId [{}]", groupId);
return false;
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupStreamSelector.java
similarity index 63%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupStreamSelector.java
index ebc727ca0..85514d209 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupStreamSelector.java
@@ -20,36 +20,36 @@ package org.apache.inlong.manager.plugin.eventselect;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.EventSelector;
/**
- * Selector of restart process event.
+ * Selector of startup stream event.
*/
@Slf4j
-public class RestartProcessSelector implements EventSelector {
+public class StartupStreamSelector implements EventSelector {
@SneakyThrows
@Override
public boolean accept(WorkflowContext workflowContext) {
ProcessForm processForm = workflowContext.getProcessForm();
String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- log.info("not add restartProcess listener as the form was not GroupResourceProcessForm for groupId [{}]",
+ if (!(processForm instanceof StreamResourceProcessForm)) {
+ log.info("not add startupStream listener, as the form was not StreamResourceProcessForm for groupId [{}]",
groupId);
return false;
}
-
- GroupResourceProcessForm updateProcessForm = (GroupResourceProcessForm) processForm;
- boolean flag = updateProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
+ StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
+ boolean flag = streamProcessForm.getGroupOperateType() == GroupOperateType.INIT;
+ String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
if (!flag) {
- log.info("not add restartProcess listener as the operate was not RESTART for groupId [{}]", groupId);
+ log.info("not add startupStream listener, as the operate was not INIT for groupId [{}] and streamId [{}]",
+ groupId, streamId);
return false;
}
-
- log.info("add restartProcess listener for groupId [{}]", groupId);
+ log.info("add startupStream listener for groupId [{}] and streamId [{}]", groupId, streamId);
return true;
}
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java
index 271b7496e..ea5576c68 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java
@@ -37,7 +37,7 @@ public class SuspendProcessSelector implements EventSelector {
ProcessForm processForm = workflowContext.getProcessForm();
String groupId = processForm.getInlongGroupId();
if (!(processForm instanceof GroupResourceProcessForm)) {
- log.info("not add suspendProcess listener as the form was not GroupResourceProcessForm for groupId [{}]",
+ log.info("not add suspendProcess listener, as the form was not GroupResourceProcessForm for groupId [{}]",
groupId);
return false;
}
@@ -45,7 +45,7 @@ public class SuspendProcessSelector implements EventSelector {
GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
boolean flag = groupResourceProcessForm.getGroupOperateType() == GroupOperateType.SUSPEND;
if (!flag) {
- log.info("not add suspendProcess listener as the operate was not SUSPEND for groupId [{}]", groupId);
+ log.info("not add suspendProcess listener, as the operate was not SUSPEND for groupId [{}]", groupId);
return false;
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendStreamSelector.java
similarity index 63%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendStreamSelector.java
index ebc727ca0..214e7cae6 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendStreamSelector.java
@@ -20,36 +20,36 @@ package org.apache.inlong.manager.plugin.eventselect;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.EventSelector;
/**
- * Selector of restart process event.
+ * Selector of startup stream event.
*/
@Slf4j
-public class RestartProcessSelector implements EventSelector {
+public class SuspendStreamSelector implements EventSelector {
@SneakyThrows
@Override
public boolean accept(WorkflowContext workflowContext) {
ProcessForm processForm = workflowContext.getProcessForm();
String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- log.info("not add restartProcess listener as the form was not GroupResourceProcessForm for groupId [{}]",
- groupId);
+ if (!(processForm instanceof StreamResourceProcessForm)) {
+ log.info("not add suspendStream listener as StreamResourceProcessForm for groupId [{}]", groupId);
return false;
}
-
- GroupResourceProcessForm updateProcessForm = (GroupResourceProcessForm) processForm;
- boolean flag = updateProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
+ StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
+ String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
+ boolean flag = streamProcessForm.getGroupOperateType() == GroupOperateType.SUSPEND;
if (!flag) {
- log.info("not add restartProcess listener as the operate was not RESTART for groupId [{}]", groupId);
+ log.info("not add suspendStream listener as the operate SUSPEND for groupId [{}] and streamId [{}]",
+ groupId, streamId);
return false;
}
- log.info("add restartProcess listener for groupId [{}]", groupId);
+ log.info("add suspendStream listener for groupId [{}] and streamId [{}]", groupId, streamId);
return true;
}
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index e23439afb..31e5b2a5e 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -192,7 +192,7 @@ public class FlinkService {
private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSettings settings) throws Exception {
String localJarPath = flinkInfo.getLocalJarPath();
File jarFile = new File(localJarPath);
- String[] programArgs = genProgramArgs(flinkInfo, flinkConfig);
+ String[] programArgs = genProgramArgsV2(flinkInfo, flinkConfig);
PackagedProgram program = PackagedProgram.newBuilder()
.setConfiguration(configuration)
@@ -240,7 +240,8 @@ public class FlinkService {
/**
* Build the program of the Flink job.
*/
- private String[] genProgramArgs(FlinkInfo flinkInfo,FlinkConfig flinkConfig) {
+ @Deprecated
+ private String[] genProgramArgs(FlinkInfo flinkInfo, FlinkConfig flinkConfig) {
List<String> list = new ArrayList<>();
list.add("-cluster-id");
list.add(flinkInfo.getJobName());
@@ -262,4 +263,15 @@ public class FlinkService {
return list.toArray(new String[0]);
}
+ private String[] genProgramArgsV2(FlinkInfo flinkInfo, FlinkConfig flinkConfig) {
+ List<String> list = new ArrayList<>();
+ list.add("-cluster-id");
+ list.add(flinkInfo.getJobName());
+ list.add("-dataflow.info.file");
+ list.add(flinkInfo.getLocalConfPath());
+ list.add("-checkpoint.interval");
+ list.add("60000");
+ return list.toArray(new String[0]);
+ }
+
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
index b3af2b3d0..86d6d8ca1 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
@@ -49,6 +49,4 @@ public class FlinkInfo {
private boolean isException = false;
private String exceptionMsg;
-
-
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
similarity index 59%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index 485d1bca4..4b2b2ff65 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -18,19 +18,19 @@
package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -38,17 +38,15 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
/**
- * Listener of restart sort.
+ * Listener of delete stream sort
*/
@Slf4j
-public class RestartSortListener implements SortOperateListener {
+public class DeleteStreamListener implements SortOperateListener {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -60,24 +58,25 @@ public class RestartSortListener implements SortOperateListener {
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
ProcessForm processForm = context.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- String message = String.format("process form was not GroupResourceProcessForm for groupId [%s]", groupId);
- log.error(message);
- return ListenerResult.fail(message);
- }
-
- GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
- InlongGroupInfo inlongGroupInfo = groupResourceProcessForm.getGroupInfo();
- List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
- log.info("inlong group ext info: {}", extList);
-
- Map<String, String> kvConf = extList.stream().collect(
+ StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
+ InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
+ List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
+ log.info("inlong group :{} ext info: {}", groupInfo.getInlongGroupId(), groupExtList);
+ InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();
+ List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
+ log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
+ final String groupId = streamInfo.getInlongGroupId();
+ final String streamId = streamInfo.getInlongStreamId();
+ Map<String, String> kvConf = groupExtList.stream().collect(
Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+ streamExtList.stream().forEach(extInfo -> {
+ kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
+ });
String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
- String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty",
- groupId);
+ String message = String.format(
+ "delete sort failed for groupId [%s] and streamId [%s], as the sort properties is empty",
+ groupId, streamId);
log.error(message);
return ListenerResult.fail(message);
}
@@ -88,51 +87,27 @@ public class RestartSortListener implements SortOperateListener {
kvConf.putAll(result);
String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
- String message = String.format("sort job id is empty for groupId [%s]", groupId);
- return ListenerResult.fail(message);
- }
- String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
- if (StringUtils.isEmpty(dataFlows)) {
- String message = String.format("dataflow is empty for groupId [%s]", groupId);
- log.error(message);
- return ListenerResult.fail(message);
- }
-
- // TODO Support more than one dataflow in one sort job
- Map<String, JsonNode> dataflowMap = OBJECT_MAPPER.convertValue(
- OBJECT_MAPPER.readTree(dataFlows), new TypeReference<Map<String, JsonNode>>() {
- });
- Optional<JsonNode> dataflowOptional = dataflowMap.values().stream().findFirst();
- JsonNode dataFlow = null;
- if (dataflowOptional.isPresent()) {
- dataFlow = dataflowOptional.get();
- }
- if (Objects.isNull(dataFlow)) {
- String message = String.format("dataflow is empty for groupId [%s]", groupId);
- log.warn(message);
+ String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
return ListenerResult.fail(message);
}
FlinkInfo flinkInfo = new FlinkInfo();
flinkInfo.setJobId(jobId);
- String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
- flinkInfo.setJobName(jobName);
String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
FlinkOperation flinkOperation = new FlinkOperation(flinkService);
try {
- flinkOperation.genPath(flinkInfo, dataFlow.toString());
- flinkOperation.restart(flinkInfo);
- log.info("job restart success for [{}]", jobId);
+ flinkOperation.delete(flinkInfo);
+ log.info("job delete success for [{}]", jobId);
return ListenerResult.success();
} catch (Exception e) {
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
flinkOperation.pollJobStatus(flinkInfo);
- String message = String.format("restart sort failed for groupId [%s] ", groupId);
+ String message = String.format("delete sort failed for groupId [%s] streamId [%s]", groupId, streamId);
log.error(message, e);
return ListenerResult.fail(message + e.getMessage());
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 485d1bca4..507972587 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -18,7 +18,6 @@
package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -38,8 +37,6 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
@@ -98,21 +95,6 @@ public class RestartSortListener implements SortOperateListener {
return ListenerResult.fail(message);
}
- // TODO Support more than one dataflow in one sort job
- Map<String, JsonNode> dataflowMap = OBJECT_MAPPER.convertValue(
- OBJECT_MAPPER.readTree(dataFlows), new TypeReference<Map<String, JsonNode>>() {
- });
- Optional<JsonNode> dataflowOptional = dataflowMap.values().stream().findFirst();
- JsonNode dataFlow = null;
- if (dataflowOptional.isPresent()) {
- dataFlow = dataflowOptional.get();
- }
- if (Objects.isNull(dataFlow)) {
- String message = String.format("dataflow is empty for groupId [%s]", groupId);
- log.warn(message);
- return ListenerResult.fail(message);
- }
-
FlinkInfo flinkInfo = new FlinkInfo();
flinkInfo.setJobId(jobId);
String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
@@ -123,7 +105,7 @@ public class RestartSortListener implements SortOperateListener {
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
FlinkOperation flinkOperation = new FlinkOperation(flinkService);
try {
- flinkOperation.genPath(flinkInfo, dataFlow.toString());
+ flinkOperation.genPath(flinkInfo, dataFlows);
flinkOperation.restart(flinkInfo);
log.info("job restart success for [{}]", jobId);
return ListenerResult.success();
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
similarity index 68%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index 485d1bca4..3974da351 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -18,14 +18,15 @@
package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
@@ -38,17 +39,15 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
/**
- * Listener of restart sort.
+ * Listener of restart stream sort.
*/
@Slf4j
-public class RestartSortListener implements SortOperateListener {
+public class RestartStreamListener implements SortOperateListener {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -60,24 +59,25 @@ public class RestartSortListener implements SortOperateListener {
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
ProcessForm processForm = context.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- String message = String.format("process form was not GroupResourceProcessForm for groupId [%s]", groupId);
- log.error(message);
- return ListenerResult.fail(message);
- }
-
- GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
- InlongGroupInfo inlongGroupInfo = groupResourceProcessForm.getGroupInfo();
- List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
- log.info("inlong group ext info: {}", extList);
-
- Map<String, String> kvConf = extList.stream().collect(
+ StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
+ InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
+ List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
+ log.info("inlong group :{} ext info: {}", groupInfo.getInlongGroupId(), groupExtList);
+ InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();
+ List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
+ log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
+ final String groupId = streamInfo.getInlongGroupId();
+ final String streamId = streamInfo.getInlongStreamId();
+ Map<String, String> kvConf = groupExtList.stream().collect(
Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+ streamExtList.stream().forEach(extInfo -> {
+ kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
+ });
String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
- String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty",
- groupId);
+ String message = String.format(
+ "restart sort failed for groupId [%s] and streamId [%s], as the sort properties is empty",
+ groupId, streamId);
log.error(message);
return ListenerResult.fail(message);
}
@@ -88,31 +88,16 @@ public class RestartSortListener implements SortOperateListener {
kvConf.putAll(result);
String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
- String message = String.format("sort job id is empty for groupId [%s]", groupId);
+ String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
return ListenerResult.fail(message);
}
String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
if (StringUtils.isEmpty(dataFlows)) {
- String message = String.format("dataflow is empty for groupId [%s]", groupId);
+ String message = String.format("dataflow is empty for groupId [%s] streamId [%s]", groupId, streamId);
log.error(message);
return ListenerResult.fail(message);
}
- // TODO Support more than one dataflow in one sort job
- Map<String, JsonNode> dataflowMap = OBJECT_MAPPER.convertValue(
- OBJECT_MAPPER.readTree(dataFlows), new TypeReference<Map<String, JsonNode>>() {
- });
- Optional<JsonNode> dataflowOptional = dataflowMap.values().stream().findFirst();
- JsonNode dataFlow = null;
- if (dataflowOptional.isPresent()) {
- dataFlow = dataflowOptional.get();
- }
- if (Objects.isNull(dataFlow)) {
- String message = String.format("dataflow is empty for groupId [%s]", groupId);
- log.warn(message);
- return ListenerResult.fail(message);
- }
-
FlinkInfo flinkInfo = new FlinkInfo();
flinkInfo.setJobId(jobId);
String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
@@ -123,7 +108,7 @@ public class RestartSortListener implements SortOperateListener {
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
FlinkOperation flinkOperation = new FlinkOperation(flinkService);
try {
- flinkOperation.genPath(flinkInfo, dataFlow.toString());
+ flinkOperation.genPath(flinkInfo, dataFlows);
flinkOperation.restart(flinkInfo);
log.info("job restart success for [{}]", jobId);
return ListenerResult.success();
@@ -132,7 +117,7 @@ public class RestartSortListener implements SortOperateListener {
flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
flinkOperation.pollJobStatus(flinkInfo);
- String message = String.format("restart sort failed for groupId [%s] ", groupId);
+ String message = String.format("restart sort failed for groupId [%s] streamId [%s] ", groupId, streamId);
log.error(message, e);
return ListenerResult.fail(message + e.getMessage());
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 150780967..3d99974ab 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -18,7 +18,6 @@
package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -38,8 +37,6 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
@@ -90,19 +87,6 @@ public class StartupSortListener implements SortOperateListener {
log.error(message);
return ListenerResult.fail(message);
}
- Map<String, JsonNode> dataflowMap = OBJECT_MAPPER.convertValue(
- OBJECT_MAPPER.readTree(dataFlows), new TypeReference<Map<String, JsonNode>>() {
- });
- Optional<JsonNode> dataflowOptional = dataflowMap.values().stream().findFirst();
- JsonNode dataFlow = null;
- if (dataflowOptional.isPresent()) {
- dataFlow = dataflowOptional.get();
- }
- if (Objects.isNull(dataFlow)) {
- String message = String.format("dataflow is empty for groupId [%s]", groupId);
- log.warn(message);
- return ListenerResult.fail(message);
- }
FlinkInfo flinkInfo = new FlinkInfo();
String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
@@ -110,17 +94,15 @@ public class StartupSortListener implements SortOperateListener {
String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
flinkInfo.setInlongStreamInfoList(groupResourceForm.getStreamInfos());
- parseDataflow(dataFlow, flinkInfo);
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
FlinkOperation flinkOperation = new FlinkOperation(flinkService);
try {
- flinkOperation.genPath(flinkInfo, dataFlow.toString());
+ flinkOperation.genPath(flinkInfo, dataFlows);
flinkOperation.start(flinkInfo);
log.info("job submit success, jobId is [{}]", flinkInfo.getJobId());
} catch (Exception e) {
- // TODO why call 4 times
flinkOperation.pollJobStatus(flinkInfo);
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
@@ -147,18 +129,6 @@ public class StartupSortListener implements SortOperateListener {
extInfoList.add(extInfo);
}
- /**
- * Init FlinkConf
- */
- private void parseDataflow(JsonNode dataflow, FlinkInfo flinkInfo) {
- JsonNode sourceInfo = dataflow.get(Constants.SOURCE_INFO);
- String sourceType = sourceInfo.get(Constants.TYPE).asText();
- flinkInfo.setSourceType(sourceType);
- JsonNode sinkInfo = dataflow.get(Constants.SINK_INFO);
- String sinkType = sinkInfo.get(Constants.TYPE).asText();
- flinkInfo.setSinkType(sinkType);
- }
-
@Override
public boolean async() {
return false;
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
similarity index 60%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
index 150780967..b64633830 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
@@ -18,14 +18,15 @@
package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
@@ -38,17 +39,15 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
/**
- * Listener of startup sort.
+ * Listener for startup stream sort
*/
@Slf4j
-public class StartupSortListener implements SortOperateListener {
+public class StartupStreamListener implements SortOperateListener {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -60,22 +59,21 @@ public class StartupSortListener implements SortOperateListener {
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
ProcessForm processForm = context.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- String message = String.format("process form was not GroupResource for groupId [%s]", groupId);
- log.error(message);
- return ListenerResult.fail(message);
- }
-
- GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
- InlongGroupInfo inlongGroupInfo = groupResourceForm.getGroupInfo();
- List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
- log.info("inlong group ext info: {}", extList);
-
- Map<String, String> kvConf = extList.stream().filter(v -> StringUtils.isNotEmpty(v.getKeyName())
- && StringUtils.isNotEmpty(v.getKeyValue())).collect(Collectors.toMap(
- InlongGroupExtInfo::getKeyName,
- InlongGroupExtInfo::getKeyValue));
+ StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
+ InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
+ List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
+ log.info("inlong group :{} ext info: {}", groupInfo.getInlongGroupId(), groupExtList);
+ InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();
+ List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
+ log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
+ final String groupId = streamInfo.getInlongGroupId();
+ final String streamId = streamInfo.getInlongStreamId();
+
+ Map<String, String> kvConf = groupExtList.stream().collect(
+ Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+ streamExtList.stream().forEach(extInfo -> {
+ kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
+ });
String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
if (StringUtils.isNotEmpty(sortExt)) {
Map<String, String> result = OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
@@ -86,52 +84,36 @@ public class StartupSortListener implements SortOperateListener {
String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
if (StringUtils.isEmpty(dataFlows)) {
- String message = String.format("dataflow is empty for groupId [%s]", groupId);
+ String message = String.format("dataflow is empty for groupId [%s] and streamId [%s]", groupId, streamId);
log.error(message);
return ListenerResult.fail(message);
}
- Map<String, JsonNode> dataflowMap = OBJECT_MAPPER.convertValue(
- OBJECT_MAPPER.readTree(dataFlows), new TypeReference<Map<String, JsonNode>>() {
- });
- Optional<JsonNode> dataflowOptional = dataflowMap.values().stream().findFirst();
- JsonNode dataFlow = null;
- if (dataflowOptional.isPresent()) {
- dataFlow = dataflowOptional.get();
- }
- if (Objects.isNull(dataFlow)) {
- String message = String.format("dataflow is empty for groupId [%s]", groupId);
- log.warn(message);
- return ListenerResult.fail(message);
- }
FlinkInfo flinkInfo = new FlinkInfo();
String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
flinkInfo.setJobName(jobName);
String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
- flinkInfo.setInlongStreamInfoList(groupResourceForm.getStreamInfos());
- parseDataflow(dataFlow, flinkInfo);
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
FlinkOperation flinkOperation = new FlinkOperation(flinkService);
try {
- flinkOperation.genPath(flinkInfo, dataFlow.toString());
+ flinkOperation.genPath(flinkInfo, dataFlows);
flinkOperation.start(flinkInfo);
log.info("job submit success, jobId is [{}]", flinkInfo.getJobId());
} catch (Exception e) {
- // TODO why call 4 times
flinkOperation.pollJobStatus(flinkInfo);
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
flinkOperation.pollJobStatus(flinkInfo);
- String message = String.format("startup sort failed for groupId [%s] ", groupId);
+ String message = String.format("startup sort failed for groupId [%s] streamId [%s]", groupId, streamId);
log.error(message, e);
return ListenerResult.fail(message + e.getMessage());
}
- saveInfo(groupId, InlongGroupSettings.SORT_JOB_ID, flinkInfo.getJobId(), extList);
+ saveInfo(groupId, streamId, InlongGroupSettings.SORT_JOB_ID, flinkInfo.getJobId(), streamExtList);
flinkOperation.pollJobStatus(flinkInfo);
return ListenerResult.success();
}
@@ -139,26 +121,16 @@ public class StartupSortListener implements SortOperateListener {
/**
* Save ext info into list.
*/
- private void saveInfo(String inlongGroupId, String keyName, String keyValue, List<InlongGroupExtInfo> extInfoList) {
- InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
+ private void saveInfo(String inlongGroupId, String inlongStreamId, String keyName, String keyValue,
+ List<InlongStreamExtInfo> extInfoList) {
+ InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
extInfo.setInlongGroupId(inlongGroupId);
+ extInfo.setInlongStreamId(inlongStreamId);
extInfo.setKeyName(keyName);
extInfo.setKeyValue(keyValue);
extInfoList.add(extInfo);
}
- /**
- * Init FlinkConf
- */
- private void parseDataflow(JsonNode dataflow, FlinkInfo flinkInfo) {
- JsonNode sourceInfo = dataflow.get(Constants.SOURCE_INFO);
- String sourceType = sourceInfo.get(Constants.TYPE).asText();
- flinkInfo.setSourceType(sourceType);
- JsonNode sinkInfo = dataflow.get(Constants.SINK_INFO);
- String sinkType = sinkInfo.get(Constants.TYPE).asText();
- flinkInfo.setSinkType(sinkType);
- }
-
@Override
public boolean async() {
return false;
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
similarity index 59%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
index 485d1bca4..e5cb9cb7b 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
@@ -18,19 +18,19 @@
package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -38,17 +38,15 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
/**
- * Listener of restart sort.
+ * Listener of suspend stream sort.
*/
@Slf4j
-public class RestartSortListener implements SortOperateListener {
+public class SuspendStreamListener implements SortOperateListener {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -60,24 +58,25 @@ public class RestartSortListener implements SortOperateListener {
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
ProcessForm processForm = context.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- String message = String.format("process form was not GroupResourceProcessForm for groupId [%s]", groupId);
- log.error(message);
- return ListenerResult.fail(message);
- }
-
- GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
- InlongGroupInfo inlongGroupInfo = groupResourceProcessForm.getGroupInfo();
- List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
- log.info("inlong group ext info: {}", extList);
-
- Map<String, String> kvConf = extList.stream().collect(
+ StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
+ InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
+ List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
+ log.info("inlong group :{} ext info: {}", groupInfo.getInlongGroupId(), groupExtList);
+ InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();
+ List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
+ log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
+ final String groupId = streamInfo.getInlongGroupId();
+ final String streamId = streamInfo.getInlongStreamId();
+ Map<String, String> kvConf = groupExtList.stream().collect(
Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+ streamExtList.stream().forEach(extInfo -> {
+ kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
+ });
String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
- String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty",
- groupId);
+ String message = String.format(
+ "suspend sort failed for groupId [%s] streamId [%s], as the sort properties is empty",
+ groupId, streamId);
log.error(message);
return ListenerResult.fail(message);
}
@@ -88,51 +87,27 @@ public class RestartSortListener implements SortOperateListener {
kvConf.putAll(result);
String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
- String message = String.format("sort job id is empty for groupId [%s]", groupId);
- return ListenerResult.fail(message);
- }
- String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
- if (StringUtils.isEmpty(dataFlows)) {
- String message = String.format("dataflow is empty for groupId [%s]", groupId);
- log.error(message);
- return ListenerResult.fail(message);
- }
-
- // TODO Support more than one dataflow in one sort job
- Map<String, JsonNode> dataflowMap = OBJECT_MAPPER.convertValue(
- OBJECT_MAPPER.readTree(dataFlows), new TypeReference<Map<String, JsonNode>>() {
- });
- Optional<JsonNode> dataflowOptional = dataflowMap.values().stream().findFirst();
- JsonNode dataFlow = null;
- if (dataflowOptional.isPresent()) {
- dataFlow = dataflowOptional.get();
- }
- if (Objects.isNull(dataFlow)) {
- String message = String.format("dataflow is empty for groupId [%s]", groupId);
- log.warn(message);
+ String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
return ListenerResult.fail(message);
}
FlinkInfo flinkInfo = new FlinkInfo();
flinkInfo.setJobId(jobId);
- String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
- flinkInfo.setJobName(jobName);
String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
FlinkOperation flinkOperation = new FlinkOperation(flinkService);
try {
- flinkOperation.genPath(flinkInfo, dataFlow.toString());
- flinkOperation.restart(flinkInfo);
- log.info("job restart success for [{}]", jobId);
+ flinkOperation.stop(flinkInfo);
+ log.info("job suspend success for [{}]", jobId);
return ListenerResult.success();
} catch (Exception e) {
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
flinkOperation.pollJobStatus(flinkInfo);
- String message = String.format("restart sort failed for groupId [%s] ", groupId);
+ String message = String.format("suspend sort failed for groupId [%s] streamId[%s]", groupId, streamId);
log.error(message, e);
return ListenerResult.fail(message + e.getMessage());
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index d1830fb04..564350292 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -35,6 +35,7 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamListResponse;
@@ -46,9 +47,11 @@ import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
@@ -82,6 +85,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Autowired
private InlongStreamFieldEntityMapper streamFieldMapper;
@Autowired
+ private InlongStreamExtEntityMapper streamExtMapper;
+ @Autowired
private InlongGroupEntityMapper groupMapper;
@Autowired
private StreamSourceService sourceService;
@@ -117,7 +122,10 @@ public class InlongStreamServiceImpl implements InlongStreamService {
streamEntity.setCreateTime(new Date());
streamMapper.insertSelective(streamEntity);
- this.saveField(groupId, streamId, request.getFieldList());
+ saveField(groupId, streamId, request.getFieldList());
+ if (CollectionUtils.isNotEmpty(request.getExtList())) {
+ saveOrUpdateExt(groupId, streamId, request.getExtList());
+ }
LOGGER.info("success to save inlong stream info for groupId={}", groupId);
return streamEntity.getId();
@@ -135,12 +143,14 @@ public class InlongStreamServiceImpl implements InlongStreamService {
throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
}
- InlongStreamInfo response = CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
- List<InlongStreamFieldInfo> streamFields = this.getStreamFields(groupId, streamId);
- response.setFieldList(streamFields);
-
+ InlongStreamInfo streamInfo = CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
+ List<InlongStreamFieldInfo> streamFields = getStreamFields(groupId, streamId);
+ streamInfo.setFieldList(streamFields);
+ List<InlongStreamExtEntity> extEntities = streamExtMapper.selectByRelatedId(groupId, streamId);
+ List<InlongStreamExtInfo> exts = CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
+ streamInfo.setExtList(exts);
LOGGER.info("success to get inlong stream for groupId={}", groupId);
- return response;
+ return streamInfo;
}
@Override
@@ -149,15 +159,23 @@ public class InlongStreamServiceImpl implements InlongStreamService {
List<InlongStreamEntity> inlongStreamEntityList = streamMapper.selectByGroupId(groupId);
List<InlongStreamInfo> streamList = CommonBeanUtils.copyListProperties(inlongStreamEntityList,
InlongStreamInfo::new);
- List<InlongStreamFieldInfo> streamFields = this.getStreamFields(groupId, null);
+ List<InlongStreamFieldInfo> streamFields = getStreamFields(groupId, null);
Map<String, List<InlongStreamFieldInfo>> streamFieldMap = streamFields.stream().collect(
Collectors.groupingBy(InlongStreamFieldInfo::getInlongStreamId,
HashMap::new,
Collectors.toCollection(ArrayList::new)));
+ List<InlongStreamExtEntity> extEntities = streamExtMapper.selectByRelatedId(groupId, null);
+ Map<String, List<InlongStreamExtInfo>> extInfoMap = extEntities.stream()
+ .map(extEntity -> CommonBeanUtils.copyProperties(extEntity, InlongStreamExtInfo::new))
+ .collect(Collectors.groupingBy(InlongStreamExtInfo::getInlongStreamId,
+ HashMap::new,
+ Collectors.toCollection(ArrayList::new)));
streamList.stream().forEach(streamInfo -> {
String streamId = streamInfo.getInlongStreamId();
List<InlongStreamFieldInfo> fieldInfos = streamFieldMap.get(streamId);
streamInfo.setFieldList(fieldInfos);
+ List<InlongStreamExtInfo> extInfos = extInfoMap.get(streamId);
+ streamInfo.setExtList(extInfos);
});
return streamList;
}
@@ -245,7 +263,10 @@ public class InlongStreamServiceImpl implements InlongStreamService {
streamMapper.updateByIdentifierSelective(streamEntity);
// Update field information
- this.updateField(groupId, streamId, request.getFieldList());
+ updateField(groupId, streamId, request.getFieldList());
+ // Update extension info
+ List<InlongStreamExtInfo> extInfos = request.getExtList();
+ saveOrUpdateExt(groupId, streamId, extInfos);
LOGGER.info("success to update inlong stream for groupId={}", groupId);
return true;
@@ -288,6 +309,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
// Logically delete the associated field table
LOGGER.debug("begin to delete inlong stream field, streamId={}", streamId);
streamFieldMapper.logicDeleteAllByIdentifier(groupId, streamId);
+ streamExtMapper.logicDeleteAllByRelatedId(groupId, streamId);
LOGGER.info("success to delete inlong stream, ext property and fields for groupId={}", groupId);
return true;
@@ -316,6 +338,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
String streamId = entity.getInlongStreamId();
// Logically delete the associated field, source and sink info
streamFieldMapper.logicDeleteAllByIdentifier(groupId, streamId);
+ streamExtMapper.logicDeleteAllByRelatedId(groupId, streamId);
sourceService.logicDeleteAll(groupId, streamId, operator);
sinkService.logicDeleteAll(groupId, streamId, operator);
}
@@ -357,17 +380,17 @@ public class InlongStreamServiceImpl implements InlongStreamService {
// Check whether it can be added: check by lower-level specific services
// this.checkBizIsTempStatus(streamInfo.getInlongGroupId());
- // 1. Save inlong stream
- this.save(streamRequest, operator);
+ // Save inlong stream
+ save(streamRequest, operator);
- // 2 Save source info
+ // Save source info
if (CollectionUtils.isNotEmpty(fullStreamRequest.getSourceInfo())) {
for (SourceRequest source : fullStreamRequest.getSourceInfo()) {
sourceService.save(source, operator);
}
}
- // 3. Save sink info
+ // Save sink info
if (CollectionUtils.isNotEmpty(fullStreamRequest.getSinkInfo())) {
for (SinkRequest sinkInfo : fullStreamRequest.getSinkInfo()) {
sinkService.save(sinkInfo, operator);
@@ -399,18 +422,16 @@ public class InlongStreamServiceImpl implements InlongStreamService {
streamMapper.deleteAllByGroupId(groupId);
for (FullStreamRequest pageInfo : fullStreamRequestList) {
- // 1.1 Delete the inlong stream extensions and fields corresponding to groupId and streamId
+ // Delete the inlong stream extensions and fields corresponding to groupId and streamId
InlongStreamRequest streamInfo = pageInfo.getStreamInfo();
String streamId = streamInfo.getInlongStreamId();
streamFieldMapper.deleteAllByIdentifier(groupId, streamId);
-
- // 2. Delete all stream source
+ streamExtMapper.deleteAllByRelatedId(groupId, streamId);
+ // Delete all stream source
sourceService.deleteAll(groupId, streamId, operator);
-
- // 3. Delete all stream sink
+ // Delete all stream sink
sinkService.deleteAll(groupId, streamId, operator);
-
- // 4. Save the inlong stream of this batch
+ // Save the inlong stream of this batch
this.saveAll(pageInfo, operator);
}
LOGGER.info("success to batch save all stream page info");
@@ -441,23 +462,26 @@ public class InlongStreamServiceImpl implements InlongStreamService {
// Convert and encapsulate the paged results
List<FullStreamResponse> responseList = new ArrayList<>(streamInfoList.size());
for (InlongStreamInfo streamInfo : streamInfoList) {
- // 2Set the field information of the inlong stream
+ // Set the field information of the inlong stream
String streamId = streamInfo.getInlongStreamId();
List<InlongStreamFieldInfo> streamFields = getStreamFields(groupId, streamId);
streamInfo.setFieldList(streamFields);
+ List<InlongStreamExtInfo> streamExtInfos = CommonBeanUtils.copyListProperties(
+ streamExtMapper.selectByRelatedId(groupId, streamId), InlongStreamExtInfo::new);
+ streamInfo.setExtList(streamExtInfos);
FullStreamResponse pageInfo = new FullStreamResponse();
pageInfo.setStreamInfo(streamInfo);
- // 3. Query stream sources information
+ // Query stream sources information
List<SourceResponse> sourceList = sourceService.listSource(groupId, streamId);
pageInfo.setSourceInfo(sourceList);
- // 4. Query various stream sinks and its extended information, field information
+ // Query various stream sinks and its extended information, field information
List<SinkResponse> sinkList = sinkService.listSink(groupId, streamId);
pageInfo.setSinkInfo(sinkList);
- // 5. Add a single result to the paginated list
+ // Add a single result to the paginated list
responseList.add(pageInfo);
}
@@ -594,6 +618,23 @@ public class InlongStreamServiceImpl implements InlongStreamService {
streamFieldMapper.insertAll(list);
}
+ @Transactional(rollbackFor = Throwable.class)
+ void saveOrUpdateExt(String groupId, String streamId, List<InlongStreamExtInfo> exts) {
+ LOGGER.info("begin to save or update inlong stream ext info, groupId={}, streamId={}, ext={}", groupId,
+ streamId, exts);
+ if (CollectionUtils.isEmpty(exts)) {
+ return;
+ }
+
+ List<InlongStreamExtEntity> entityList = CommonBeanUtils.copyListProperties(exts, InlongStreamExtEntity::new);
+ entityList.stream().forEach(streamEntity -> {
+ streamEntity.setInlongGroupId(groupId);
+ streamEntity.setInlongStreamId(streamId);
+ });
+ streamExtMapper.insertOnDuplicateKeyUpdate(entityList);
+ LOGGER.info("success to save or update inlong stream ext for groupId={}", groupId);
+ }
+
/**
* Check whether the inlong group status is temporary
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index d560e706b..29e01a414 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -57,7 +57,6 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -366,17 +365,15 @@ public class InlongGroupServiceImpl implements InlongGroupService {
@Override
@Transactional(rollbackFor = Throwable.class)
- public void saveOrUpdateExt(String groupId, List<InlongGroupExtInfo> infoList) {
- LOGGER.info("begin to save or update inlong group ext info, groupId={}, ext={}", groupId, infoList);
- if (CollectionUtils.isEmpty(infoList)) {
+ public void saveOrUpdateExt(String groupId, List<InlongGroupExtInfo> exts) {
+ LOGGER.info("begin to save or update inlong group ext info, groupId={}, ext={}", groupId, exts);
+ if (CollectionUtils.isEmpty(exts)) {
return;
}
- List<InlongGroupExtEntity> entityList = CommonBeanUtils.copyListProperties(infoList, InlongGroupExtEntity::new);
- Date date = new Date();
+ List<InlongGroupExtEntity> entityList = CommonBeanUtils.copyListProperties(exts, InlongGroupExtEntity::new);
for (InlongGroupExtEntity entity : entityList) {
entity.setInlongGroupId(groupId);
- entity.setModifyTime(date);
}
groupExtMapper.insertOnDuplicateKeyUpdate(entityList);
LOGGER.info("success to save or update inlong group ext for groupId={}", groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index 842ea887d..b14314be8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.sort;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.enums.GroupOperateType;
@@ -141,7 +142,10 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
List<SourceResponse> sourceResponses = sourceService.listSource(groupInfo.getInlongGroupId(),
streamInfo.getInlongStreamId());
for (SourceResponse sourceResponse : sourceResponses) {
- pulsarSourceResponse.setSerializationType(sourceResponse.getSerializationType());
+ if (StringUtils.isEmpty(pulsarSourceResponse.getSerializationType())
+ && StringUtils.isNotEmpty(sourceResponse.getSerializationType())) {
+ pulsarSourceResponse.setSerializationType(sourceResponse.getSerializationType());
+ }
if (SourceType.forType(sourceResponse.getSourceType()) == SourceType.KAFKA) {
pulsarSourceResponse.setPrimaryKey(((KafkaSourceResponse) sourceResponse).getPrimaryKey());
}
@@ -163,7 +167,8 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
return nodes;
}
- private List<NodeRelationShip> createNodeRelationShipsForStream(List<SourceResponse> sourceResponses,
+ private List<NodeRelationShip> createNodeRelationShipsForStream(
+ List<SourceResponse> sourceResponses,
List<SinkResponse> sinkResponses) {
NodeRelationShip relationShip = new NodeRelationShip();
List<String> inputs = sourceResponses.stream().map(sourceResponse -> sourceResponse.getSourceName())
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
index 4cba1b7a1..4ce935da3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
@@ -20,28 +20,39 @@ package org.apache.inlong.manager.service.sort;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
+import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
-import org.apache.inlong.manager.service.sort.util.DataFlowUtils;
+import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
+import org.apache.inlong.manager.service.sort.util.LoadNodeUtils;
+import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
-import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
/**
@@ -53,10 +64,12 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
+ @Autowired
+ private StreamSourceService streamSourceService;
@Autowired
private StreamSinkService streamSinkService;
@Autowired
- private DataFlowUtils dataFlowUtils;
+ private CommonOperateService commonOperateService;
@Override
public TaskEvent event() {
@@ -70,6 +83,7 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
if (groupOperateType == GroupOperateType.SUSPEND || groupOperateType == GroupOperateType.DELETE) {
return ListenerResult.success();
}
+ InlongGroupInfo groupInfo = form.getGroupInfo();
InlongStreamInfo streamInfo = form.getStreamInfo();
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
@@ -78,25 +92,23 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
log.warn("Sink not found by groupId={}", groupId);
return ListenerResult.success();
}
- //TODO must be compile with new protocal of sort
try {
- InlongGroupInfo groupInfo = form.getGroupInfo();
- Map<String, DataFlowInfo> dataFlowInfoMap = sinkResponses.stream().map(sink -> {
- DataFlowInfo flowInfo = dataFlowUtils.createDataFlow(groupInfo, sink);
- return Pair.of(sink.getInlongStreamId(), flowInfo);
- }
- ).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
- String dataFlows = OBJECT_MAPPER.writeValueAsString(dataFlowInfoMap);
- InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
+ List<SourceResponse> sourceResponses = createPulsarSources(groupInfo, streamInfo);
+ List<Node> nodes = createNodesForStream(sourceResponses, sinkResponses);
+ List<NodeRelationShip> nodeRelationShips = createNodeRelationShipsForStream(sourceResponses, sinkResponses);
+ StreamInfo sortStreamInfo = new StreamInfo(streamId, nodes, nodeRelationShips);
+ GroupInfo sortGroupInfo = new GroupInfo(groupId, Lists.newArrayList(sortStreamInfo));
+ String dataFlows = OBJECT_MAPPER.writeValueAsString(sortGroupInfo);
+ InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
extInfo.setInlongGroupId(groupId);
- String keyName = InlongGroupSettings.DATA_FLOW + ":" + streamId;
+ extInfo.setInlongStreamId(streamId);
+ String keyName = InlongGroupSettings.DATA_FLOW;
extInfo.setKeyName(keyName);
extInfo.setKeyValue(dataFlows);
- if (groupInfo.getExtList() == null) {
+ if (streamInfo.getExtList() == null) {
groupInfo.setExtList(Lists.newArrayList());
}
- upsertDataFlow(groupInfo, extInfo, keyName);
+ upsertDataFlow(streamInfo, extInfo, keyName);
} catch (Exception e) {
log.error("create sort config failed for sink list={} of groupId={}, streamId={}", sinkResponses, groupId,
streamId, e);
@@ -105,9 +117,61 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
return ListenerResult.success();
}
- private void upsertDataFlow(InlongGroupInfo groupInfo, InlongGroupExtInfo extInfo, String keyName) {
- groupInfo.getExtList().removeIf(ext -> keyName.equals(ext.getKeyName()));
- groupInfo.getExtList().add(extInfo);
+ private List<SourceResponse> createPulsarSources(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo) {
+ MQType mqType = MQType.forType(groupInfo.getMqType());
+ if (mqType != MQType.PULSAR) {
+ String errMsg = String.format("Unsupported MqType={} for Inlong", mqType);
+ log.error(errMsg);
+ throw new WorkflowListenerException(errMsg);
+ }
+ PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
+ PulsarSourceResponse pulsarSourceResponse = new PulsarSourceResponse();
+ pulsarSourceResponse.setSourceName(streamInfo.getInlongStreamId());
+ pulsarSourceResponse.setNamespace(groupInfo.getMqResource());
+ pulsarSourceResponse.setTopic(streamInfo.getMqResource());
+ pulsarSourceResponse.setAdminUrl(pulsarCluster.getAdminUrl());
+ pulsarSourceResponse.setServiceUrl(pulsarCluster.getBrokerServiceUrl());
+ List<SourceResponse> sourceResponses = streamSourceService.listSource(groupInfo.getInlongGroupId(),
+ streamInfo.getInlongStreamId());
+ for (SourceResponse sourceResponse : sourceResponses) {
+ if (StringUtils.isEmpty(pulsarSourceResponse.getSerializationType())
+ && StringUtils.isNotEmpty(sourceResponse.getSerializationType())) {
+ pulsarSourceResponse.setSerializationType(sourceResponse.getSerializationType());
+ }
+ if (SourceType.forType(sourceResponse.getSourceType()) == SourceType.KAFKA) {
+ pulsarSourceResponse.setPrimaryKey(((KafkaSourceResponse) sourceResponse).getPrimaryKey());
+ }
+ }
+ pulsarSourceResponse.setScanStartupMode("earliest");
+ pulsarSourceResponse.setFieldList(streamInfo.getFieldList());
+ return Lists.newArrayList(pulsarSourceResponse);
+ }
+
+ private List<Node> createNodesForStream(
+ List<SourceResponse> sourceResponses,
+ List<SinkResponse> sinkResponses) {
+ List<Node> nodes = Lists.newArrayList();
+ nodes.addAll(ExtractNodeUtils.createExtractNodes(sourceResponses));
+ nodes.addAll(LoadNodeUtils.createLoadNodes(sinkResponses));
+ return nodes;
+ }
+
+ private List<NodeRelationShip> createNodeRelationShipsForStream(
+ List<SourceResponse> sourceResponses,
+ List<SinkResponse> sinkResponses) {
+ NodeRelationShip relationShip = new NodeRelationShip();
+ List<String> inputs = sourceResponses.stream().map(sourceResponse -> sourceResponse.getSourceName())
+ .collect(Collectors.toList());
+ List<String> outputs = sinkResponses.stream().map(sinkResponse -> sinkResponse.getSinkName())
+ .collect(Collectors.toList());
+ relationShip.setInputs(inputs);
+ relationShip.setOutputs(outputs);
+ return Lists.newArrayList(relationShip);
+ }
+
+ private void upsertDataFlow(InlongStreamInfo streamInfo, InlongStreamExtInfo extInfo, String keyName) {
+ streamInfo.getExtList().removeIf(ext -> keyName.equals(ext.getKeyName()));
+ streamInfo.getExtList().add(extInfo);
}
@Override
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 7552e6e2a..73e564d25 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -347,6 +347,23 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
UNIQUE KEY `unique_inlong_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`)
);
+-- ----------------------------
+-- Table structure for inlong_stream_ext
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `inlong_stream_ext`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
+ `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+ `key_name` varchar(256) NOT NULL COMMENT 'Configuration item name',
+ `key_value` text COMMENT 'The value of the configuration item',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `stream_key_idx` (`inlong_group_id`, `inlong_stream_id`, `key_name`),
+ KEY `index_stream_id` (`inlong_group_id`, `inlong_stream_id`)
+);
+
-- ----------------------------
-- Table structure for inlong_stream_field
-- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index b56efe35d..52adb2658 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -372,6 +372,24 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong stream table';
+-- ----------------------------
+-- Table structure for inlong_stream_ext
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `inlong_stream_ext`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
+ `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+ `key_name` varchar(256) NOT NULL COMMENT 'Configuration item name',
+ `key_value` text COMMENT 'The value of the configuration item',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `stream_key_idx` (`inlong_group_id`, `inlong_stream_id`, `key_name`),
+ KEY `index_stream_id` (`inlong_group_id`, `inlong_stream_id`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong stream extension table';
+
-- ----------------------------
-- Table structure for inlong_stream_field
-- ----------------------------